Posted to commits@beam.apache.org by al...@apache.org on 2017/04/23 19:54:58 UTC

[1/2] beam git commit: [BEAM-1988] Migrate from utils.path to BFS

Repository: beam
Updated Branches:
  refs/heads/master 1dce98f07 -> a67019739


[BEAM-1988] Migrate from utils.path to BFS


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b7bbc2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b7bbc2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b7bbc2d

Branch: refs/heads/master
Commit: 9b7bbc2dbce01605fa4a4e8ddebaf2bc648d6f9b
Parents: 1dce98f
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Apr 21 16:39:34 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sun Apr 23 12:54:46 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  5 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 10 ++-
 .../runners/dataflow/internal/apiclient.py      |  8 ++-
 .../runners/dataflow/internal/dependency.py     | 23 ++++---
 .../dataflow/internal/dependency_test.py        |  5 +-
 sdks/python/apache_beam/utils/__init__.py       |  4 --
 sdks/python/apache_beam/utils/path.py           | 46 -------------
 sdks/python/apache_beam/utils/path_test.py      | 70 --------------------
 8 files changed, 31 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 99f27f8..fdc4757 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -46,10 +46,7 @@ class GCSFileSystem(FileSystem):
       raise ValueError('Basepath %r must be GCS path.', basepath)
     path = basepath
     for p in paths:
-      if path == '' or path.endswith('/'):
-        path += p
-      else:
-        path += '/' + p
+      path = path.rstrip('/') + '/' + p.lstrip('/')
     return path
 
   def mkdirs(self, path):

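For context, a minimal standalone sketch of the join semantics introduced above (plain Python mirroring the one-liner in the hunk; this is an illustration, not the Beam API itself):

    def join(basepath, *paths):
        # Mirror of the change above: strip the seam on both sides, then
        # rejoin with a single '/', so 'a/' + '/b' yields 'a/b' rather than
        # 'a//b' or '/b'.
        path = basepath
        for p in paths:
            path = path.rstrip('/') + '/' + p.lstrip('/')
        return path

    assert join('gs://bucket/path', '/to/file') == 'gs://bucket/path/to/file'
    assert join('gs://bucket/path/', 'to', 'file') == 'gs://bucket/path/to/file'

This is why the expected value in the test below changes from 'gs://bucket/path//to/file' to 'gs://bucket/path/to/file'.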
http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index d6a8fd7..0669bf2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -42,8 +42,16 @@ class GCSFileSystemTest(unittest.TestCase):
                      file_system.join('gs://bucket/path', 'to', 'file'))
     self.assertEqual('gs://bucket/path/to/file',
                      file_system.join('gs://bucket/path', 'to/file'))
-    self.assertEqual('gs://bucket/path//to/file',
+    self.assertEqual('gs://bucket/path/to/file',
                      file_system.join('gs://bucket/path', '/to/file'))
+    self.assertEqual('gs://bucket/path/to/file',
+                     file_system.join('gs://bucket/path/', 'to', 'file'))
+    self.assertEqual('gs://bucket/path/to/file',
+                     file_system.join('gs://bucket/path/', 'to/file'))
+    self.assertEqual('gs://bucket/path/to/file',
+                     file_system.join('gs://bucket/path/', '/to/file'))
+    with self.assertRaises(ValueError):
+      file_system.join('/bucket/path/', '/to/file')
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples(self, mock_gcsio):

http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 24e1129..d95b33f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -30,9 +30,9 @@ from datetime import datetime
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
 
-from apache_beam import utils
 from apache_beam.internal.gcp.auth import get_service_credentials
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
@@ -336,10 +336,12 @@ class Job(object):
     # for GCS staging locations where the potential for such clashes is high.
     if self.google_cloud_options.staging_location.startswith('gs://'):
       path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
-      self.google_cloud_options.staging_location = utils.path.join(
+      filesystem = get_filesystem(self.google_cloud_options.staging_location)
+      self.google_cloud_options.staging_location = filesystem.join(
           self.google_cloud_options.staging_location, path_suffix)
-      self.google_cloud_options.temp_location = utils.path.join(
+      self.google_cloud_options.temp_location = filesystem.join(
           self.google_cloud_options.temp_location, path_suffix)
+
     self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
     if self.options.view_as(StandardOptions).streaming:
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING

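The hunk above resolves a FileSystem from the path's scheme and joins a unique per-job suffix onto the staging and temp locations. A hedged sketch of the same pattern, using only the calls shown in the diff (the bucket and job name below are illustrative placeholders):

    import time
    from apache_beam.io.filesystems_util import get_filesystem

    staging_location = 'gs://my-bucket/staging'   # hypothetical staging path
    job_name = 'my-job'                           # hypothetical job name

    # Append '<job_name>.<timestamp>' so concurrent jobs sharing a staging
    # bucket do not clobber each other's artifacts.
    path_suffix = '%s.%f' % (job_name, time.time())
    filesystem = get_filesystem(staging_location)
    staging_location = filesystem.join(staging_location, path_suffix)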
http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 1f28b26..bb490f3 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -61,10 +61,9 @@ import shutil
 import sys
 import tempfile
 
-
-from apache_beam import utils
 from apache_beam import version as beam_version
 from apache_beam.internal import pickler
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.utils import processes
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -158,6 +157,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
       name patterns.
   """
   resources = []
+  staging_filesystem = get_filesystem(staging_location)
   staging_temp_dir = None
   local_packages = []
   for package in extra_packages:
@@ -190,13 +190,14 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
       local_packages.append(package)
 
   if staging_temp_dir:
+    temp_fs = get_filesystem(staging_temp_dir)
     local_packages.extend(
-        [utils.path.join(staging_temp_dir, f) for f in os.listdir(
+        [temp_fs.join(staging_temp_dir, f) for f in os.listdir(
             staging_temp_dir)])
 
   for package in local_packages:
     basename = os.path.basename(package)
-    staged_path = utils.path.join(staging_location, basename)
+    staged_path = staging_filesystem.join(staging_location, basename)
     file_copy(package, staged_path)
     resources.append(basename)
   # Create a file containing the list of extra packages and stage it.
@@ -209,7 +210,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
   with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
     for package in local_packages:
       f.write('%s\n' % os.path.basename(package))
-  staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
+  staged_path = staging_filesystem.join(staging_location, EXTRA_PACKAGES_FILE)
   # Note that the caller of this function is responsible for deleting the
   # temporary folder where all temp files are created, including this one.
   file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
@@ -284,13 +285,15 @@ def stage_job_resources(
     raise RuntimeError(
         'The --temp_location option must be specified.')
 
+  filesystem = get_filesystem(google_cloud_options.staging_location)
+
   # Stage a requirements file if present.
   if setup_options.requirements_file is not None:
     if not os.path.isfile(setup_options.requirements_file):
       raise RuntimeError('The file %s cannot be found. It was specified in the '
                          '--requirements_file command line option.' %
                          setup_options.requirements_file)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
+    staged_path = filesystem.join(google_cloud_options.staging_location,
                                   REQUIREMENTS_FILE)
     file_copy(setup_options.requirements_file, staged_path)
     resources.append(REQUIREMENTS_FILE)
@@ -305,7 +308,7 @@ def stage_job_resources(
     populate_requirements_cache(
         setup_options.requirements_file, requirements_cache_path)
     for pkg in  glob.glob(os.path.join(requirements_cache_path, '*')):
-      file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
+      file_copy(pkg, filesystem.join(google_cloud_options.staging_location,
                                      os.path.basename(pkg)))
       resources.append(os.path.basename(pkg))
 
@@ -324,7 +327,7 @@ def stage_job_resources(
           'setup.py instead of %s' % setup_options.setup_file)
     tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
                                         build_setup_args)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
+    staged_path = filesystem.join(google_cloud_options.staging_location,
                                   WORKFLOW_TARBALL_FILE)
     file_copy(tarball_file, staged_path)
     resources.append(WORKFLOW_TARBALL_FILE)
@@ -344,7 +347,7 @@ def stage_job_resources(
     pickled_session_file = os.path.join(temp_dir,
                                         names.PICKLED_MAIN_SESSION_FILE)
     pickler.dump_session(pickled_session_file)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
+    staged_path = filesystem.join(google_cloud_options.staging_location,
                                   names.PICKLED_MAIN_SESSION_FILE)
     file_copy(pickled_session_file, staged_path)
     resources.append(names.PICKLED_MAIN_SESSION_FILE)
@@ -359,7 +362,7 @@ def stage_job_resources(
     else:
       stage_tarball_from_remote_location = False
 
-    staged_path = utils.path.join(google_cloud_options.staging_location,
+    staged_path = filesystem.join(google_cloud_options.staging_location,
                                   names.DATAFLOW_SDK_TARBALL_FILE)
     if stage_tarball_from_remote_location:
       # If --sdk_location is not specified then the appropriate package

http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
index 545bcd6..24f65d0 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
@@ -23,7 +23,7 @@ import shutil
 import tempfile
 import unittest
 
-from apache_beam import utils
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -241,7 +241,8 @@ class SetupTest(unittest.TestCase):
     def file_copy(from_path, to_path):
       if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
         self.assertEqual(expected_from_path, from_path)
-        self.assertEqual(utils.path.join(expected_to_dir,
+        filesystem = get_filesystem(expected_to_dir)
+        self.assertEqual(filesystem.join(expected_to_dir,
                                          names.DATAFLOW_SDK_TARBALL_FILE),
                          to_path)
       if from_path.startswith('gs://') or to_path.startswith('gs://'):

http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/utils/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/__init__.py b/sdks/python/apache_beam/utils/__init__.py
index 20ff35d..74cf45d 100644
--- a/sdks/python/apache_beam/utils/__init__.py
+++ b/sdks/python/apache_beam/utils/__init__.py
@@ -16,7 +16,3 @@
 #
 
 """A package containing utilities."""
-
-# We must import path here to support the pattern of referencing utils.path
-# without needing to explicitly import apache_beam.utils.path.
-import path

http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/utils/path.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/path.py b/sdks/python/apache_beam/utils/path.py
deleted file mode 100644
index 86dc4db..0000000
--- a/sdks/python/apache_beam/utils/path.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Utilities for dealing with file paths."""
-
-import os
-
-
-def join(path, *paths):
-  """Joins given path pieces with the appropriate separator.
-
-  This function is useful for joining parts of a path that could at times refer
-  to either a GCS path or a local path.  In particular, this is useful for
-  ensuring Windows compatibility as on Windows, the GCS path separator is
-  different from the separator for local paths.
-
-  Use os.path.join instead if a path always refers to a local path.
-
-  Args:
-    path: First part of path to join.  If this part starts with 'gs:/', the GCS
-      separator will be used in joining this path.
-    *paths: Remaining part(s) of path to join.
-
-  Returns:
-    Pieces joined by the appropriate path separator.
-  """
-  if path.startswith('gs:/'):
-    # Note that we explicitly choose not to use posixpath.join() here, since
-    # that function has the undesirable behavior of having, for example,
-    # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead
-    # of the slightly less surprising result 'gs://bucket/path//to/file'.
-    return '/'.join((path,) + paths)
-  return os.path.join(path, *paths)

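The comment in the removed helper refers to a real standard-library quirk: posixpath.join discards everything before an absolute second argument. A quick illustration (plain standard library, not Beam code), which is why the old helper avoided posixpath.join and why the FileSystem.join replacement strips slashes explicitly instead:

    import posixpath

    # An absolute component resets the join entirely.
    print(posixpath.join('gs://bucket/path', '/to/file'))  # prints '/to/file'
    print(posixpath.join('gs://bucket/path', 'to/file'))   # prints 'gs://bucket/path/to/file'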
http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/utils/path_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/path_test.py b/sdks/python/apache_beam/utils/path_test.py
deleted file mode 100644
index e59eca0..0000000
--- a/sdks/python/apache_beam/utils/path_test.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Unit tests for the path module."""
-
-import unittest
-
-
-import mock
-
-from apache_beam.utils import path
-
-
-def _gen_fake_join(separator):
-  """Returns a callable that joins paths with the given separator."""
-
-  def _join(first_path, *paths):
-    return separator.join((first_path,) + paths)
-
-  return _join
-
-
-class Path(unittest.TestCase):
-
-  def setUp(self):
-    pass
-
-  @mock.patch('apache_beam.utils.path.os')
-  def test_gcs_path(self, *unused_mocks):
-    # Test joining of GCS paths when os.path.join uses Windows-style separator.
-    path.os.path.join.side_effect = _gen_fake_join('\\')
-    self.assertEqual('gs://bucket/path/to/file',
-                     path.join('gs://bucket/path', 'to', 'file'))
-    self.assertEqual('gs://bucket/path/to/file',
-                     path.join('gs://bucket/path', 'to/file'))
-    self.assertEqual('gs://bucket/path//to/file',
-                     path.join('gs://bucket/path', '/to/file'))
-
-  @mock.patch('apache_beam.utils.path.os')
-  def test_unix_path(self, *unused_mocks):
-    # Test joining of Unix paths.
-    path.os.path.join.side_effect = _gen_fake_join('/')
-    self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to', 'file'))
-    self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to/file'))
-
-  @mock.patch('apache_beam.utils.path.os')
-  def test_windows_path(self, *unused_mocks):
-    # Test joining of Windows paths.
-    path.os.path.join.side_effect = _gen_fake_join('\\')
-    self.assertEqual(r'C:\tmp\path\to\file',
-                     path.join(r'C:\tmp\path', 'to', 'file'))
-    self.assertEqual(r'C:\tmp\path\to\file',
-                     path.join(r'C:\tmp\path', r'to\file'))
-
-
-if __name__ == '__main__':
-  unittest.main()


[2/2] beam git commit: This closes #2643

Posted by al...@apache.org.
This closes #2643


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6701973
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6701973
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6701973

Branch: refs/heads/master
Commit: a67019739dc7f09a8336b9606c3726ad5d546f51
Parents: 1dce98f 9b7bbc2
Author: Ahmet Altay <al...@google.com>
Authored: Sun Apr 23 12:54:48 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Sun Apr 23 12:54:48 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  5 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 10 ++-
 .../runners/dataflow/internal/apiclient.py      |  8 ++-
 .../runners/dataflow/internal/dependency.py     | 23 ++++---
 .../dataflow/internal/dependency_test.py        |  5 +-
 sdks/python/apache_beam/utils/__init__.py       |  4 --
 sdks/python/apache_beam/utils/path.py           | 46 -------------
 sdks/python/apache_beam/utils/path_test.py      | 70 --------------------
 8 files changed, 31 insertions(+), 140 deletions(-)
----------------------------------------------------------------------