Posted to github@beam.apache.org by "BjornPrime (via GitHub)" <gi...@apache.org> on 2023/03/24 15:45:32 UTC

[GitHub] [beam] BjornPrime opened a new pull request, #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

BjornPrime opened a new pull request, #25965:
URL: https://github.com/apache/beam/pull/25965

   Addresses #25676.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1582639345

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1576890025

   Run Python 3.11 PostCommit




[GitHub] [beam] tvalentyn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1601567910

   > For the performance, I will do some investigation on my own next week.
   
   Is there any new information about this?




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551647423

   I'm not sure if that resolves my issue. I'm seeing this, and I can't resolve the conflicts myself.
   ![image](https://github.com/apache/beam/assets/32173247/a4354831-cde9-49b0-bd00-7f47c341c927)
   It's also showing fewer tests running than normal, though I'm not sure if that's related to the conflicts.




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210679761


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -788,111 +788,116 @@ def noop(table, **kwargs):
               with_auto_sharding=True,
               test_client=client))
 
-  @parameterized.expand([
-      param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_message='accessDenied'),
-      param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_message='backendError')
-  ])
-  def test_load_job_exception(self, exception_type, error_message):
-
-    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
-                     'Insert') as mock_load_job,\
-      mock.patch('apache_beam.io.gcp.internal.clients'
-                 '.storage.storage_v1_client.StorageV1.ObjectsService'),\
-      mock.patch('time.sleep'),\
-      self.assertRaises(Exception) as exc,\
-      beam.Pipeline() as p:
-
-      mock_load_job.side_effect = exception_type(error_message)
-
-      _ = (
-          p
-          | beam.Create([{
-              'columnA': 'value1'
-          }])
-          | WriteToBigQuery(
-              table='project:dataset.table',
-              schema={
-                  'fields': [{
-                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                  }]
-              },
-              create_disposition='CREATE_NEVER',
-              custom_gcs_temp_location="gs://temp_location",
-              method='FILE_LOADS'))
-
-    mock_load_job.assert_called()
-    self.assertIn(error_message, exc.exception.args[0])
-
-  @parameterized.expand([
-      param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_message='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_message='internalError'),
-  ])
-  def test_copy_load_job_exception(self, exception_type, error_message):
-
-    from apache_beam.io.gcp import bigquery_file_loads
-
-    old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE
-    old_max_partition_size = bigquery_file_loads._MAXIMUM_LOAD_SIZE
-    old_max_files_per_partition = bigquery_file_loads._MAXIMUM_SOURCE_URIS
-    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 15
-    bigquery_file_loads._MAXIMUM_LOAD_SIZE = 30
-    bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1
-
-    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
-                        'Insert') as mock_insert_copy_job, \
-      mock.patch.object(BigQueryWrapper,
-                        'perform_load_job') as mock_load_job, \
-      mock.patch.object(BigQueryWrapper,
-                        'wait_for_bq_job'), \
-      mock.patch('apache_beam.io.gcp.internal.clients'
-        '.storage.storage_v1_client.StorageV1.ObjectsService'), \
-      mock.patch('time.sleep'), \
-      self.assertRaises(Exception) as exc, \
-      beam.Pipeline() as p:
-
-      mock_insert_copy_job.side_effect = exception_type(error_message)
-
-      dummy_job_reference = beam.io.gcp.internal.clients.bigquery.JobReference()
-      dummy_job_reference.jobId = 'job_id'
-      dummy_job_reference.location = 'US'
-      dummy_job_reference.projectId = 'apache-beam-testing'
-
-      mock_load_job.return_value = dummy_job_reference
-
-      _ = (
-          p
-          | beam.Create([{
-              'columnA': 'value1'
-          }, {
-              'columnA': 'value2'
-          }, {
-              'columnA': 'value3'
-          }])
-          | WriteToBigQuery(
-              table='project:dataset.table',
-              schema={
-                  'fields': [{
-                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                  }]
-              },
-              create_disposition='CREATE_NEVER',
-              custom_gcs_temp_location="gs://temp_location",
-              method='FILE_LOADS'))
-
-    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size
-    bigquery_file_loads._MAXIMUM_LOAD_SIZE = old_max_partition_size
-    bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files_per_partition
 
-    self.assertEqual(4, mock_insert_copy_job.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+#   ## Commented out due to unittest.skip not working

Review Comment:
   How about putting the skip inside `parameterized.expand`? Or you could even skip it from inside the test itself.
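
A minimal sketch of the second option (skipping from inside the test body), using only the standard `unittest` module. The class name, test name, and skip reason are hypothetical and are not taken from the PR; with `parameterized.expand` the same `self.skipTest(...)` call would presumably sit at the top of the expanded test method.

```python
import unittest


class LoadJobExceptionTest(unittest.TestCase):
  """Hypothetical stand-in for the disabled BigQuery tests."""
  def test_load_job_exception(self):
    # Skipping from inside the body does not depend on decorator ordering,
    # so it works no matter how the test method was generated.
    self.skipTest('hypothetical reason: pending storage client migration')
    raise AssertionError('unreachable: skipTest raises unittest.SkipTest')


def run_skip_example():
  """Runs the test case above and returns the unittest.TestResult."""
  suite = unittest.defaultTestLoader.loadTestsFromTestCase(
      LoadJobExceptionTest)
  result = unittest.TestResult()
  suite.run(result)
  return result
```

The test stays in the file and shows up as skipped in the report, rather than disappearing as commented-out code.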





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1213269143


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +124,16 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
+    # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
     if storage_client is None:
       if not pipeline_options:
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      storage_client = storage.StorageV1(
-          credentials=auth.get_service_credentials(pipeline_options),
-          get_credentials=False,
-          http=get_new_http(),
-          response_encoding='utf8',
-          additional_http_headers={
-              "User-Agent": "apache-beam-%s" % apache_beam.__version__
-          })
+      credentials = auth.get_service_credentials(pipeline_options)
+      if credentials:
+        credentials = credentials.get_google_auth_credentials()

Review Comment:
   oops this was a typo, I meant
   ```
   if credentials is not None:
     storage_client = storage.Client(credentials=credentials.get_google_auth_credentials())
   else:
     storage_client = storage.Client.create_anonymous_client()
   ```





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574339708

   Run Python 3.11 PostCommit




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1215138142


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   I see you are right about the previous behavior, and it should be considered a bug (anything after the 100th element is ignored). This might be a source of leftover temp test files in our test system. How about dividing the incoming files into slices of 100 and sending the batch requests sequentially?
   
   ```
   for i in range(0, len(paths), MAX_BATCH_OPERATION_SIZE):
     # Take at most MAX_BATCH_OPERATION_SIZE paths per batch request.
     paths_chunk = paths[i:i + MAX_BATCH_OPERATION_SIZE]
     try:
       with self.client.batch():
         for path in paths_chunk:
           bucket_name, blob_path = parse_gcs_path(path)
           bucket = self.client.get_bucket(bucket_name)
           blob = storage.Blob(blob_path, bucket)
           blob.delete()
     except Exception as err:
       ...
   ```
   
   By the way, the max batch size is 1000 for the storage client: https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#LL138C5-L138C21
   
   Looking into the client library source code, it only tracks the last error seen (and drops previous ones): https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#LL240C19-L240C19 So we would not be able to get the status of individual delete requests as we do now.
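
The slicing idea can be sketched independently of the storage client. In the sketch below, `delete_chunk` is a hypothetical callable standing in for one batch request, and the value 100 for `MAX_BATCH_OPERATION_SIZE` is assumed from the "slices of 100" wording above. Because only one error per batch is assumed to be available, each path in a chunk is reported with that chunk's error rather than its own status.

```python
from typing import Callable, Iterator, List, Optional, Sequence, Tuple

MAX_BATCH_OPERATION_SIZE = 100  # assumed value, taken from "slices of 100"


def chunked(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
  """Yields consecutive slices of at most `size` items."""
  for start in range(0, len(items), size):
    yield items[start:start + size]


def delete_in_chunks(
    paths: Sequence[str],
    delete_chunk: Callable[[Sequence[str]], None],
    size: int = MAX_BATCH_OPERATION_SIZE
) -> List[Tuple[str, Optional[Exception]]]:
  """Calls the hypothetical `delete_chunk` once per slice, sequentially.

  Returns (path, error) pairs, where error is None on success or the
  exception raised for the whole chunk that contained the path.
  """
  statuses = []  # type: List[Tuple[str, Optional[Exception]]]
  for chunk in chunked(paths, size):
    error = None  # type: Optional[Exception]
    try:
      delete_chunk(chunk)
    except Exception as err:  # one failure marks every path in the chunk
      error = err
    statuses.extend((path, error) for path in chunk)
  return statuses
```

With 250 paths this issues three sequential requests of 100, 100, and 50 paths, and a failure in one request does not prevent the later ones from being sent.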
   
   





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1614022437

   Run Python 3.11 PostCommit




[GitHub] [beam] tvalentyn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1558002738

   you can force-push.




[GitHub] [beam] tvalentyn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551804491

   other useful commands if you mess something up during the rebase:
   
   `git reflog --oneline`  - find revision numbers
   
   `git reset --hard HEAD@{3}` - go back to a prior state of your repo (replace 3 with the appropriate number from the reflog).
    




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1198279700


##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   The exception is thrown because our test environments do not have credentials available. The pipelines should work in production environments. I've filed #26774 to have credentials added to the test environments so the try-except blocks can be removed.





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1516722116

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1507323446

   Run Python 3.10 PostCommit




[GitHub] [beam] Abacn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1558004097

   If `git rebase` shows conflicts, resolve them, then run `git add -u` followed by `git rebase --continue`.
   
   At any point, you can check the status of the files involved with `git status`.




[GitHub] [beam] tvalentyn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551798513

   you can rebase locally and re-push the PR.




[GitHub] [beam] liferoad commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1206052795


##########
sdks/python/setup.py:
##########
@@ -309,6 +309,7 @@ def get_portability_package_data():
             'google-cloud-datastore>=2.0.0,<3',
             'google-cloud-pubsub>=2.1.0,<3',
             'google-cloud-pubsublite>=1.2.0,<2',
+            'google-cloud-storage>=2.7.0,<3',

Review Comment:
   We should update CHANGES.md to mention this client library change for 2.49.0.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1199344220


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   I'm fairly sure, based on my interpretation of this method: https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#L222. However, we don't explicitly test this, so I'll add a test just to make sure.





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1213492906


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +124,16 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
+    # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
     if storage_client is None:
       if not pipeline_options:
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      storage_client = storage.StorageV1(
-          credentials=auth.get_service_credentials(pipeline_options),
-          get_credentials=False,
-          http=get_new_http(),
-          response_encoding='utf8',
-          additional_http_headers={
-              "User-Agent": "apache-beam-%s" % apache_beam.__version__
-          })
+      credentials = auth.get_service_credentials(pipeline_options)
+      if credentials:
+        credentials = credentials.get_google_auth_credentials()

Review Comment:
   The local environment tests GitHub Action now succeeds.





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212301218


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +124,16 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
+    # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
     if storage_client is None:
       if not pipeline_options:
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      storage_client = storage.StorageV1(
-          credentials=auth.get_service_credentials(pipeline_options),
-          get_credentials=False,
-          http=get_new_http(),
-          response_encoding='utf8',
-          additional_http_headers={
-              "User-Agent": "apache-beam-%s" % apache_beam.__version__
-          })
+      credentials = auth.get_service_credentials(pipeline_options)
+      if credentials:
+        credentials = credentials.get_google_auth_credentials()

Review Comment:
   ```suggestion
         if credentials is not None:
           credentials = credentials.get_google_auth_credentials()
         else:
           credentials = storage.Client.create_anonymous_client()
   ```
   
   This fixes the test





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1213257008


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +124,16 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None
+    # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
     if storage_client is None:
       if not pipeline_options:
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      storage_client = storage.StorageV1(
-          credentials=auth.get_service_credentials(pipeline_options),
-          get_credentials=False,
-          http=get_new_http(),
-          response_encoding='utf8',
-          additional_http_headers={
-              "User-Agent": "apache-beam-%s" % apache_beam.__version__
-          })
+      credentials = auth.get_service_credentials(pipeline_options)
+      if credentials:
+        credentials = credentials.get_google_auth_credentials()

Review Comment:
   Should we not just directly generate AnonymousCredentials? Then we're returning a google.auth.credentials.Credentials object instead of a client.
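
The distinction raised here can be sketched as a small helper that always returns a credentials object and never a client. The helper name `resolve_credentials` and the `make_anonymous` parameter are hypothetical; in the PR's context the factory would presumably be `google.auth.credentials.AnonymousCredentials`, but it is injected here so the sketch runs without the Google libraries installed.

```python
from typing import Any, Callable, Optional


def resolve_credentials(
    service_credentials: Optional[Any],
    make_anonymous: Callable[[], Any]) -> Any:
  """Returns credentials in both branches, so the caller builds one client.

  `service_credentials` is assumed to expose get_google_auth_credentials(),
  as in the diff above. `make_anonymous` is a hypothetical factory for
  anonymous credentials.
  """
  if service_credentials is not None:
    return service_credentials.get_google_auth_credentials()
  # No credentials available (for example, in a test environment):
  # fall back to anonymous credentials instead of an anonymous client.
  return make_anonymous()
```

Keeping the return type uniform means the client construction that follows does not need to know which branch was taken.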





[GitHub] [beam] johnjcasey commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1158892009


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +144,13 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None

Review Comment:
   Can we keep the type hints, or are they very different?



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -303,149 +256,79 @@ def delete_batch(self, paths):
       paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
       if not paths_chunk:
         return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+      with self.client.batch():
+        for path in paths_chunk:
+          bucket, blob_path = parse_gcs_path(path)
+          blob = storage.Blob(blob_path, bucket)
+          exception = None
+          try:
+            blob.delete()
+          except Exception as err:
+            if err is NotFound:
+              exception = None
+            else:
+              exception = err
+          finally:
+            result_statuses.append((path, exception))
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
+      !!!

Review Comment:
   typo?





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1536722600

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237633868


##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   I'm not sure why they're there now; the issue is resolved. I'll remove them.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242653443


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)

Review Comment:
   Updated to blob_name or src/dest_blob_name throughout the file.





[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237725603


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)

Review Comment:
   `src_path` / `dest_path` are confusing names. Let's use `src_object_name` or `src_blob_name`, and use the same name consistently throughout this file.
   
   Note that GCS has no concept of paths or subdirectories. There are buckets and objects; forward slashes are simply characters in object names.
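
To illustrate the naming point above, here is a minimal sketch of splitting a `gs://` URL into a bucket name and an object (blob) name. The helper `split_gcs_url` and its regular expression are hypothetical and written only for this example; they are not the `parse_gcs_path` function in `gcsio.py`, whose exact behavior may differ.

```python
import re

# Hypothetical pattern for illustration: everything up to the first slash
# after "gs://" is the bucket; the remainder is the object name.
_GCS_URL = re.compile(r'^gs://([^/]+)/(.+)$', re.DOTALL)


def split_gcs_url(url):
  """Splits gs://<bucket>/<name> into (bucket_name, blob_name)."""
  match = _GCS_URL.match(url)
  if match is None:
    raise ValueError('Expected gs://<bucket>/<name>, got %r' % url)
  # Slashes inside the object name are kept as-is: they are ordinary
  # characters, not directory separators.
  return match.group(1), match.group(2)


bucket_name, blob_name = split_gcs_url('gs://my-bucket/logs/2023/03/part-0.txt')
```

Here `blob_name` is the single string `logs/2023/03/part-0.txt`, which is why a name like `src_blob_name` describes the value more accurately than `src_path`.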
   





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212261011


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   Opened #23785 for testing (it removes the changes to the examples). I tested locally with either
   - gcloud application-default login
   - a service account
   
   and wordcount_minimal (`python -m apache_beam.examples.wordcount_minimal --output gs://clouddfe-yihu-test/temp/output-min.txt`) just works.
   
   We will see the exact error on CI/CD.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210626027


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   I have created an issue (#26774) to update the test environments to include google.auth credentials, the absence of which is causing these tests to fail. For the moment, the try-except blocks only catch the specific error caused by that deficiency in our testing environments. With the apitools implementation, a lack of appropriate credentials didn't raise an exception, but GCS still couldn't be accessed. So these changes don't alter the functioning of the test; they preserve it by handling the error that GCS (correctly) raises in a situation where apitools wouldn't. Does that address your concern?





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574060065

   I'm updating `gcsio_integration_test.py` now. I don't think it needs to test kms_key since we're not really managing that anymore. In fact, I dropped it from most method signatures in GCSIO since it was marked as experimental and not necessarily backwards compatible (though given our history with "experimental" features, I'm open to revisiting that decision in order to avoid breakages).
   
   As far as performance goes, I'm not convinced it needs further investigation. The regressions we initially saw were due to the tests being non-representative of typical jobs; under more typical jobs the regression disappeared. There are improvements that could be made to our perf testing approach, but I think that's a separate issue.




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1536601615

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1509054396

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1515191639

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1196696610


##########
sdks/python/apache_beam/io/filesystem.py:
##########
@@ -219,28 +219,33 @@ def _initialize_compressor(self):
 
   def readable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'r' in mode or 'a' in mode
+    try:
+      return 'r' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:
+      return self._file.readable is True
 
   def writeable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'w' in mode or 'a' in mode
+    try:
+      return 'w' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:
+      return self._file.writable is True
 
   def write(self, data):
     # type: (bytes) -> None
 
     """Write data to file."""
     if not self._compressor:
-      raise ValueError('compressor not initialized')
+      self._initialize_compressor()

Review Comment:
   As discussed, I'll be restoring this file and implementing a wrapper class for BlobReader and BlobWriter in gcsio.py
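
A rough sketch of what such a wrapper might look like, assuming the goal is to give reader and writer objects the `mode` attribute that `filesystem.py` already checks, so that file can stay unchanged. The class name `ModeFileWrapper` is hypothetical, and `io.BytesIO` stands in for the GCS reader and writer objects; this is not the implementation that landed in `gcsio.py`.

```python
import io


class ModeFileWrapper(object):
  """Hypothetical wrapper that adds a `mode` attribute to a file-like object."""
  def __init__(self, raw, mode):
    if mode not in ('r', 'w'):
      raise ValueError('mode must be "r" or "w", got %r' % mode)
    self._raw = raw
    self.mode = mode

  def __getattr__(self, name):
    # Called only when normal attribute lookup fails, so read(), write(),
    # close() and similar calls are forwarded to the wrapped object.
    if name == '_raw':
      # Avoid infinite recursion if _raw has not been set yet.
      raise AttributeError(name)
    return getattr(self._raw, name)


# io.BytesIO is a stand-in for the wrapped reader/writer in this sketch.
reader = ModeFileWrapper(io.BytesIO(b'hello'), 'r')
writer = ModeFileWrapper(io.BytesIO(), 'w')
writer.write(b'data')
```

With a wrapper along these lines, checks such as `'r' in self._file.mode` keep working against the wrapped object without an `AttributeError` fallback.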





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1159854581


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -162,20 +144,13 @@ class GcsIOError(IOError, retry.PermanentException):
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
-    # type: (Optional[storage.StorageV1], Optional[Union[dict, PipelineOptions]]) -> None

Review Comment:
   Yeah, I guess I can just update storage.StorageV1 to storage.Client.





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1611587973

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1245601310


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -59,20 +48,10 @@
 
 _LOGGER = logging.getLogger(__name__)
 
-# Issue a friendlier error message if the storage library is not available.
-# TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.batch import BatchApiRequest
-  from apitools.base.py.exceptions import HttpError
-  from apitools.base.py import transfer
   from apache_beam.internal.gcp import auth
-  from apache_beam.io.gcp.internal.clients import storage
 except ImportError:
-  raise ImportError(
-      'Google Cloud Storage I/O not supported for this execution environment '
-      '(could not import storage API client).')
+  raise ImportError('Internal auth library not found')

Review Comment:
   Okay. Removed the exception handling.





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1536436616

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551779245

   Run Python TextIO Performance Test




[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237607711


##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   I am hoping this is no longer necessary, but if it is, I'd expect the reasons to be in the PR description. I am sure this was discussed somewhere in the PR review, but the comments are hard to dig up.





[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237605944


##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   The issue is closed, but I still see the try-excepts. Are they still necessary?





[GitHub] [beam] tvalentyn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1601746239

   Left some comments; will defer to @Abacn for the final merge. Per offline discussion, we are also waiting for a new release of the GCS client, and we should increase the lower bound appropriately.




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242660618


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -59,20 +48,10 @@
 
 _LOGGER = logging.getLogger(__name__)
 
-# Issue a friendlier error message if the storage library is not available.
-# TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.batch import BatchApiRequest
-  from apitools.base.py.exceptions import HttpError
-  from apitools.base.py import transfer
   from apache_beam.internal.gcp import auth
-  from apache_beam.io.gcp.internal.clients import storage
 except ImportError:
-  raise ImportError(
-      'Google Cloud Storage I/O not supported for this execution environment '
-      '(could not import storage API client).')
+  raise ImportError('Internal auth library not found')

Review Comment:
   Sorry, are you saying we shouldn't expect this import to fail or that we should handle the failure by creating a boolean to track whether or not it succeeded?
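
For reference, the second option in the question (tracking import success with a boolean instead of raising) could look roughly like the sketch below. It is only an illustration under assumptions: `try_import` and the module names are hypothetical, and it is not what the PR adopted, since another reply on this same hunk reports that the exception handling was removed.

```python
import importlib


def try_import(module_name):
  """Return (module, available) without raising if the import fails."""
  try:
    return importlib.import_module(module_name), True
  except ImportError:
    return None, False


# 'json' ships with Python; the second name is a deliberately missing,
# hypothetical module used only to show the failure path.
json_module, json_available = try_import('json')
missing_module, missing_available = try_import(
    'hypothetical_missing_module_xyz')
```

Callers would then check the flag before using the module, which defers the failure from import time to first use.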





[GitHub] [beam] Abacn merged pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #25965:
URL: https://github.com/apache/beam/pull/25965




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1611694465

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1507032519

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1516646068

   Run Python TextIO Performance Test




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210677588


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   OK, I am going to verify that the pipeline still runs on the full graph for the tests listed in #26774.
   
   In any case, a change that requires modifying the word count example is concerning as a potential breaking change. We should find a way that does not require changing unrelated tests so substantially.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1211950706


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -788,111 +788,116 @@ def noop(table, **kwargs):
               with_auto_sharding=True,
               test_client=client))
 
-  @parameterized.expand([
-      param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_message='accessDenied'),
-      param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_message='backendError')
-  ])
-  def test_load_job_exception(self, exception_type, error_message):
-
-    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
-                     'Insert') as mock_load_job,\
-      mock.patch('apache_beam.io.gcp.internal.clients'
-                 '.storage.storage_v1_client.StorageV1.ObjectsService'),\
-      mock.patch('time.sleep'),\
-      self.assertRaises(Exception) as exc,\
-      beam.Pipeline() as p:
-
-      mock_load_job.side_effect = exception_type(error_message)
-
-      _ = (
-          p
-          | beam.Create([{
-              'columnA': 'value1'
-          }])
-          | WriteToBigQuery(
-              table='project:dataset.table',
-              schema={
-                  'fields': [{
-                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                  }]
-              },
-              create_disposition='CREATE_NEVER',
-              custom_gcs_temp_location="gs://temp_location",
-              method='FILE_LOADS'))
-
-    mock_load_job.assert_called()
-    self.assertIn(error_message, exc.exception.args[0])
-
-  @parameterized.expand([
-      param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_message='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_message='internalError'),
-  ])
-  def test_copy_load_job_exception(self, exception_type, error_message):
-
-    from apache_beam.io.gcp import bigquery_file_loads
-
-    old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE
-    old_max_partition_size = bigquery_file_loads._MAXIMUM_LOAD_SIZE
-    old_max_files_per_partition = bigquery_file_loads._MAXIMUM_SOURCE_URIS
-    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 15
-    bigquery_file_loads._MAXIMUM_LOAD_SIZE = 30
-    bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1
-
-    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
-                        'Insert') as mock_insert_copy_job, \
-      mock.patch.object(BigQueryWrapper,
-                        'perform_load_job') as mock_load_job, \
-      mock.patch.object(BigQueryWrapper,
-                        'wait_for_bq_job'), \
-      mock.patch('apache_beam.io.gcp.internal.clients'
-        '.storage.storage_v1_client.StorageV1.ObjectsService'), \
-      mock.patch('time.sleep'), \
-      self.assertRaises(Exception) as exc, \
-      beam.Pipeline() as p:
-
-      mock_insert_copy_job.side_effect = exception_type(error_message)
-
-      dummy_job_reference = beam.io.gcp.internal.clients.bigquery.JobReference()
-      dummy_job_reference.jobId = 'job_id'
-      dummy_job_reference.location = 'US'
-      dummy_job_reference.projectId = 'apache-beam-testing'
-
-      mock_load_job.return_value = dummy_job_reference
-
-      _ = (
-          p
-          | beam.Create([{
-              'columnA': 'value1'
-          }, {
-              'columnA': 'value2'
-          }, {
-              'columnA': 'value3'
-          }])
-          | WriteToBigQuery(
-              table='project:dataset.table',
-              schema={
-                  'fields': [{
-                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                  }]
-              },
-              create_disposition='CREATE_NEVER',
-              custom_gcs_temp_location="gs://temp_location",
-              method='FILE_LOADS'))
-
-    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size
-    bigquery_file_loads._MAXIMUM_LOAD_SIZE = old_max_partition_size
-    bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files_per_partition
 
-    self.assertEqual(4, mock_insert_copy_job.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+#   ## Commented out due to unittest.skip not working

Review Comment:
   okay, giving that a try





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1573931477

   Run Python 3.11 PostCommit




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1214787798


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   This should not raise an error. Deleting more than 100 objects was supported before.
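
One way to keep supporting more than 100 paths, sketched under assumptions: split the input into chunks of at most `MAX_BATCH_OPERATION_SIZE`, as the removed `islice` loop did, and issue one batch per chunk. The value 100 is taken from the comment above; `chunked` and the `delete_chunk` callable are hypothetical stand-ins for whatever actually sends a batch to GCS.

```python
from itertools import islice

# Assumed limit, mirroring the MAX_BATCH_OPERATION_SIZE name in the diff.
MAX_BATCH_OPERATION_SIZE = 100


def chunked(paths, size=MAX_BATCH_OPERATION_SIZE):
  """Yield successive lists of at most `size` items from `paths`."""
  iterator = iter(paths)
  while True:
    chunk = list(islice(iterator, size))
    if not chunk:
      return
    yield chunk


def delete_batch(paths, delete_chunk):
  """Delete `paths` chunk by chunk.

  `delete_chunk` is a hypothetical callable that deletes one chunk and
  returns a list of (path, exception) pairs for it.
  """
  result_statuses = []
  for chunk in chunked(paths):
    result_statuses.extend(delete_chunk(chunk))
  return result_statuses
```

Because the chunking is separate from the client call, it can be unit-tested with a fake `delete_chunk` and no network access.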





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1559535009

   I think the only one remaining is about whether the logic for delete_batch works, and I've made a note that I need to write a test for that, so I'm not worried if that comment gets lost.
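
One detail such a test could cover, shown as a dependency-free sketch: the `delete_batch` hunk quoted in this thread checks `err is NotFound`, which compares an exception instance with the class object and is therefore always false. Catching the class (or using `isinstance`) gives the behavior the removed code documented, where a missing file counts as success for idempotency. The `NotFound` class and `status_for` helper below are stand-ins, not the real client API.

```python
class NotFound(Exception):
  """Stand-in for a 'not found' API error; not the real client class."""


def status_for(path, delete):
  """Return (path, exception), treating NotFound as success."""
  try:
    delete(path)
  except NotFound:
    # The object is already gone, so report success for idempotency.
    return path, None
  except Exception as err:  # pylint: disable=broad-except
    return path, err
  return path, None
```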




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1194341172


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()
+        except Exception as err:
+          if err is NotFound:
             exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+          else:
+            exception = err
+        finally:
+          result_statuses.append((path, exception))
+
+      return result_statuses
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,

Review Comment:
   The KMS key for a bucket is applied to the objects within it by default, so when a blob is copied over, the new blob inherits the KMS key of the destination bucket unless explicitly told to do otherwise.





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1207191764


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   As mentioned before, these tests are now wrapped in try-except and would pass even if the pipeline fails. Please fix them.
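
The concern can be reproduced without Beam. The sketch below assumes the `except BeamIOError` handler in the diff does not re-raise (the hunk is cut off before the handler body, so this is an assumption); `BeamIOError` here is a local stand-in class and the function names are hypothetical.

```python
class BeamIOError(Exception):
  """Stand-in for the real exception type, so the sketch has no deps."""


def failing_pipeline():
  raise BeamIOError('simulated I/O failure')


def run_swallowing_errors():
  """Mirrors the try/except shape in the diff: the error is dropped."""
  try:
    failing_pipeline()
  except BeamIOError as err:
    print('ignored: %s' % err)
  return 'finished'


def run_propagating_errors():
  """Lets the failure surface so a test calling this can fail."""
  failing_pipeline()
  return 'finished'
```

A test that only calls `run_swallowing_errors()` sees a normal return and passes, while the same test against `run_propagating_errors()` fails with `BeamIOError`, which is the behavior a test needs in order to detect a broken pipeline.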





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210693813


##########
sdks/python/apache_beam/io/gcp/gcsio_integration_test.py:
##########
@@ -168,33 +139,6 @@ def _test_copy_batch(
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @pytest.mark.it_postcommit

Review Comment:
   We still need a test for KMS. How is the KMS key handled now? Is it possible to add a postcommit test using KMS for the new gcsio?






[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212103298


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -788,111 +788,116 @@ def noop(table, **kwargs):
               with_auto_sharding=True,
               test_client=client))
 
-  @parameterized.expand([
-      param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_message='accessDenied'),
-      param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_message='backendError')
-  ])
-  def test_load_job_exception(self, exception_type, error_message):
-
-    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
-                     'Insert') as mock_load_job,\
-      mock.patch('apache_beam.io.gcp.internal.clients'
-                 '.storage.storage_v1_client.StorageV1.ObjectsService'),\
-      mock.patch('time.sleep'),\
-      self.assertRaises(Exception) as exc,\
-      beam.Pipeline() as p:
-
-      mock_load_job.side_effect = exception_type(error_message)
-
-      _ = (
-          p
-          | beam.Create([{
-              'columnA': 'value1'
-          }])
-          | WriteToBigQuery(
-              table='project:dataset.table',
-              schema={
-                  'fields': [{
-                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                  }]
-              },
-              create_disposition='CREATE_NEVER',
-              custom_gcs_temp_location="gs://temp_location",
-              method='FILE_LOADS'))
-
-    mock_load_job.assert_called()
-    self.assertIn(error_message, exc.exception.args[0])
-
-  @parameterized.expand([
-      param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_message='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_message='internalError'),
-  ])
-  def test_copy_load_job_exception(self, exception_type, error_message):
-
-    from apache_beam.io.gcp import bigquery_file_loads
-
-    old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE
-    old_max_partition_size = bigquery_file_loads._MAXIMUM_LOAD_SIZE
-    old_max_files_per_partition = bigquery_file_loads._MAXIMUM_SOURCE_URIS
-    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 15
-    bigquery_file_loads._MAXIMUM_LOAD_SIZE = 30
-    bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1
-
-    with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
-                        'Insert') as mock_insert_copy_job, \
-      mock.patch.object(BigQueryWrapper,
-                        'perform_load_job') as mock_load_job, \
-      mock.patch.object(BigQueryWrapper,
-                        'wait_for_bq_job'), \
-      mock.patch('apache_beam.io.gcp.internal.clients'
-        '.storage.storage_v1_client.StorageV1.ObjectsService'), \
-      mock.patch('time.sleep'), \
-      self.assertRaises(Exception) as exc, \
-      beam.Pipeline() as p:
-
-      mock_insert_copy_job.side_effect = exception_type(error_message)
-
-      dummy_job_reference = beam.io.gcp.internal.clients.bigquery.JobReference()
-      dummy_job_reference.jobId = 'job_id'
-      dummy_job_reference.location = 'US'
-      dummy_job_reference.projectId = 'apache-beam-testing'
-
-      mock_load_job.return_value = dummy_job_reference
-
-      _ = (
-          p
-          | beam.Create([{
-              'columnA': 'value1'
-          }, {
-              'columnA': 'value2'
-          }, {
-              'columnA': 'value3'
-          }])
-          | WriteToBigQuery(
-              table='project:dataset.table',
-              schema={
-                  'fields': [{
-                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                  }]
-              },
-              create_disposition='CREATE_NEVER',
-              custom_gcs_temp_location="gs://temp_location",
-              method='FILE_LOADS'))
-
-    bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size
-    bigquery_file_loads._MAXIMUM_LOAD_SIZE = old_max_partition_size
-    bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files_per_partition
 
-    self.assertEqual(4, mock_insert_copy_job.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+#   ## Commented out due to unittest.skip not working

Review Comment:
   That worked. Thanks.





[GitHub] [beam] Abacn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574071948

   >  we're not really managing that anymore.
   
   It is this PR that stops managing KMS; currently both the Java and Python SDKs do manage it.
   
   > The regressions we initially saw were due to the tests being non-representative of typical jobs 
   
   How was this conclusion reached? It differs from what I heard last time.
   
   Please investigate before pushing back on comments. The anonymous client issue was a real issue. I am signing off from the review for now, until the comments get addressed.
   




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1483021278

   stop reviewer notifications




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1226660531


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   Thanks. If batch copy and batch delete are not actually used, I would suggest making them simply call gcsio.copy and gcsio.delete one by one for now, to avoid a functional regression, and leaving the performance regression (using an actual batch) as a near-term follow-up. At the same time, please file a feature request with the GCS client for a batch that can return all request results.
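
A minimal sketch of the one-by-one fallback suggested above, keeping the per-item result shape that the existing docstrings describe. The helper names and the injected `delete_fn` / `copy_fn` callables are hypothetical stand-ins for `gcsio.delete` and `gcsio.copy`; this is not the PR's code.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def delete_one_by_one(
    paths: Iterable[str],
    delete_fn: Callable[[str], None],
) -> List[Tuple[str, Optional[Exception]]]:
  """Deletes each path individually and records a (path, exception) pair."""
  results = []
  for path in paths:
    try:
      delete_fn(path)
      results.append((path, None))
    except Exception as e:  # Record the failure instead of aborting the loop.
      results.append((path, e))
  return results


def copy_one_by_one(
    src_dest_pairs: Iterable[Tuple[str, str]],
    copy_fn: Callable[[str, str], None],
) -> List[Tuple[str, str, Optional[Exception]]]:
  """Copies each pair individually and records (src, dest, exception)."""
  results = []
  for src, dest in src_dest_pairs:
    try:
      copy_fn(src, dest)
      results.append((src, dest, None))
    except Exception as e:  # Record the failure instead of aborting the loop.
      results.append((src, dest, e))
  return results
```

This trades throughput for correctness: every item gets its own outcome in input order, at the cost of one request per object.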





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1608092171

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1245797583


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -295,162 +208,92 @@ def delete_batch(self, paths):
              argument, where exception is None if the operation succeeded or
              the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    final_results = []
+    s = 0
+    while s < len(paths):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for path in current_paths:
+          bucket_name, blob_name = parse_gcs_path(path)
+          bucket = self.client.get_bucket(bucket_name)
+          blob = storage.Blob(blob_name, bucket)
+          blob.delete()
+
+      current_responses = current_batch._responses
+      for resp in current_responses:
+        if resp[1] == NotFound:
+          final_results.append((resp[0], None))
+        else:
+          final_results.append(resp)
+
+      s += MAX_BATCH_OPERATION_SIZE
+
+    return final_results
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
-    """Copies the given GCS object from src to dest.
+    src_bucket_name, src_blob_name = parse_gcs_path(src)
+    dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True)
+    src_bucket = self.get_bucket(src_bucket_name)
+    src_blob = src_bucket.get_blob(src_blob_name)
+    if not src_blob:
+      raise NotFound("Source %s not found", src)
+    dest_bucket = self.get_bucket(dest_bucket_name)
+    if not dest_blob_name:
+      dest_blob_name = None
+    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
+
+  def copy_batch(self, src_dest_pairs):
+    """Copies the given GCS objects from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite call will return after these many bytes. Used
-        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
-    if not src_dest_pairs:
-      return []
-    pair_to_request = {}
-    for pair in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(pair[0])
-      dest_bucket, dest_path = parse_gcs_path(pair[1])
-      request = storage.StorageObjectsRewriteRequest(
-          sourceBucket=src_bucket,
-          sourceObject=src_path,
-          destinationBucket=dest_bucket,
-          destinationObject=dest_path,
-          destinationKmsKeyName=dest_kms_key_name,
-          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-      pair_to_request[pair] = request
-    pair_to_status = {}
-    while True:
-      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
-      if not pairs_in_batch:
-        break
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for pair in pairs_in_batch:
-        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for pair, api_call in zip(pairs_in_batch, api_calls):
-        src, dest = pair
-        response = api_call.response
-        if self._rewrite_cb is not None:
-          self._rewrite_cb(response)
-        if api_call.is_error:
-          exception = api_call.exception
-          # Translate 404 to the appropriate not found exception.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = (
-                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-          pair_to_status[pair] = exception
-        elif not response.done:
-          _LOGGER.debug(
-              'Rewrite progress: %d of %d bytes, %s to %s',
-              response.totalBytesRewritten,
-              response.objectSize,
-              src,
-              dest)
-          pair_to_request[pair].rewriteToken = response.rewriteToken
-        else:
-          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-          pair_to_status[pair] = None
+    final_results = []
+    s = 0
+    while s < len(src_dest_pairs):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
+        current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_pairs = src_dest_pairs[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for pair in current_pairs:
+          src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
+          dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
+          src_bucket = self.client.get_bucket(src_bucket_name)
+          src_blob = src_bucket.get_blob(src_blob_name)
+          dest_bucket = self.client.get_bucket(dest_bucket_name)
+
+          src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
 
-    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
+      final_results += current_batch._responses

Review Comment:
   The return value is no longer "List of tuples of (src, dest, exception)" as the pydoc states. It is now [(headers, payload)], originating from here: https://github.com/googleapis/python-storage/blob/5b492d144216177714e95645467e01c7dbc82d19/google/cloud/storage/batch.py#L356
   
   We probably still need to keep the original return type.
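
To illustrate what keeping the original return type could look like, the sketch below maps per-request outcomes back onto `(src, dest, exception)` tuples. It assumes that an HTTP status code can be obtained for each sub-request and that the outcomes arrive in the same order as the requests; neither is confirmed by the source. `CopyError` and `to_copy_results` are hypothetical names, and the 404 translation mirrors the "Source file not found" handling in the removed code.

```python
from typing import List, Optional, Sequence, Tuple


class CopyError(Exception):
  """Hypothetical error type used only in this sketch."""


def to_copy_results(
    src_dest_pairs: Sequence[Tuple[str, str]],
    status_codes: Sequence[int],
) -> List[Tuple[str, str, Optional[Exception]]]:
  """Pairs each (src, dest) with None on success or an exception on failure."""
  if len(src_dest_pairs) != len(status_codes):
    raise ValueError('Expected one status code per (src, dest) pair.')
  results = []
  for (src, dest), status in zip(src_dest_pairs, status_codes):
    if 200 <= status < 300:
      exception = None
    elif status == 404:
      # Translate 404 to a not-found error, as the removed code did.
      exception = CopyError('Source file not found: %s' % src)
    else:
      exception = CopyError(
          'Copy %s to %s failed with HTTP status %d' % (src, dest, status))
    results.append((src, dest, exception))
  return results
```

Callers that inspect the third tuple element keep working unchanged, whatever the underlying batch call returns.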





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1224652809


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   It's not difficult to re-implement. The client library has all the information you need at the code path I pinned (https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#L238). What we need here is to implement a batch without using a with clause, get the responses, and check every copy/delete.
   
   Please avoid introducing regressions. When a change requires removing a bunch of unit tests, something is not going quite right.





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1531915035

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1503973048

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1194356006


##########
sdks/python/apache_beam/io/gcp/gcsio_integration_test.py:
##########
@@ -168,33 +139,6 @@ def _test_copy_batch(
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @pytest.mark.it_postcommit

Review Comment:
   I'm not sure they're testing anything since we aren't directly managing kms keys during copying anymore. Is there another reason to keep them?





[GitHub] [beam] liferoad commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1563578353

   Please check the PR title since it is ready now.




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212282164


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   I see, the error happens when no credentials are provided. Log:
   
   ```
   $ python -m apache_beam.examples.wordcount_minimal --output /Users/.../Desktop/outputoutput.txt
   INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
   INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
   INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine authentication due to unavailable metadata server.
   WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
   Connecting anonymously.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine authentication due to unavailable metadata server.
   Traceback (most recent call last):
     File "/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line 148, in <module>
       main()
     File "/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line 123, in main
       lines = p | ReadFromText(known_args.input)
     File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 781, in __init__
       self._source = self._source_class(
     File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 140, in __init__
       super().__init__(
     File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line 127, in __init__
       self._validate()
     File "/Users/.../beam/sdks/python/apache_beam/options/value_provider.py", line 193, in _f
       return fnc(self, *args, **kwargs)
     File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line 188, in _validate
       match_result = FileSystems.match([pattern], limits=[1])[0]
     File "/Users/.../beam/sdks/python/apache_beam/io/filesystems.py", line 204, in match
       return filesystem.match(patterns, limits)
     File "/Users/.../beam/sdks/python/apache_beam/io/filesystem.py", line 804, in match
       raise BeamIOError("Match operation failed", exceptions)
   ```
   
   The missing gcloud credentials caused pipeline expansion to fail at "lines = p | ReadFromText(known_args.input)", which is the very beginning of the test.
   
   https://github.com/apache/beam/blob/7179cce5624c304d5f8f7ba69dc73e96b9823d31/sdks/python/apache_beam/examples/wordcount_minimal.py#LL125-L125
   
   pipeline is not initiatiated successfully. The pipeline expansion is incomplete and lines after ReadFromText and before catch is never executed. This is a breaking change - read from public bucket anonymously now fails.
   
   Note that wordcount is the hello world example and first time user is guided to run it. We provide the public bucket and user do not need google cloud account to run these examples. We should fix the code. Here it essentially skipped the test.
   
   If cloud-storage-client does not support anonymous connection (would be a surprise) we would need to make this migration opt-in instead of replacement.
   
   On master, the test runs successfully and a file gets written. Log:
   ```
   INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
   INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
   INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine authentication due to unavailable metadata server.
   WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
   Connecting anonymously.
   INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.49.0.dev
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x11255ee50> ====================
   ```
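
The master log above warns that default credentials were not found and then continues with "Connecting anonymously." A minimal sketch of that fallback pattern follows. The function name, the exception class, and both factories are hypothetical and injected by the caller so the sketch runs without any GCP libraries; with google-cloud-storage one would presumably pass something like `storage.Client` and `storage.Client.create_anonymous_client`, but that wiring is an assumption and not what the PR does.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class CredentialsUnavailable(Exception):
  """Hypothetical stand-in for the auth library's missing-credentials error."""


def build_storage_client(
    authenticated_factory,
    anonymous_factory,
    auth_errors=(CredentialsUnavailable, )):
  """Returns (client, is_anonymous), falling back to anonymous access.

  Both factories are zero-argument callables supplied by the caller; nothing
  here is Beam's or the GCS client's actual API.
  """
  try:
    return authenticated_factory(), False
  except auth_errors as err:
    # Same shape as the master log: warn, then continue anonymously.
    _LOGGER.warning(
        'Unable to find default credentials to use: %s\n'
        'Connecting anonymously.',
        err)
    return anonymous_factory(), True


def _no_credentials():
  # Simulates an environment without Application Default Credentials.
  raise CredentialsUnavailable('default credentials were not found')


client, is_anonymous = build_storage_client(
    _no_credentials, lambda: 'anonymous-client')
```

With a fallback of this kind, reading the public sample bucket would not depend on the user having a Google Cloud account, which is the behavior the comment asks to preserve.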
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212279790


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   Did you test whether the apitools implementation actually writes to GCS? That's my concern, because my understanding is that neither implementation has credentials available to log into GCS (not even defaults) on the remote runs. The only difference is that apitools doesn't complain about that and the GCS client does (which is probably better, even if inconvenient in this exact scenario).





[GitHub] [beam] Abacn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574248293

   @BjornPrime you are correct about KMS support. The current gcsio KMS flag is not used anywhere other than in its test. Moreover, the KMS test would still pass on your branch if the method parameter were added back but did nothing.
   
   https://github.com/apache/beam/pull/23785/commits/96496893a763e5dd382525414f4d5747d8f01f14
   
   test command: `python -m pytest -v apache_beam/io/gcp/gcsio_integration_test.py::GcsIOIntegrationTest -m it_postcommit  --test-pipeline-options="--runner TestDataflowRunner --project <gcpproject> --temp_location gs://<bucket>/temp/ --kms_key_name=projects/<project>/locations/us-central1/keyRings/<keyring name>/cryptoKeys/test"`
   
   result:
   
   ========================================== short test summary info ===========================================
   FAILED apache_beam/io/gcp/gcsio_integration_test.py::GcsIOIntegrationTest::test_copy_batch_rewrite_token - ...
   FAILED apache_beam/io/gcp/gcsio_integration_test.py::GcsIOIntegrationTest::test_copy_rewrite_token - Assert...
   ============================= 2 failed, 4 passed, 2 warnings in 85.16s (0:01:25) =============================
   
   In particular, apache_beam/io/gcp/gcsio_integration_test.py::GcsIOIntegrationTest::test_copy_kms PASSED
   




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1505774024

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1516816919

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1512190506

   Run Python 3.10 PostCommit




[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237673893


##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -340,8 +329,7 @@ def metadata(self, path):
     """
     try:
       file_metadata = self._gcsIO()._status(path)
-      return FileMetadata(
-          path, file_metadata['size'], file_metadata['last_updated'])
+      return FileMetadata(path, file_metadata['size'], file_metadata['updated'])

Review Comment:
   Did the content of the metadata change after swapping clients, or is it still the same info?
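
One way the metadata could differ is in the type of the timestamp. The sketch below assumes (this is not confirmed by the diff) that the new client reports `updated` as a `datetime`, while callers of `FileMetadata` expect seconds since the epoch. `updated_to_seconds` is a hypothetical helper that normalizes either form.

```python
import datetime


def updated_to_seconds(updated):
  """Converts a datetime (or passes through a number) to epoch seconds."""
  if isinstance(updated, (int, float)):
    # Already a numeric timestamp; normalize to float.
    return float(updated)
  if updated.tzinfo is None:
    # Assumption: naive datetimes are interpreted as UTC.
    updated = updated.replace(tzinfo=datetime.timezone.utc)
  return updated.timestamp()


aware = datetime.datetime(1970, 1, 1, 0, 0, 10, tzinfo=datetime.timezone.utc)
seconds = updated_to_seconds(aware)
```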



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
+    dest_bucket_name, dest_path = parse_gcs_path(dest)

Review Comment:
   Do we need to specify `parse_gcs_path(dest, object_optional=True)`?
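
The question matters because the new `copy` later checks `if not dest_path`, which only makes sense if a destination such as `gs://<bucket>/` is allowed to parse. The sketch below illustrates what an optional object name means during parsing. `split_gcs_uri` is a hypothetical helper written for this illustration; it borrows the `object_optional` flag name from the comment but is not Beam's `parse_gcs_path`.

```python
import re

# gs://<bucket> optionally followed by /<object name>.
_GCS_URI = re.compile(r'^gs://([^/]+)(?:/(.*))?$', re.DOTALL)


def split_gcs_uri(uri, object_optional=False):
  """Splits a gs:// URI into (bucket_name, object_name)."""
  match = _GCS_URI.match(uri)
  if match is None:
    raise ValueError('GCS path must be in the form gs://<bucket>/<object>.')
  bucket_name, object_name = match.group(1), match.group(2) or ''
  if not object_name and not object_optional:
    # Strict mode: a bucket-only URI is rejected.
    raise ValueError('GCS path must be in the form gs://<bucket>/<object>.')
  return bucket_name, object_name


full = split_gcs_uri('gs://bucket/dir/file.txt')
bucket_only = split_gcs_uri('gs://bucket/', object_optional=True)
```

In strict mode, `split_gcs_uri('gs://bucket/')` raises `ValueError`, so a later empty-name check would never be reached.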



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -59,20 +48,10 @@
 
 _LOGGER = logging.getLogger(__name__)
 
-# Issue a friendlier error message if the storage library is not available.
-# TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.batch import BatchApiRequest
-  from apitools.base.py.exceptions import HttpError
-  from apitools.base.py import transfer
   from apache_beam.internal.gcp import auth
-  from apache_beam.io.gcp.internal.clients import storage
 except ImportError:
-  raise ImportError(
-      'Google Cloud Storage I/O not supported for this execution environment '
-      '(could not import storage API client).')
+  raise ImportError('Internal auth library not found')

Review Comment:
   This is not necessary. This import shouldn't be throwing exceptions, see: https://github.com/apache/beam/blob/88383b940434ec1d97efab3cf9b5d2c9fd54a511/sdks/python/apache_beam/internal/gcp/auth.py#L37



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -796,6 +796,7 @@ def noop(table, **kwargs):
           exception_type=exceptions.ServiceUnavailable if exceptions else None,
           error_message='backendError')
   ])
+  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')

Review Comment:
   Left a comment on the issue. It seems like we should find a way to run the necessary test scenario, or delete the test if the coverage it provides is not meaningful.



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)

Review Comment:
   `src_path` / `dest_path` are confusing names. Let's use `src_object_name` or `src_blob_name`. It would be good to use the same name consistently throughout this file.
   
   Note that in GCS there is no concept of paths or subdirectories. There are buckets and objects; forward slashes are simply characters in object names.
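
To illustrate the flat namespace: a "directory listing" can be emulated purely by filtering object names on a prefix and cutting at the next delimiter. The sketch below is a hypothetical, in-memory helper (`list_children` is not part of Beam or the GCS client) that only loosely mirrors the prefix/delimiter idea of object listing.

```python
def list_children(object_names, prefix='', delimiter='/'):
  """Emulates a directory listing over a flat set of object names.

  Returns (objects, prefixes): names directly under `prefix`, and the
  pseudo-directories found by cutting at the next delimiter.
  """
  objects, prefixes = [], set()
  for name in sorted(object_names):
    if not name.startswith(prefix):
      continue
    rest = name[len(prefix):]
    head, sep, _ = rest.partition(delimiter)
    if sep:
      # A delimiter remains, so this name sits under a pseudo-directory.
      prefixes.add(prefix + head + sep)
    else:
      objects.append(name)
  return objects, sorted(prefixes)


names = ['a/b/c.txt', 'a/d.txt', 'e.txt']
top_level = list_children(names)
under_a = list_children(names, prefix='a/')
```

Nothing in `names` is a directory; `a/` and `a/b/` exist only as shared prefixes of object names, which is why `src_object_name` describes the value more accurately than `src_path`.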
   



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -185,40 +144,30 @@ def get_project_number(self, bucket):
       bucket_metadata = self.get_bucket(bucket_name=bucket)
       if bucket_metadata:
         self.bucket_to_project_number[bucket] = bucket_metadata.projectNumber
-      #  else failed to load the bucket metadata due to HttpError
 
     return self.bucket_to_project_number.get(bucket, None)
 
-  def _set_rewrite_response_callback(self, callback):
-    """For testing purposes only. No backward compatibility guarantees.
-
-    Args:
-      callback: A function that receives ``storage.RewriteResponse``.
-    """
-    self._rewrite_cb = callback
-
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
-      return self.client.buckets.Get(request)
-    except HttpError:
+      return self.client.lookup_bucket(bucket_name)
+    except NotFound:
       return None
 
   def create_bucket(self, bucket_name, project, kms_key=None, location=None):
     """Create and return a GCS bucket in a specific project."""
-    encryption = None
-    if kms_key:
-      encryption = storage.Bucket.EncryptionValue(kms_key)
-
-    request = storage.StorageBucketsInsertRequest(
-        bucket=storage.Bucket(
-            name=bucket_name, location=location, encryption=encryption),
-        project=project,
-    )
+
     try:
-      return self.client.buckets.Insert(request)
-    except HttpError:
+      bucket = self.client.create_bucket(
+          bucket_or_name=bucket_name,
+          project=project,
+          location=location,
+      )
+      if kms_key:
+        bucket.default_kms_key_name(kms_key)
+        return self.get_bucket(bucket_name)

Review Comment:
   FWIW, I see `bucket.patch()` used in https://cloud.google.com/storage/docs/samples/storage-set-bucket-default-kms-key for this scenario, but I'm not an expert here. Wondering whether both options are equivalent.
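
For comparison, the pattern in the linked sample assigns the key as an attribute and then persists it with `patch()`, whereas the diff calls `default_kms_key_name(kms_key)` like a method. A minimal sketch of the assign-then-patch shape follows. `set_default_kms_key` and `FakeBucket` are hypothetical names introduced here; the fake stands in for a real bucket object so the sketch runs without GCP access, and the key string is a placeholder.

```python
def set_default_kms_key(bucket, kms_key_name):
  """Assigns the default KMS key on `bucket` and persists it with patch()."""
  bucket.default_kms_key_name = kms_key_name
  bucket.patch()
  return bucket


class FakeBucket:
  """Hypothetical stand-in for a bucket object; records patch() calls."""
  def __init__(self):
    self.default_kms_key_name = None
    self.patch_calls = 0

  def patch(self):
    self.patch_calls += 1


bucket = set_default_kms_key(
    FakeBucket(), 'projects/p/locations/l/keyRings/r/cryptoKeys/k')
```

Whether the two options behave the same against the real service would still need to be checked with an integration test such as `test_copy_kms`.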



##########
sdks/python/apache_beam/runners/portability/sdk_container_builder.py:
##########
@@ -307,16 +306,16 @@ def _invoke_docker_build_and_push(self, container_image_name):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
-    request = storage.StorageObjectsInsertRequest(
-        bucket=gcs_bucket, name=gcs_object)
+    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    total_size = os.path.getsize(local_file_path)
     from apitools.base.py import exceptions
+    from google.cloud import storage

Review Comment:
   Should this code use the FileSystems API?
   
   Also, have you tested the prebuilding codepath after these changes? You can run the Dataflow Python ValidatesContainer postcommit suite.



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
+    dest_bucket_name, dest_path = parse_gcs_path(dest)
+    src_bucket = self.get_bucket(src_bucket_name)
+    src_blob = src_bucket.get_blob(src_path)
+    if not src_blob:
+      raise NotFound("Source %s not found", src)
+    dest_bucket = self.get_bucket(dest_bucket_name)
+    if not dest_path:
+      dest_path = None
+    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_path)
+
+  def copy_batch(self, src_dest_pairs):
     """Copies the given GCS object from src to dest.

Review Comment:
   > Copies the given GCS object from src to dest.
   
   Should this be: Copies given GCS objects from src to dest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1245797583


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -295,162 +208,92 @@ def delete_batch(self, paths):
              argument, where exception is None if the operation succeeded or
              the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    final_results = []
+    s = 0
+    while s < len(paths):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for path in current_paths:
+          bucket_name, blob_name = parse_gcs_path(path)
+          bucket = self.client.get_bucket(bucket_name)
+          blob = storage.Blob(blob_name, bucket)
+          blob.delete()
+
+      current_responses = current_batch._responses
+      for resp in current_responses:
+        if resp[1] == NotFound:
+          final_results.append((resp[0], None))
+        else:
+          final_results.append(resp)
+
+      s += MAX_BATCH_OPERATION_SIZE
+
+    return final_results
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
-    """Copies the given GCS object from src to dest.
+    src_bucket_name, src_blob_name = parse_gcs_path(src)
+    dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True)
+    src_bucket = self.get_bucket(src_bucket_name)
+    src_blob = src_bucket.get_blob(src_blob_name)
+    if not src_blob:
+      raise NotFound("Source %s not found", src)
+    dest_bucket = self.get_bucket(dest_bucket_name)
+    if not dest_blob_name:
+      dest_blob_name = None
+    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
+
+  def copy_batch(self, src_dest_pairs):
+    """Copies the given GCS objects from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite call will return after these many bytes. Used
-        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
-    if not src_dest_pairs:
-      return []
-    pair_to_request = {}
-    for pair in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(pair[0])
-      dest_bucket, dest_path = parse_gcs_path(pair[1])
-      request = storage.StorageObjectsRewriteRequest(
-          sourceBucket=src_bucket,
-          sourceObject=src_path,
-          destinationBucket=dest_bucket,
-          destinationObject=dest_path,
-          destinationKmsKeyName=dest_kms_key_name,
-          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-      pair_to_request[pair] = request
-    pair_to_status = {}
-    while True:
-      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
-      if not pairs_in_batch:
-        break
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for pair in pairs_in_batch:
-        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for pair, api_call in zip(pairs_in_batch, api_calls):
-        src, dest = pair
-        response = api_call.response
-        if self._rewrite_cb is not None:
-          self._rewrite_cb(response)
-        if api_call.is_error:
-          exception = api_call.exception
-          # Translate 404 to the appropriate not found exception.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = (
-                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-          pair_to_status[pair] = exception
-        elif not response.done:
-          _LOGGER.debug(
-              'Rewrite progress: %d of %d bytes, %s to %s',
-              response.totalBytesRewritten,
-              response.objectSize,
-              src,
-              dest)
-          pair_to_request[pair].rewriteToken = response.rewriteToken
-        else:
-          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-          pair_to_status[pair] = None
+    final_results = []
+    s = 0
+    while s < len(src_dest_pairs):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
+        current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_pairs = src_dest_pairs[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for pair in current_pairs:
+          src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
+          dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
+          src_bucket = self.client.get_bucket(src_bucket_name)
+          src_blob = src_bucket.get_blob(src_blob_name)
+          dest_bucket = self.client.get_bucket(dest_bucket_name)
+
+          src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
 
-    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
+      final_results += current_batch._responses

Review Comment:
   The return value is no longer the "List of tuples of (src, dest, exception)" that the pydoc describes. It is now a list of (headers, payload) tuples, originating from here: https://github.com/googleapis/python-storage/blob/5b492d144216177714e95645467e01c7dbc82d19/google/cloud/storage/batch.py#L356
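
For illustration, a minimal sketch of how the documented (src, dest, exception) shape could be rebuilt. It assumes the batch yields one response per request, in submission order, and that each response exposes a `status_code`. `FakeResponse` and `to_copy_statuses` are hypothetical names, not part of Beam or the GCS client, and the real response objects may differ:

```python
from dataclasses import dataclass


@dataclass
class FakeResponse:
  """Hypothetical stand-in for one sub-response of a batch call."""
  status_code: int


def to_copy_statuses(src_dest_pairs, responses):
  """Pairs each (src, dest) with None on success or an exception on failure.

  Assumes responses arrive in the same order the requests were queued.
  """
  if len(src_dest_pairs) != len(responses):
    raise ValueError('Expected one response per (src, dest) pair.')
  statuses = []
  for (src, dest), response in zip(src_dest_pairs, responses):
    if 200 <= response.status_code < 300:
      exception = None
    elif response.status_code == 404:
      exception = FileNotFoundError('Source file not found: %s' % src)
    else:
      exception = IOError(
          'HTTP %d copying %s to %s' % (response.status_code, src, dest))
    statuses.append((src, dest, exception))
  return statuses
```

The exception types here are placeholders; the point is only that the per-pair status has to be derived from the responses rather than returned as-is.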





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1218555055


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   Okay, I'll change our MAX_BATCH_OPERATION_SIZE to 1000 to match the GCS client and see if I can make slicing larger groups of files into multiple batches work, because I do agree that would be better.
   
   Re: that link to the client's code, I think that's outdated. There's a parameter now to record a different exception for each request. I'm currently implementing it to ensure that `delete_batch` is idempotent.
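
The slicing into multiple batches could look roughly like this. It is a sketch only: `chunked` is a hypothetical helper, and the limit of 1000 is the value proposed in this comment rather than something verified here.

```python
MAX_BATCH_OPERATION_SIZE = 1000  # Value proposed in this thread; an assumption.


def chunked(items, size=MAX_BATCH_OPERATION_SIZE):
  """Yields consecutive slices of at most `size` items, preserving order."""
  if size <= 0:
    raise ValueError('size must be positive')
  for start in range(0, len(items), size):
    # Slicing past the end of a list is safe in Python, so the final,
    # shorter slice needs no separate bounds check.
    yield items[start:start + size]
```

Each slice would then be submitted as its own batch, for example with `for current_paths in chunked(paths): ...`.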






[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1581197594

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1577509346

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1219803495


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   Oh wait, I'm now seeing that the GCS client feature I mentioned is actually very new and not yet part of a release. I've reached out to the client team to see when a new release might be available, since it could really help solve our issue with making `delete_batch` idempotent.
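
For reference, the idempotency goal amounts to treating "object already gone" as success once per-request exceptions are available. A minimal sketch, assuming each request reports either None or an exception; `ObjectNotFound` and `normalize_delete_results` are hypothetical stand-ins, not the client's API:

```python
class ObjectNotFound(Exception):
  """Hypothetical stand-in for the client's 404 exception type."""


def normalize_delete_results(paths, exceptions):
  """Returns (path, exception) tuples, mapping not-found errors to None.

  Deleting an object that no longer exists counts as success, so repeating
  the same batch delete yields the same outcome.
  """
  results = []
  for path, exception in zip(paths, exceptions):
    # isinstance (rather than ==) matches exception instances, including
    # instances of subclasses of the not-found type.
    if isinstance(exception, ObjectNotFound):
      exception = None
    results.append((path, exception))
  return results
```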





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1505941635

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574141112

   > It's this PR not managing kms anymore, currently both in Java and Python SDK does
   
   I had meant this PR, and the GCSIO in general, won't need to manage kms like we had been previously. I'm not sure what you mean by the last half of the comment. Other parts of the Python SDK are still managing kms. Do any of those parts conflict with the changes I've made in this PR?
   
   > How is conclusion reached? This is different than what I heard last time.
   
   At the IO sync a couple weeks ago I laid out my case that the current performance tests were using a large number of small records, which was not representative of typical GCSIO jobs, and this caused the larger per-unit overhead of the GCS client implementation to have an outsized impact and cause it to be slower. In tests with larger record sizes, the performance regression disappeared, and according to the GCS client team, the larger record sizes are more representative of what GCS usually handles. I'm still working on my write-up of the case but I can send you the raw results in a spreadsheet if you'd like.




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1613439777

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1613659833

   Run Python 3.11 PostCommit




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1219883954


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   Thanks, however I don't think `raise_exception=False` would help much. What I was saying is that in a `with` clause, the requests are first queued and only get processed at `__exit__` (see the above link to batch.py). So the following `try` clause won't catch any HTTP error:
   
   ```
   with client.batch():
     try:
       src_bucket.copy_blob(src_blob, dest_bucket)
     except NotFound:
       pass  # This won't work: never reached
   ```
   
   because the actual HTTP calls happen in the `with` statement's `__exit__`.
   
   If we still need to return the copy/delete status of every blob, we then need to process the responses on our own. Currently it is done here: https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#L238
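
To make the timing concrete, here is a toy model of a deferred batch. `DeferredBatch` is a hypothetical class written for this illustration, not the GCS client's `Batch`; it only mimics the queue-then-send-on-`__exit__` behavior described in this comment:

```python
class DeferredBatch:
  """Toy context manager that queues calls and runs them all on exit."""
  def __init__(self):
    self._queued = []
    self.errors = []

  def add(self, func):
    # Nothing executes here; the call is only recorded.
    self._queued.append(func)

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    # The queued calls run only now, after the body of the with block.
    for func in self._queued:
      try:
        func()
      except Exception as e:  # Collected per call instead of raised.
        self.errors.append(e)
    return False


def failing_delete():
  raise LookupError('404: object not found')


caught_inside = []
with DeferredBatch() as batch:
  try:
    batch.add(failing_delete)
  except LookupError as e:
    # Never reached: add() only queues the call.
    caught_inside.append(e)
```

After the block exits, `caught_inside` is still empty while `batch.errors` holds the failure, which is why per-object status has to come from the collected responses.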
   
   
   





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551651666

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551606501

   @Abacn @johnjcasey @tvalentyn removing the old storage client and updating the base image requirements seems to have resulted in merge conflicts I can't resolve and that are blocking the tests from running. Could one of you resolve them so the tests can run?




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1207174061


##########
sdks/python/setup.py:
##########
@@ -309,6 +309,7 @@ def get_portability_package_data():
             'google-cloud-datastore>=2.0.0,<3',
             'google-cloud-pubsub>=2.1.0,<3',
             'google-cloud-pubsublite>=1.2.0,<2',
+            'google-cloud-storage>=2.7.0,<3',

Review Comment:
   Adding now.





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1579252241

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1224720792


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   I considered that but all the fields on the client library Batch object are marked as private, so I wasn't sure we could rely on them in the future.
   
   Are the methods in GcsIO being accessed directly by anyone or are they only accessed through GCSFileSystem? If it's the former, I agree that maintaining their return values is important, but if it's the latter I don't feel like that's as high of a priority, especially if we can only do that by using internal variables that aren't guaranteed to remain backwards compatible.





[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237725603


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)

Review Comment:
   `src_path` / `dest_path` are confusing names. Let's use `src_object_name` or `src_blob_name`, and use the same name consistently throughout this file.
   
   Note that GCS has no concept of paths or subdirectories: there are only buckets and objects. Forward slashes are simply characters in object names.
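
To illustrate the naming point, here is a minimal sketch of splitting a gs:// URI into a bucket name and an object (blob) name. `split_gcs_uri` is a hypothetical helper written for this illustration only; it is not Beam's `parse_gcs_path`, and it assumes nothing more than the `gs://<bucket>/<name>` form quoted in the docstring above.

```python
def split_gcs_uri(uri):
    """Split 'gs://<bucket>/<object name>' into (bucket_name, blob_name).

    Hypothetical helper for illustration only; not Beam's parse_gcs_path.
    """
    prefix = 'gs://'
    if not uri.startswith(prefix):
        raise ValueError('Expected a gs:// URI, got %r' % uri)
    # Split only on the first slash: later slashes belong to the object name.
    bucket_name, sep, blob_name = uri[len(prefix):].partition('/')
    if not bucket_name or not sep or not blob_name:
        raise ValueError('URI must look like gs://<bucket>/<name>: %r' % uri)
    return bucket_name, blob_name


src_bucket_name, src_blob_name = split_gcs_uri(
    'gs://my-bucket/logs/2023/03/out.txt')
```

The second value is a single object name that happens to contain slashes, which is why a name such as `src_blob_name` describes it better than `src_path`.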
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] tvalentyn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1601732638

   Run Dataflow Python ValidatesContainer




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1511742913

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1516505396

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1516899483

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1199261722


##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -119,7 +122,7 @@ class _Credentials(object):
 
   @classmethod
   def get_service_credentials(cls, pipeline_options):
-    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
+    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]

Review Comment:
   The return type didn't change, I just updated the type hint to be accurate.





[GitHub] [beam] Abacn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551610704

   > @Abacn @johnjcasey @tvalentyn removing the old storage client and updating the base image requirements seems to have resulted in merge conflicts I can't resolve and that are blocking the tests from running. Could one of you resolve them so the tests can run?
   
   Base image requirements are auto-generated. You can just accept the version from master when rebasing.




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1502193869

   Run Python 3.11 PostCommit




[GitHub] [beam] Abacn commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1176791404


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -241,24 +187,22 @@ def open(
     Raises:
       ValueError: Invalid open file mode.
     """
+    bucket_name, blob_name = parse_gcs_path(filename)
+    bucket = self.client.get_bucket(bucket_name)
+
     if mode == 'r' or mode == 'rb':
-      downloader = GcsDownloader(
-          self.client,
-          filename,
-          buffer_size=read_buffer_size,
-          get_project_number=self.get_project_number)
-      return io.BufferedReader(
-          DownloaderStream(
-              downloader, read_buffer_size=read_buffer_size, mode=mode),
-          buffer_size=read_buffer_size)
+      blob = bucket.get_blob(blob_name)
+      return storage.fileio.BlobReader(blob, chunk_size=read_buffer_size)

Review Comment:
   from google.cloud.storage.fileio import BlobReader?



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   This is in a batch context, where the requests are presumably executed in the `with` statement's `__exit__`. Have you verified whether a NotFound exception is raised here inside the `with` block or in `__exit__`?



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   We may still need `MAX_BATCH_OPERATION_SIZE` logic as the documentation states "No more than 100 calls should be included in a single batch request.": https://cloud.google.com/storage/docs/batch#batch-example-request



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()
+        except Exception as err:
+          if err is NotFound:
             exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+          else:
+            exception = err
+        finally:
+          result_statuses.append((path, exception))
+
+      return result_statuses
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,

Review Comment:
   Just to confirm: is kms_key support now handled in the gcsio constructor and no longer needed here?



##########
sdks/python/apache_beam/io/gcp/gcsio_integration_test.py:
##########
@@ -168,33 +139,6 @@ def _test_copy_batch(
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @pytest.mark.it_postcommit

Review Comment:
   Can we keep these tests? They run against a real KMS key (unlike the unit tests).



##########
sdks/python/apache_beam/io/filesystem.py:
##########
@@ -219,28 +219,33 @@ def _initialize_compressor(self):
 
   def readable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'r' in mode or 'a' in mode
+    try:
+      return 'r' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:

Review Comment:
   Why could there be an AttributeError here? Is it that `self._file` does not have a `mode` attribute? If so, one can wrap `self._file` in a class that has these compatible attributes.
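
The wrapping idea could look roughly like the following sketch. `ModeAdapter` is a hypothetical class invented for this illustration (it is not part of Beam or of the GCS client), and the in-memory stream merely stands in for a reader object that lacks `mode`.

```python
import io


class ModeAdapter:
    """Hypothetical wrapper that gives a file-like object a `mode` attribute.

    All other attribute lookups are delegated to the wrapped object.
    """
    def __init__(self, raw, mode):
        self._raw = raw
        self.mode = mode

    def __getattr__(self, name):
        # Only called for attributes that are not found on the adapter itself.
        return getattr(self._raw, name)


# In-memory stream used as a stand-in for a reader without `mode`.
stream = io.BytesIO(b'hello')
wrapped = ModeAdapter(stream, 'rb')

# The original check from readable() now works without a try/except.
is_readable = 'r' in wrapped.mode or 'a' in wrapped.mode
data = wrapped.read()
```

With such an adapter applied where the file object is created, `readable()` and `writeable()` in filesystem.py could stay unchanged.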



##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   I take it the changes here and in many of the tests are not meant to be final?
   
   If an exception is thrown at pipeline expansion time, the pipeline is incomplete.



##########
sdks/python/apache_beam/io/filesystem.py:
##########
@@ -219,28 +219,33 @@ def _initialize_compressor(self):
 
   def readable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'r' in mode or 'a' in mode
+    try:
+      return 'r' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:
+      return self._file.readable is True
 
   def writeable(self):
     # type: () -> bool
-    mode = self._file.mode
-    return 'w' in mode or 'a' in mode
+    try:
+      return 'w' in self._file.mode or 'a' in self._file.mode
+    except AttributeError:
+      return self._file.writable is True
 
   def write(self, data):
     # type: (bytes) -> None
 
     """Write data to file."""
     if not self._compressor:
-      raise ValueError('compressor not initialized')
+      self._initialize_compressor()

Review Comment:
   `self._initialize_compressor` is called in the constructor if the file is writable. If this code path is hit, something is wrong with the configuration of `writeable()` (see above).
   
   In general, it should not be necessary to change the logic of filesystem.py here.



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -119,7 +122,7 @@ class _Credentials(object):
 
   @classmethod
   def get_service_credentials(cls, pipeline_options):
-    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
+    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]

Review Comment:
   What caused the return type change?





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1508807409

   Run Python 3.10 PostCommit




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1214792320


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   The method you pointed to is called in `finish()`, which is called in `__exit__`, that is, when the `with` block ends. The try/except block here won't work as intended because it runs before the batch's `__exit__`.
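
The timing issue can be reproduced without any GCS dependency. The sketch below uses a toy context manager, `DeferredBatch`, which is an assumption made for illustration and not the real storage client's batch class. It only mimics the behavior described above: calls are queued inside the `with` block and executed in `__exit__`.

```python
class DeferredBatch:
    """Toy stand-in for a batching context manager (not the real GCS client).

    Calls are only recorded inside the with block and executed in __exit__.
    """
    def __init__(self):
        self._pending = []

    def __enter__(self):
        return self

    def add(self, func):
        # Nothing runs here, so nothing can raise here.
        self._pending.append(func)

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            for func in self._pending:
                func()  # Errors surface here, after the with body finished.
        return False


def delete_missing_object():
    raise KeyError('object not found')


caught_inside = False
caught_outside = False
try:
    with DeferredBatch() as batch:
        try:
            batch.add(delete_missing_object)
        except KeyError:
            caught_inside = True  # Never reached: the call is only queued.
except KeyError:
    caught_outside = True  # The deferred call fails during __exit__.
```

Only the outer handler fires, which is why the error handling has to wrap the whole `with` statement rather than the individual `blob.delete()` call.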





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1215138142


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   I see, you are correct about the previous behavior, and it should be considered a bug (everything after the 100th element is ignored). This might be a source of leftover temp test files in our test system. How about dividing the incoming files into slices of 100 and sending the batch requests sequentially?
   
   ```
   for i in range(0, len(paths), MAX_BATCH_OPERATION_SIZE):
     paths_chunk = paths[i:i + MAX_BATCH_OPERATION_SIZE]
     try:
       with self.client.batch():
         # Queue one delete per path in this chunk only.
         for path in paths_chunk:
           bucket_name, blob_path = parse_gcs_path(path)
           bucket = self.client.get_bucket(bucket_name)
           blob = storage.Blob(blob_path, bucket)
           blob.delete()
     except Exception as err:
       ...
   ```
   
   By the way, the max batch size is 1000 for the storage client: https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#LL138C5-L138C21
   
   Looking into the client library source code, it only tracks the last error it saw (and drops previous ones): https://github.com/googleapis/python-storage/blob/2b449cd289cb573ca1653bba37ecb84d35a025ad/google/cloud/storage/batch.py#LL240C19-L240C19 So we would not be able to get the status of individual delete requests as we do now.
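
One possible way to keep returning a `(path, exception)` list under that limitation is sketched below. This is only an illustration under a stated assumption: a batch call raises at most one exception for a whole chunk, so that exception is attributed to every path in the chunk. `run_in_chunks` and `fake_batch` are hypothetical names, and no real GCS call is made.

```python
def run_in_chunks(paths, chunk_size, run_batch):
    """Call run_batch on consecutive chunks and collect (path, exception) pairs.

    Assumption for this sketch: run_batch raises at most one exception for a
    whole chunk, so that exception is reported for every path in the chunk.
    """
    statuses = []
    for start in range(0, len(paths), chunk_size):
        chunk = paths[start:start + chunk_size]
        error = None
        try:
            run_batch(chunk)
        except Exception as err:  # pylint: disable=broad-except
            error = err
        statuses.extend((path, error) for path in chunk)
    return statuses


def fake_batch(chunk):
    # Hypothetical batch call: fails if the chunk contains one bad path.
    if 'gs://b/7' in chunk:
        raise RuntimeError('batch failed')


paths = ['gs://b/%d' % i for i in range(10)]
statuses = run_in_chunks(paths, 4, fake_batch)
```

The trade-off is coarser reporting: paths that were deleted successfully in a failing chunk are still marked with that chunk's error, so callers would need to treat the status as "possibly failed".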
   
   





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242679621


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -185,40 +144,30 @@ def get_project_number(self, bucket):
       bucket_metadata = self.get_bucket(bucket_name=bucket)
       if bucket_metadata:
         self.bucket_to_project_number[bucket] = bucket_metadata.projectNumber
-      #  else failed to load the bucket metadata due to HttpError
 
     return self.bucket_to_project_number.get(bucket, None)
 
-  def _set_rewrite_response_callback(self, callback):
-    """For testing purposes only. No backward compatibility guarantees.
-
-    Args:
-      callback: A function that receives ``storage.RewriteResponse``.
-    """
-    self._rewrite_cb = callback
-
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
-      return self.client.buckets.Get(request)
-    except HttpError:
+      return self.client.lookup_bucket(bucket_name)
+    except NotFound:
       return None
 
   def create_bucket(self, bucket_name, project, kms_key=None, location=None):
     """Create and return a GCS bucket in a specific project."""
-    encryption = None
-    if kms_key:
-      encryption = storage.Bucket.EncryptionValue(kms_key)
-
-    request = storage.StorageBucketsInsertRequest(
-        bucket=storage.Bucket(
-            name=bucket_name, location=location, encryption=encryption),
-        project=project,
-    )
+
     try:
-      return self.client.buckets.Insert(request)
-    except HttpError:
+      bucket = self.client.create_bucket(
+          bucket_or_name=bucket_name,
+          project=project,
+          location=location,
+      )
+      if kms_key:
+        bucket.default_kms_key_name(kms_key)
+        return self.get_bucket(bucket_name)

Review Comment:
   Yeah, I think that's actually the best way to do it. Updating it now.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242725651


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -796,6 +796,7 @@ def noop(table, **kwargs):
           exception_type=exceptions.ServiceUnavailable if exceptions else None,
           error_message='backendError')
   ])
+  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')

Review Comment:
   Responded on the issue. My question, as it relates to this PR, is whether this issue should block the merge or can be dealt with separately.





[GitHub] [beam] tvalentyn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1601732812

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237605944


##########
sdks/python/apache_beam/examples/complete/game/user_score.py:
##########
@@ -177,12 +178,20 @@ def format_user_score_sums(user_score):
       (user, score) = user_score
       return 'user: %s, total_score: %s' % (user, score)
 
-    (  # pylint: disable=expression-not-assigned
-        p
-        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
-        | 'UserScore' >> UserScore()
-        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
-        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
+    try:

Review Comment:
   The issue is closed, but I still see the try-excepts. Are they still necessary?





[GitHub] [beam] tvalentyn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1601567279

   > Left some more comment regarding previous unresolved comments.
   
   Have all of @Abacn's comments been addressed?




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1214856302


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   The previous code was silently truncating the list if more than 100 paths were given. Is that preferable to throwing an error so folks know they're over the limit?





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1584817945

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1211851551


##########
sdks/python/apache_beam/io/gcp/gcsio_integration_test.py:
##########
@@ -168,33 +139,6 @@ def _test_copy_batch(
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @pytest.mark.it_postcommit

Review Comment:
   The GCS client handles KMS automatically. By default, a new object uses the KMS key of its bucket. If an object is copied from one bucket to another, the copy will have the new bucket's KMS key. The only way the new GCSIO interacts with KMS (beyond retrieving keys from buckets or objects) is by allowing a key to be set on a new bucket.





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1609910047

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1507466648

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1513784450

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1509340891

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1511947902

   Run Python 3.10 PostCommit




[GitHub] [beam] codecov[bot] commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1515022157

   ## [Codecov](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25965](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (850fd87) into [master](https://codecov.io/gh/apache/beam/commit/87db1ae2c04e8589590e6cf8784b5d6d36878241?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (87db1ae) will **increase** coverage by `9.63%`.
   > The diff coverage is `45.34%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #25965      +/-   ##
   ==========================================
   + Coverage   71.17%   80.81%   +9.63%     
   ==========================================
     Files         787      465     -322     
     Lines      103294    66091   -37203     
   ==========================================
   - Hits        73523    53412   -20111     
   + Misses      28283    12679   -15604     
   + Partials     1488        0    -1488     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `?` | |
   | python | `80.81% <45.34%> (+0.94%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...\_beam/runners/portability/sdk\_container\_builder.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9zZGtfY29udGFpbmVyX2J1aWxkZXIucHk=) | `37.25% <0.00%> (-0.68%)` | :arrow_down: |
   | [.../python/apache\_beam/runners/worker/data\_sampler.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9zYW1wbGVyLnB5) | `100.00% <ø> (ø)` | |
   | [sdks/python/setup.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vc2V0dXAucHk=) | `0.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/examples/wordcount.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvd29yZGNvdW50LnB5) | `40.54% <6.66%> (-7.74%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/filesystem.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZXN5c3RlbS5weQ==) | `87.66% <36.36%> (-0.94%)` | :arrow_down: |
   | [...apache\_beam/runners/dataflow/internal/apiclient.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9pbnRlcm5hbC9hcGljbGllbnQucHk=) | `77.25% <38.46%> (+0.26%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `67.55% <50.50%> (-24.74%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/gcp/auth.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvZ2NwL2F1dGgucHk=) | `79.72% <100.00%> (+0.56%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/gcp/gcsfilesystem.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2ZpbGVzeXN0ZW0ucHk=) | `88.65% <100.00%> (ø)` | |
   | [...ks/python/apache\_beam/runners/interactive/utils.py](https://codecov.io/gh/apache/beam/pull/25965?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS91dGlscy5weQ==) | `95.13% <100.00%> (+0.04%)` | :arrow_up: |
   
   ... and [338 files with indirect coverage changes](https://codecov.io/gh/apache/beam/pull/25965/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1194313340


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

Review Comment:
   The lines I added above should ensure that an error is raised if more than 100 paths are requested. I'm pretty sure the previous logic was just truncating the request. If that's what we want to do, I can change it back, but I think throwing an error is more accurate.
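
One way to compare the two options is the minimal sketch below. It is not the PR's code: `chunked` and `reject_oversized` are hypothetical helper names, `ValueError` stands in for the `TooManyRequests` exception used in the diff, and the limit of 100 is taken from the remark above. The `islice` loop copies the pattern of the removed lines, so it can be used to check how the old logic treats a request above the limit.

```python
from itertools import islice

# Assumed limit, taken from the "more than 100 paths" remark above.
MAX_BATCH_OPERATION_SIZE = 100


def chunked(paths, size=MAX_BATCH_OPERATION_SIZE):
  """Yields successive lists of at most `size` paths (old islice pattern)."""
  paths = iter(paths)
  while True:
    chunk = list(islice(paths, size))
    if not chunk:
      return
    yield chunk


def reject_oversized(paths, size=MAX_BATCH_OPERATION_SIZE):
  """Raises if the request exceeds `size` (the behavior added in the diff)."""
  if len(paths) > size:
    # ValueError is a stand-in; the message is formatted explicitly here.
    raise ValueError('Batch larger than %s' % size)
  return list(paths)


# Hypothetical input: 250 object paths in one request.
paths = ['gs://bucket/obj-%d' % i for i in range(250)]
chunk_sizes = [len(c) for c in chunked(paths)]
```

With 250 paths, the `islice` loop yields chunks of 100, 100, and 50 paths, while `reject_oversized(paths)` raises.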





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1220085820


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()

Review Comment:
   I was thinking more that `raise_exception=False` would naturally give us a way to return all the errors along with the paths that caused them.
   
   But to your question about whether we need the status of every copy/delete to be returned: I'm actually not sure we do. The only place these methods are used is in `gcsfilesystem.py`, where the responses are parsed so that exceptions can be connected to their respective paths, but I'm not sure why that's preferable to just returning the first error (which will include the name of the affected path anyway). I guess it is technically more informative, but it also risks being excessive and difficult to parse if a whole batch fails (especially since in the case of `copy_batch` it would be attempting to print an entire map).
   
   I'll change the batch methods to just run without returning responses and see how that goes.
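
For reference, a minimal sketch of the two reporting styles weighed above. It is illustrative only and makes several assumptions: `FakeStore`, `delete_all_collecting`, and `delete_all_fail_fast` are hypothetical names, the store is an in-memory stand-in for a bucket, `KeyError` stands in for the client's not-found error, and the deletes run sequentially rather than as a deferred batch.

```python
class FakeStore:
  """Hypothetical in-memory stand-in for a bucket."""
  def __init__(self, objects, locked=()):
    self.objects = set(objects)
    self.locked = set(locked)

  def delete(self, path):
    if path in self.locked:
      raise PermissionError('cannot delete %s' % path)
    if path not in self.objects:
      raise KeyError(path)  # stand-in for a 404 / NotFound
    self.objects.remove(path)


def delete_all_collecting(store, paths):
  """Returns [(path, exception_or_None)], one entry per path, in order."""
  results = []
  for path in paths:
    try:
      store.delete(path)
      results.append((path, None))
    except KeyError:
      # A missing object counts as success, for idempotency.
      results.append((path, None))
    except Exception as e:  # pylint: disable=broad-except
      results.append((path, e))
  return results


def delete_all_fail_fast(store, paths):
  """Deletes paths in order and lets the first real error propagate."""
  for path in paths:
    try:
      store.delete(path)
    except KeyError:
      pass  # already gone, treated as success


store = FakeStore(['a', 'b'], locked=['b'])
results = delete_all_collecting(store, ['a', 'b', 'missing'])
# Map each failing path to its exception, as a caller might do.
failed = {path: exc for path, exc in results if exc is not None}
```

The collecting variant lets the caller build a path-to-exception map covering every failure; the fail-fast variant stops at the first error and reports only that one.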





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1613915369

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1614488122

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1557979630

   I ran `git pull --rebase=interactive origin master` and eliminated the commits changing the base image requirements and deleting StorageV1 client. When I try to push those changes, the push is rejected because the tip of my branch is behind the remote. Git recommends another `git pull` to correct this, but that just raises conflicts with the base image requirements and StorageV1 client. I'm not sure where to go from there.




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210677588


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   OK, I am going to verify that the pipeline still runs on the full graph for the tests listed in #26774.





[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237755937


##########
sdks/python/apache_beam/runners/portability/sdk_container_builder.py:
##########
@@ -307,16 +306,16 @@ def _invoke_docker_build_and_push(self, container_image_name):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
-    request = storage.StorageObjectsInsertRequest(
-        bucket=gcs_bucket, name=gcs_object)
+    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    total_size = os.path.getsize(local_file_path)
     from apitools.base.py import exceptions
+    from google.cloud import storage

Review Comment:
   triggered the tests.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242631887


##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -340,8 +329,7 @@ def metadata(self, path):
     """
     try:
       file_metadata = self._gcsIO()._status(path)
-      return FileMetadata(
-          path, file_metadata['size'], file_metadata['last_updated'])
+      return FileMetadata(path, file_metadata['size'], file_metadata['updated'])

Review Comment:
   The values didn't change, but the key names did. Using 'updated' rather than 'last_updated' seems to be the standard.
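
A small sketch of the rename, assuming `_status` returns a plain dict keyed by 'size' and 'updated'. `FileMetadata` here is a namedtuple stand-in for Beam's class, `to_file_metadata` is a hypothetical helper, and the sample values are made up.

```python
from collections import namedtuple

# Stand-in for apache_beam.io.filesystem.FileMetadata (assumption).
FileMetadata = namedtuple(
    'FileMetadata', ['path', 'size_in_bytes', 'last_updated_in_seconds'])


def to_file_metadata(path, status):
  """Builds FileMetadata from a status dict keyed by 'size' and 'updated'."""
  return FileMetadata(path, status['size'], status['updated'])


# Made-up sample values for illustration.
status = {'size': 1024, 'updated': 1687900000.0}
metadata = to_file_metadata('gs://bucket/object', status)
```

Only the dict key changes; the value passed through to `FileMetadata` is the same timestamp as before.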





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242651594


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
+    dest_bucket_name, dest_path = parse_gcs_path(dest)

Review Comment:
   I've updated it to do so. No reason to respecify the blob name if it isn't changing.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242715847


##########
sdks/python/apache_beam/runners/portability/sdk_container_builder.py:
##########
@@ -307,16 +306,16 @@ def _invoke_docker_build_and_push(self, container_image_name):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
-    request = storage.StorageObjectsInsertRequest(
-        bucket=gcs_bucket, name=gcs_object)
+    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    total_size = os.path.getsize(local_file_path)
     from apitools.base.py import exceptions
+    from google.cloud import storage

Review Comment:
   Maybe? Most of the FileSystems methods I'm looking at return a file stream, so it would look more like the previous implementation, though `upload_from_filename` does a very similar thing one level up. I guess the advantage would be that the filesystem hopefully changes less often than the underlying IO, so we wouldn't need to update this code as often. I don't think this file already uses FileSystems, though, so it's not clear to me that we gain much from doing that.





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1246707971


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -295,162 +208,92 @@ def delete_batch(self, paths):
              argument, where exception is None if the operation succeeded or
              the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    final_results = []
+    s = 0
+    while s < len(paths):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for path in current_paths:
+          bucket_name, blob_name = parse_gcs_path(path)
+          bucket = self.client.get_bucket(bucket_name)
+          blob = storage.Blob(blob_name, bucket)
+          blob.delete()
+
+      current_responses = current_batch._responses
+      for resp in current_responses:
+        if resp[1] == NotFound:
+          final_results.append((resp[0], None))
+        else:
+          final_results.append(resp)
+
+      s += MAX_BATCH_OPERATION_SIZE
+
+    return final_results
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
-    """Copies the given GCS object from src to dest.
+    src_bucket_name, src_blob_name = parse_gcs_path(src)
+    dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True)
+    src_bucket = self.get_bucket(src_bucket_name)
+    src_blob = src_bucket.get_blob(src_blob_name)
+    if not src_blob:
+      raise NotFound("Source %s not found", src)
+    dest_bucket = self.get_bucket(dest_bucket_name)
+    if not dest_blob_name:
+      dest_blob_name = None
+    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
+
+  def copy_batch(self, src_dest_pairs):
+    """Copies the given GCS objects from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite call will return after these many bytes. Used
-        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
-    if not src_dest_pairs:
-      return []
-    pair_to_request = {}
-    for pair in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(pair[0])
-      dest_bucket, dest_path = parse_gcs_path(pair[1])
-      request = storage.StorageObjectsRewriteRequest(
-          sourceBucket=src_bucket,
-          sourceObject=src_path,
-          destinationBucket=dest_bucket,
-          destinationObject=dest_path,
-          destinationKmsKeyName=dest_kms_key_name,
-          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-      pair_to_request[pair] = request
-    pair_to_status = {}
-    while True:
-      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
-      if not pairs_in_batch:
-        break
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for pair in pairs_in_batch:
-        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for pair, api_call in zip(pairs_in_batch, api_calls):
-        src, dest = pair
-        response = api_call.response
-        if self._rewrite_cb is not None:
-          self._rewrite_cb(response)
-        if api_call.is_error:
-          exception = api_call.exception
-          # Translate 404 to the appropriate not found exception.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = (
-                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-          pair_to_status[pair] = exception
-        elif not response.done:
-          _LOGGER.debug(
-              'Rewrite progress: %d of %d bytes, %s to %s',
-              response.totalBytesRewritten,
-              response.objectSize,
-              src,
-              dest)
-          pair_to_request[pair].rewriteToken = response.rewriteToken
-        else:
-          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-          pair_to_status[pair] = None
+    final_results = []
+    s = 0
+    while s < len(src_dest_pairs):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
+        current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_pairs = src_dest_pairs[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for pair in current_pairs:
+          src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
+          dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
+          src_bucket = self.client.get_bucket(src_bucket_name)
+          src_blob = src_bucket.get_blob(src_blob_name)
+          dest_bucket = self.client.get_bucket(dest_bucket_name)
+
+          src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
 
-    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
+      final_results += current_batch._responses

Review Comment:
   Good catch. Adding to integration tests to enforce that structure in the responses. I'll let you know when it's resolved.
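
The result structure named in the docstring above, one `(src, dest, exception)` tuple per input pair in input order, can be sketched without any GCS dependency. This is only an illustration under stated assumptions: `copy_in_chunks` and `submit_batch` are hypothetical names, `submit_batch` stands in for whatever issues one batched request and is assumed to return one exception-or-None per pair, and the value 100 for `MAX_BATCH_OPERATION_SIZE` is assumed here.

```python
from itertools import islice

MAX_BATCH_OPERATION_SIZE = 100  # assumed value, for illustration only


def copy_in_chunks(
    src_dest_pairs, submit_batch, chunk_size=MAX_BATCH_OPERATION_SIZE):
  """Returns [(src, dest, exception_or_None)] in the order of the input.

  submit_batch is a hypothetical callable: it takes a list of (src, dest)
  pairs and returns a list of the same length holding None for a success
  or the exception recorded for that pair.
  """
  results = []
  pairs = iter(src_dest_pairs)
  while True:
    chunk = list(islice(pairs, chunk_size))
    if not chunk:
      return results
    outcomes = submit_batch(chunk)
    # Guard the invariant the review asks the tests to enforce.
    if len(outcomes) != len(chunk):
      raise ValueError('expected exactly one outcome per (src, dest) pair')
    for (src, dest), exc in zip(chunk, outcomes):
      results.append((src, dest, exc))
```

An integration test could then assert on the shape of each tuple rather than on the internals of the batch object.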





[GitHub] [beam] Abacn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574264000

   Left some more comments regarding the previously unresolved comments. As for performance, I will do some investigation on my own next week.




[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1211842030


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   I'm not sure this counts as a breaking change for external behavior, which I expect would be the same. And wordcount is not an unrelated test. It's using the GCSIO to write. Or am I misunderstanding something?
   
   Please let me know how the test goes. If it actually can access and write to GCS without credentials then we may need to reconsider.





[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212282164


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   I see, the error happens when no credentials are provided. Log:
   
   ```
   $ python -m apache_beam.examples.wordcount_minimal --output /Users/.../Desktop/outputoutput.txt
   INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
   INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
   INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine authentication due to unavailable metadata server.
   WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
   Connecting anonymously.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine authentication due to unavailable metadata server.
   Traceback (most recent call last):
   File "/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line 148, in <module>
       main()
     File "/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line 123, in main
       lines = p | ReadFromText(known_args.input)
     File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 781, in __init__
       self._source = self._source_class(
     File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 140, in __init__
       super().__init__(
     File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line 127, in __init__
       self._validate()
     File "/Users/.../beam/sdks/python/apache_beam/options/value_provider.py", line 193, in _f
       return fnc(self, *args, **kwargs)
     File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line 188, in _validate
       match_result = FileSystems.match([pattern], limits=[1])[0]
     File "/Users/.../beam/sdks/python/apache_beam/io/filesystems.py", line 204, in match
       return filesystem.match(patterns, limits)
     File "/Users/.../beam/sdks/python/apache_beam/io/filesystem.py", line 804, in match
       raise BeamIOError("Match operation failed", exceptions)
   ```
   
   With no gcloud credentials, pipeline expansion fails at `lines = p | ReadFromText(known_args.input)`, which is the very beginning of the test.
   
   https://github.com/apache/beam/blob/7179cce5624c304d5f8f7ba69dc73e96b9823d31/sdks/python/apache_beam/examples/wordcount_minimal.py#LL125C16-L125C16
   
   The pipeline is not initialized successfully. Pipeline expansion is incomplete, and the lines after ReadFromText and before the except clause are never executed. This is a breaking change: reading from a public bucket anonymously now fails.
   
   Note that wordcount is the hello-world example, and first-time users are guided to run it. We provide the public bucket, and users do not need a Google Cloud account to run these examples. We should fix the test.
   
   If cloud-storage-client does not support anonymous connections (which would be a surprise), we would need to make this migration opt-in instead of a replacement.
   
   On master, the test runs successfully and a file gets written. Log:
   ```
   INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
   INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
   INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine authentication due to unavailable metadata server.
   WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
   Connecting anonymously.
   INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.49.0.dev
   INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x11255ee50> ====================
   ```
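
The fallback behaviour described above (use credentials when present, otherwise connect anonymously so public buckets stay readable) can be sketched as a small selection function. This is a sketch under assumptions, not what the PR does: `make_gcs_client` and both factory parameters are hypothetical names. With google-cloud-storage, the factories would plausibly be `lambda c: storage.Client(credentials=c)` and `storage.Client.create_anonymous_client`; that wiring is assumed here and left out so the sketch runs without the library.

```python
def make_gcs_client(credentials, authed_factory, anonymous_factory):
  """Picks an authenticated client if credentials exist, else an anonymous one.

  Both factories are hypothetical injection points so the selection logic
  can be exercised without any cloud dependency.
  """
  if credentials is None:
    # No default credentials were found: fall back to anonymous access.
    return anonymous_factory()
  return authed_factory(credentials)
```

Injecting the factories keeps the decision testable in a unit test that has no credentials and no network access.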
   





[GitHub] [beam] BjornPrime commented on a diff in pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1159859856


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -303,149 +256,79 @@ def delete_batch(self, paths):
       paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
       if not paths_chunk:
         return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+      with self.client.batch():
+        for path in paths_chunk:
+          bucket, blob_path = parse_gcs_path(path)
+          blob = storage.Blob(blob_path, bucket)
+          exception = None
+          try:
+            blob.delete()
+          except Exception as err:
+            if err is NotFound:
+              exception = None
+            else:
+              exception = err
+          finally:
+            result_statuses.append((path, exception))
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
+      !!!

Review Comment:
   Yeah, I use this to mark comments/commented out code that I intend to delete so I can find it quickly. Forgot to delete the exclamation marks themselves in this case.





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1577215796

   Run Python 3.11 PostCommit




[GitHub] [beam] tvalentyn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1558005821

   Note that a force-push may somewhat disrupt the comment history on GitHub. Have all comments from @Abacn been addressed?




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1574100776

   Run Python 3.11 PostCommit




[GitHub] [beam] Abacn commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1572539798

   The anonymous credential issue is now resolved. There are two remaining concerns:
   - KMS support
     The KMS key is user-supplied and managed in GCP's KMS store. It is independent of GCS. I don't think the client can manage KMS automatically without the user specifying the key.
   
     For example, if you preserve the kms_key_name parameter in gcsio's constructor (but just don't use it), the test would fail. This is a breaking change.
   
   - Performance
   
   There was a recent improvement to the Python synthetic source (26697) that should speed up the source by 2x. After rebasing onto the latest master, removing the ReShuffle transform in filebasedio_perf_test for write, and triggering the test with a synthetic source of `{"algorithm": "lcg"}`, is it possible to see the hot path for the CPU time now? (Reshuffle and the source should no longer be major contributors.)
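
One way the KMS concern could be addressed is to keep the user-supplied key and attach it to the destination object before copying. The helper below is only a sketch: `rewrite_until_done` is a hypothetical name, and it assumes the destination behaves like google-cloud-storage's `Blob`, whose `rewrite(source, token=...)` is assumed to return `(token, bytes_rewritten, total_bytes)` with `token` set to None once the copy finishes. The caller would be expected to construct the destination with its `kms_key_name` set; that wiring is an assumption and is not shown in the PR.

```python
def rewrite_until_done(src_blob, dest_blob, progress_cb=None):
  """Copies src_blob onto dest_blob by calling rewrite until no token remains.

  dest_blob is duck-typed: anything with a rewrite(source, token=...) method
  returning (token, bytes_rewritten, total_bytes) works. Returns total_bytes.
  """
  token = None
  while True:
    token, rewritten, total = dest_blob.rewrite(src_blob, token=token)
    if progress_cb is not None:
      # Plays the role of the old rewrite callback used in tests.
      progress_cb(rewritten, total)
    if token is None:
      return total
```

Looping on the rewrite token mirrors the progress loop in the removed code, so large objects that need several calls are still copied completely.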
   
   
   




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1503490721

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1502199789

   
   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1502208618

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1492184988

   Run Python PreCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1505533822

   Run Python 3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] tvalentyn commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1551801784

   I typically do: `git pull --rebase=interactive origin master`. You can drop the commit that regenerates requirements (if you have it as a separate commit), or accept the current master changes and regenerate later. You can also squash some of the commits you have.




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1611789885

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1248210145


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -295,162 +208,92 @@ def delete_batch(self, paths):
              argument, where exception is None if the operation succeeded or
              the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    final_results = []
+    s = 0
+    while s < len(paths):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for path in current_paths:
+          bucket_name, blob_name = parse_gcs_path(path)
+          bucket = self.client.get_bucket(bucket_name)
+          blob = storage.Blob(blob_name, bucket)
+          blob.delete()
+
+      current_responses = current_batch._responses
+      for resp in current_responses:
+        if resp[1] == NotFound:
+          final_results.append((resp[0], None))
+        else:
+          final_results.append(resp)
+
+      s += MAX_BATCH_OPERATION_SIZE
+
+    return final_results
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
-    """Copies the given GCS object from src to dest.
+    src_bucket_name, src_blob_name = parse_gcs_path(src)
+    dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True)
+    src_bucket = self.get_bucket(src_bucket_name)
+    src_blob = src_bucket.get_blob(src_blob_name)
+    if not src_blob:
+      raise NotFound("Source %s not found", src)
+    dest_bucket = self.get_bucket(dest_bucket_name)
+    if not dest_blob_name:
+      dest_blob_name = None
+    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
+
+  def copy_batch(self, src_dest_pairs):
+    """Copies the given GCS objects from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite call will return after these many bytes. Used
-        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
-    if not src_dest_pairs:
-      return []
-    pair_to_request = {}
-    for pair in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(pair[0])
-      dest_bucket, dest_path = parse_gcs_path(pair[1])
-      request = storage.StorageObjectsRewriteRequest(
-          sourceBucket=src_bucket,
-          sourceObject=src_path,
-          destinationBucket=dest_bucket,
-          destinationObject=dest_path,
-          destinationKmsKeyName=dest_kms_key_name,
-          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-      pair_to_request[pair] = request
-    pair_to_status = {}
-    while True:
-      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
-      if not pairs_in_batch:
-        break
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for pair in pairs_in_batch:
-        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for pair, api_call in zip(pairs_in_batch, api_calls):
-        src, dest = pair
-        response = api_call.response
-        if self._rewrite_cb is not None:
-          self._rewrite_cb(response)
-        if api_call.is_error:
-          exception = api_call.exception
-          # Translate 404 to the appropriate not found exception.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = (
-                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-          pair_to_status[pair] = exception
-        elif not response.done:
-          _LOGGER.debug(
-              'Rewrite progress: %d of %d bytes, %s to %s',
-              response.totalBytesRewritten,
-              response.objectSize,
-              src,
-              dest)
-          pair_to_request[pair].rewriteToken = response.rewriteToken
-        else:
-          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-          pair_to_status[pair] = None
+    final_results = []
+    s = 0
+    while s < len(src_dest_pairs):
+      if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
+        current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_pairs = src_dest_pairs[s:]
+      current_batch = self.client.batch(raise_exception=False)
+      with current_batch:
+        for pair in current_pairs:
+          src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
+          dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
+          src_bucket = self.client.get_bucket(src_bucket_name)
+          src_blob = src_bucket.get_blob(src_blob_name)
+          dest_bucket = self.client.get_bucket(dest_bucket_name)
+
+          src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
 
-    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
+      final_results += current_batch._responses

Review Comment:
   As discussed, this will be a follow-up after this PR gets in.
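
For readers following the diff above, here is a minimal, self-contained sketch of two ideas in the new `delete_batch`: splitting the input into chunks of at most `MAX_BATCH_OPERATION_SIZE`, and reporting a not-found error as success so that deletes stay idempotent. It does not call the GCS client at all. The helpers `chunked` and `normalize_delete_results`, the local `NotFound` class, and the value 100 are illustrative assumptions, not code from the PR, and the sketch assumes errors arrive as exception instances.

```python
from itertools import islice

# Assumed value, for illustration only.
MAX_BATCH_OPERATION_SIZE = 100


class NotFound(Exception):
  """Local stand-in for a client library's not-found error (hypothetical)."""


def chunked(items, size):
  """Yields consecutive lists of at most `size` items from `items`."""
  iterator = iter(items)
  while True:
    chunk = list(islice(iterator, size))
    if not chunk:
      return
    yield chunk


def normalize_delete_results(paths, errors):
  """Pairs each path with its error, mapping NotFound to None (success)."""
  results = []
  for path, error in zip(paths, errors):
    if isinstance(error, NotFound):
      # The object is already gone, so the delete is treated as successful.
      error = None
    results.append((path, error))
  return results
```

Because `chunked` works on any iterable, it also covers the empty-input case without a separate early return.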





[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1614663849

   Run Python 3.11 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: Replace StorageV1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1611862714

   Run Python Dataflow ValidatesContainer




[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1245547218


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -59,20 +48,10 @@
 
 _LOGGER = logging.getLogger(__name__)
 
-# Issue a friendlier error message if the storage library is not available.
-# TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.batch import BatchApiRequest
-  from apitools.base.py.exceptions import HttpError
-  from apitools.base.py import transfer
   from apache_beam.internal.gcp import auth
-  from apache_beam.io.gcp.internal.clients import storage
 except ImportError:
-  raise ImportError(
-      'Google Cloud Storage I/O not supported for this execution environment '
-      '(could not import storage API client).')
+  raise ImportError('Internal auth library not found')

Review Comment:
   We shouldn't expect this import to fail.
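
As a hedged illustration of the distinction behind this comment: an import guard with a friendlier message is mainly useful for optional dependencies that may really be absent, while a module that ships with the package itself can simply be imported directly. The helper name `import_optional` and its `hint` parameter below are hypothetical and do not appear in the PR.

```python
import importlib


def import_optional(module_name, hint):
  """Imports `module_name`, re-raising ImportError with a friendlier hint.

  Intended only for dependencies that may legitimately be missing.
  """
  try:
    return importlib.import_module(module_name)
  except ImportError as e:
    # Chain the original error so the underlying cause is still visible.
    raise ImportError(hint) from e
```

A caller would pass the optional module's dotted name together with an installation hint, and import internal modules with a plain `import` statement.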





[GitHub] [beam] tvalentyn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1245549269


##########
sdks/python/apache_beam/runners/portability/sdk_container_builder.py:
##########
@@ -307,16 +306,16 @@ def _invoke_docker_build_and_push(self, container_image_name):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
-    request = storage.StorageObjectsInsertRequest(
-        bucket=gcs_bucket, name=gcs_object)
+    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    total_size = os.path.getsize(local_file_path)
     from apitools.base.py import exceptions
+    from google.cloud import storage

Review Comment:
   OK, up to you. If tests pass, I am OK to proceed as is.





[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1518009694

   Run Python TextIO Performance Test




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1518140084

   Run Python TextIO Performance Test




[GitHub] [beam] github-actions[bot] commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1483023393

   Stopping reviewer notifications for this pull request: requested by reviewer




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1511377727

   Run Python 3.10 PostCommit




[GitHub] [beam] BjornPrime commented on pull request #25965: [DO NOT APPROVE; testing purposes only] Replace storage v1 client with GCS client

Posted by "BjornPrime (via GitHub)" <gi...@apache.org>.
BjornPrime commented on PR #25965:
URL: https://github.com/apache/beam/pull/25965#issuecomment-1513296741

   Run Python 3.10 PostCommit

