You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/28 22:15:11 UTC

[1/2] beam git commit: Replace gsutil with storage API calls

Repository: beam
Updated Branches:
  refs/heads/master 78a8d7d6d -> f138b3569


Replace gsutil with storage API calls

This commit replaces the `gsutil cp` call with GCS API calls when staging
resources.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32f218db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32f218db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32f218db

Branch: refs/heads/master
Commit: 32f218dbddea5e904ab325ce9e4165b01e4d30d7
Parents: 78a8d7d
Author: David Volquartz Lebech <da...@lebech.info>
Authored: Mon Mar 27 12:18:18 2017 +0200
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Mar 28 15:14:01 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsio.py         |  1 -
 .../runners/dataflow/internal/dependency.py     | 22 +++++++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32f218db/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 285e272..0a10094 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -754,7 +754,6 @@ class GcsBufferedWriter(object):
     self.path = path
     self.mode = mode
     self.bucket, self.name = parse_gcs_path(path)
-    self.mode = mode
 
     self.closed = False
     self.position = 0

http://git-wip-us.apache.org/repos/asf/beam/blob/32f218db/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 60630e9..22de5c6 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -52,6 +52,7 @@ TODO(silviuc): Should we allow several setup packages?
 TODO(silviuc): We should allow customizing the exact command for setup build.
 """
 
+import functools
 import glob
 import logging
 import os
@@ -87,9 +88,24 @@ def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
   logging.info('file copy from %s to %s.', from_path, to_path)
   if from_path.startswith('gs://') or to_path.startswith('gs://'):
-    command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
-    logging.info('Executing command: %s', command_args)
-    processes.check_call(command_args)
+    from apache_beam.io.gcp import gcsio
+    if from_path.startswith('gs://') and to_path.startswith('gs://'):
+      # Both files are GCS files so copy.
+      gcsio.GcsIO().copy(from_path, to_path)
+    elif to_path.startswith('gs://'):
+      # Only target is a GCS file, read local file and upload.
+      with open(from_path, 'rb') as f:
+        with gcsio.GcsIO().open(to_path, mode='wb') as g:
+          pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
+          for chunk in iter(pfun, ''):
+            g.write(chunk)
+    else:
+      # Source is a GCS file but target is local file.
+      with gcsio.GcsIO().open(from_path, mode='rb') as g:
+        with open(to_path, 'wb') as f:
+          pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
+          for chunk in iter(pfun, ''):
+            f.write(chunk)
   else:
     # Branch used only for unit tests and integration tests.
     # In such environments GCS support is not available.


[2/2] beam git commit: This closes #2333

Posted by al...@apache.org.
This closes #2333


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f138b356
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f138b356
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f138b356

Branch: refs/heads/master
Commit: f138b3569b191b8e1ff377939e3c5492e744b65e
Parents: 78a8d7d 32f218d
Author: Ahmet Altay <al...@google.com>
Authored: Tue Mar 28 15:14:57 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Mar 28 15:14:57 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsio.py         |  1 -
 .../runners/dataflow/internal/dependency.py     | 22 +++++++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------