Posted to commits@beam.apache.org by al...@apache.org on 2020/01/09 16:58:17 UTC

[beam] branch master updated: [BEAM-9078] Pass total_size to storage.Upload

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f79d277  [BEAM-9078] Pass total_size to storage.Upload
     new e60d49b  Merge pull request #10544 from bradgwest/BEAM-9078
f79d277 is described below

commit f79d277c3bb89e951fc7b5e6332d40237659aadd
Author: Brad West <br...@gmail.com>
AuthorDate: Thu Jan 9 08:24:41 2020 -0700

    [BEAM-9078] Pass total_size to storage.Upload
    
    It's possible for the tarball uploaded to GCS to be quite large, for example
    when a user vendors multiple dependencies into it to achieve a more stable
    deployable artifact.
    
    Before this change, the GCS upload API call performed a multipart upload,
    which the Google [documentation](https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload)
    states should be used only when the file is small enough to upload again in
    full if the connection fails. For large tarballs, we hit 60-second socket
    timeouts before the multipart upload completes. By passing `total_size`,
    apitools first checks whether the size exceeds the resumable upload
    threshold and, if so, executes the more robust resumable upload rather than
    a multipart upload, avoiding the socket timeouts.
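    
    For context, the size check this relies on can be sketched as follows. This
    is a simplified illustration of the strategy selection, not the apitools
    implementation itself; the 5 MB threshold and the strategy names are
    assumptions based on apitools defaults, not something this commit sets:
    
        # Assumed default resumable-upload threshold (5 MB); illustrative only.
        RESUMABLE_UPLOAD_THRESHOLD = 5 * 1024 * 1024
    
        def choose_upload_strategy(total_size):
            """Return 'multipart' for small or unknown sizes, 'resumable' otherwise."""
            if total_size is None or total_size <= RESUMABLE_UPLOAD_THRESHOLD:
                # Unknown or small payload: a single multipart request is cheap
                # to retry in full if the connection drops.
                return 'multipart'
            # Large payload: a resumable upload sends data in chunks and can
            # resume after a failure, avoiding socket timeouts on big tarballs.
            return 'resumable'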
---
 sdks/python/apache_beam/runners/dataflow/internal/apiclient.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 06f1340..200dbcc 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -481,8 +481,9 @@ class DataflowApplicationClient(object):
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def _gcs_file_copy(self, from_path, to_path):
     to_folder, to_name = os.path.split(to_path)
+    total_size = os.path.getsize(from_path)
     with open(from_path, 'rb') as f:
-      self.stage_file(to_folder, to_name, f)
+      self.stage_file(to_folder, to_name, f, total_size=total_size)
 
   def _stage_resources(self, options):
     google_cloud_options = options.view_as(GoogleCloudOptions)
@@ -499,7 +500,7 @@ class DataflowApplicationClient(object):
     return resources
 
   def stage_file(self, gcs_or_local_path, file_name, stream,
-                 mime_type='application/octet-stream'):
+                 mime_type='application/octet-stream', total_size=None):
     """Stages a file at a GCS or local path with stream-supplied contents."""
     if not gcs_or_local_path.startswith('gs://'):
       local_path = FileSystems.join(gcs_or_local_path, file_name)
@@ -514,7 +515,7 @@ class DataflowApplicationClient(object):
         bucket=bucket, name=name)
     start_time = time.time()
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    upload = storage.Upload(stream, mime_type)
+    upload = storage.Upload(stream, mime_type, total_size)
     try:
       response = self._storage_client.objects.Insert(request, upload=upload)
     except exceptions.HttpError as e:
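
A caller staging a local file through the updated method would look roughly
like this (a hypothetical usage sketch: the tarball path, bucket path, file
name, and the already-constructed DataflowApplicationClient instance `client`
are assumptions, not taken from the diff):

    import os

    local_tarball = '/tmp/dataflow-workflow.tar.gz'  # hypothetical path
    total_size = os.path.getsize(local_tarball)      # size known up front for a local file
    with open(local_tarball, 'rb') as stream:
        client.stage_file(
            'gs://my-staging-bucket/staging',  # gcs_or_local_path (hypothetical)
            'dataflow-workflow.tar.gz',        # file_name
            stream,
            mime_type='application/octet-stream',
            # With the size supplied, large files go through the resumable
            # upload path instead of a single multipart request.
            total_size=total_size)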