You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:20 UTC
[31/50] beam git commit: Fix the staging directory path in copying from GCS
Fix the staging directory path in copying from GCS
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/171a9930
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/171a9930
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/171a9930
Branch: refs/heads/DSL_SQL
Commit: 171a993044d97c42f027e1ec44436a3b8af7c32f
Parents: 6fed177
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Jun 6 12:55:42 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 6 16:43:58 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/internal/dependency.py | 7 ++++++-
.../runners/dataflow/internal/dependency_test.py | 14 +++++++++++++-
2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/171a9930/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 3a0ff46..e656600 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -181,7 +181,12 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
logging.info('Downloading extra package: %s locally before staging',
package)
- _dependency_file_copy(package, staging_temp_dir)
+ if os.path.isfile(staging_temp_dir):
+ local_file_path = staging_temp_dir
+ else:
+ _, last_component = FileSystems.split(package)
+ local_file_path = FileSystems.join(staging_temp_dir, last_component)
+ _dependency_file_copy(package, local_file_path)
else:
raise RuntimeError(
'The file %s cannot be found. It was specified in the '
http://git-wip-us.apache.org/repos/asf/beam/blob/171a9930/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
index 5eac7d6..e555b69 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
@@ -31,6 +31,16 @@ from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+# Protect against environments where GCS library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class SetupTest(unittest.TestCase):
def update_options(self, options):
@@ -369,7 +379,9 @@ class SetupTest(unittest.TestCase):
if from_path.startswith('gs://'):
gcs_copied_files.append(from_path)
_, from_name = os.path.split(from_path)
- self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
+ if os.path.isdir(to_path):
+ to_path = os.path.join(to_path, from_name)
+ self.create_temp_file(to_path, 'nothing')
logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
elif to_path.startswith('gs://'):
logging.info('Faking file_copy(%s, %s)', from_path, to_path)