You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:20 UTC

[31/50] beam git commit: Fix the staging directory path in copying from GCS

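Fix the staging directory path when copying from GCS

When an extra package is downloaded locally before staging, copy it to a file
path inside the temporary staging directory (the directory joined with the
package's last path component) instead of to the directory path itself.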


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/171a9930
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/171a9930
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/171a9930

Branch: refs/heads/DSL_SQL
Commit: 171a993044d97c42f027e1ec44436a3b8af7c32f
Parents: 6fed177
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Jun 6 12:55:42 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 6 16:43:58 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/dependency.py           |  7 ++++++-
 .../runners/dataflow/internal/dependency_test.py      | 14 +++++++++++++-
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/171a9930/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 3a0ff46..e656600 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -181,7 +181,12 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
           staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
         logging.info('Downloading extra package: %s locally before staging',
                      package)
-        _dependency_file_copy(package, staging_temp_dir)
+        if os.path.isfile(staging_temp_dir):
+          local_file_path = staging_temp_dir
+        else:
+          _, last_component = FileSystems.split(package)
+          local_file_path = FileSystems.join(staging_temp_dir, last_component)
+        _dependency_file_copy(package, local_file_path)
       else:
         raise RuntimeError(
             'The file %s cannot be found. It was specified in the '

http://git-wip-us.apache.org/repos/asf/beam/blob/171a9930/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
index 5eac7d6..e555b69 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
@@ -31,6 +31,16 @@ from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 
 
+# Protect against environments where GCS library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class SetupTest(unittest.TestCase):
 
   def update_options(self, options):
@@ -369,7 +379,9 @@ class SetupTest(unittest.TestCase):
       if from_path.startswith('gs://'):
         gcs_copied_files.append(from_path)
         _, from_name = os.path.split(from_path)
-        self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
+        if os.path.isdir(to_path):
+          to_path = os.path.join(to_path, from_name)
+        self.create_temp_file(to_path, 'nothing')
         logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
       elif to_path.startswith('gs://'):
         logging.info('Faking file_copy(%s, %s)', from_path, to_path)