You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/25 23:22:48 UTC

[1/2] beam git commit: [BEAM-1988] Add FileSystems Interface for accessing underlying FS correctly

Repository: beam
Updated Branches:
  refs/heads/master 0d69611e2 -> b8c568f29


[BEAM-1988] Add FileSystems Interface for accessing underlying FS correctly


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad6dcf4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad6dcf4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad6dcf4d

Branch: refs/heads/master
Commit: ad6dcf4d1d22b7e6e349db9027ef639a5410b494
Parents: 0d69611
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Apr 25 12:01:21 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Tue Apr 25 16:21:30 2017 -0700

----------------------------------------------------------------------
 sdks/python/.pylintrc                           |   1 +
 sdks/python/apache_beam/io/filebasedsource.py   |  20 +-
 sdks/python/apache_beam/io/fileio.py            |  25 +--
 sdks/python/apache_beam/io/filesystem.py        |   6 +-
 sdks/python/apache_beam/io/filesystems.py       | 186 +++++++++++++++
 sdks/python/apache_beam/io/filesystems_test.py  | 224 +++++++++++++++++++
 .../apache_beam/io/localfilesystem_test.py      |   4 +-
 .../runners/dataflow/internal/apiclient.py      |   7 +-
 .../runners/dataflow/internal/dependency.py     |  32 ++-
 .../dataflow/internal/dependency_test.py        |   7 +-
 .../apache_beam/tests/pipeline_verifiers.py     |   7 +-
 11 files changed, 452 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/.pylintrc
----------------------------------------------------------------------
diff --git a/sdks/python/.pylintrc b/sdks/python/.pylintrc
index 429ebdb..6418249 100644
--- a/sdks/python/.pylintrc
+++ b/sdks/python/.pylintrc
@@ -95,6 +95,7 @@ disable =
   import-self,
   invalid-name,
   invalid-unary-operand-type,
+  len-as-condition,
   locally-disabled,
   locally-enabled,
   misplaced-bare-raise,

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index ef44b3e..e25f92e 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -30,7 +30,7 @@ from apache_beam.io import concat_source
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.value_provider import ValueProvider
 from apache_beam.utils.value_provider import StaticValueProvider
@@ -86,10 +86,6 @@ class FileBasedSource(iobase.BoundedSource):
     if isinstance(file_pattern, basestring):
       file_pattern = StaticValueProvider(str, file_pattern)
     self._pattern = file_pattern
-    if file_pattern.is_accessible():
-      self._file_system = get_filesystem(file_pattern.get())
-    else:
-      self._file_system = None
 
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
@@ -118,9 +114,7 @@ class FileBasedSource(iobase.BoundedSource):
       pattern = self._pattern.get()
 
       single_file_sources = []
-      if self._file_system is None:
-        self._file_system = get_filesystem(pattern)
-      match_result = self._file_system.match([pattern])[0]
+      match_result = FileSystems.match([pattern])[0]
       files_metadata = match_result.metadata_list
 
       # We create a reference for FileBasedSource that will be serialized along
@@ -155,7 +149,7 @@ class FileBasedSource(iobase.BoundedSource):
     return self._concat_source
 
   def open_file(self, file_name):
-    return get_filesystem(file_name).open(
+    return FileSystems.open(
         file_name, 'application/octet-stream',
         compression_type=self._compression_type)
 
@@ -164,11 +158,9 @@ class FileBasedSource(iobase.BoundedSource):
     """Validate if there are actual files in the specified glob pattern
     """
     pattern = self._pattern.get()
-    if self._file_system is None:
-      self._file_system = get_filesystem(pattern)
 
     # Limit the responses as we only want to check if something exists
-    match_result = self._file_system.match([pattern], limits=[1])[0]
+    match_result = FileSystems.match([pattern], limits=[1])[0]
     if len(match_result.metadata_list) <= 0:
       raise IOError(
           'No files found based on the file pattern %s' % pattern)
@@ -183,9 +175,7 @@ class FileBasedSource(iobase.BoundedSource):
   @check_accessible(['_pattern'])
   def estimate_size(self):
     pattern = self._pattern.get()
-    if self._file_system is None:
-      self._file_system = get_filesystem(pattern)
-    match_result = self._file_system.match([pattern])[0]
+    match_result = FileSystems.match([pattern])[0]
     return sum([f.size_in_bytes for f in match_result.metadata_list])
 
   def read(self, range_tracker):

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index f61289e..bb77bfe 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -28,7 +28,7 @@ from apache_beam.internal import util
 from apache_beam.io import iobase
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.value_provider import ValueProvider
 from apache_beam.utils.value_provider import StaticValueProvider
@@ -91,10 +91,6 @@ class FileSink(iobase.Sink):
     self.shard_name_format = self._template_to_format(shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
-    if file_path_prefix.is_accessible():
-      self._file_system = get_filesystem(file_path_prefix.get())
-    else:
-      self._file_system = None
 
   def display_data(self):
     return {'shards':
@@ -115,10 +111,7 @@ class FileSink(iobase.Sink):
     The returned file handle is passed to ``write_[encoded_]record`` and
     ``close``.
     """
-    if self._file_system is None:
-      self._file_system = get_filesystem(self.file_path_prefix.get())
-    return self._file_system.create(temp_path, self.mime_type,
-                                    self.compression_type)
+    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
 
   def write_record(self, file_handle, value):
     """Writes a single record go the file handle returned by ``open()``.
@@ -149,9 +142,7 @@ class FileSink(iobase.Sink):
     file_name_suffix = self.file_name_suffix.get()
     tmp_dir = file_path_prefix + file_name_suffix + time.strftime(
         '-temp-%Y-%m-%d_%H-%M-%S')
-    if self._file_system is None:
-      self._file_system = get_filesystem(file_path_prefix)
-    self._file_system.mkdirs(tmp_dir)
+    FileSystems.mkdirs(tmp_dir)
     return tmp_dir
 
   @check_accessible(['file_path_prefix', 'file_name_suffix'])
@@ -177,7 +168,7 @@ class FileSink(iobase.Sink):
 
     source_files = []
     destination_files = []
-    chunk_size = self._file_system.CHUNK_SIZE
+    chunk_size = FileSystems.get_chunk_size(file_path_prefix)
     for shard_num, shard in enumerate(writer_results):
       final_name = ''.join([
           file_path_prefix, self.shard_name_format % dict(
@@ -204,10 +195,8 @@ class FileSink(iobase.Sink):
       """_rename_batch executes batch rename operations."""
       source_files, destination_files = batch
       exceptions = []
-      if self._file_system is None:
-        self._file_system = get_filesystem(file_path_prefix)
       try:
-        self._file_system.rename(source_files, destination_files)
+        FileSystems.rename(source_files, destination_files)
         return exceptions
       except BeamIOError as exp:
         if exp.exception_details is None:
@@ -220,7 +209,7 @@ class FileSink(iobase.Sink):
             if isinstance(exception, IOError):
               # May have already been copied.
               try:
-                if self._file_system.exists(dest):
+                if FileSystems.exists(dest):
                   should_report = False
               except Exception as exists_e:  # pylint: disable=broad-except
                 logging.warning('Exception when checking if file %s exists: '
@@ -250,7 +239,7 @@ class FileSink(iobase.Sink):
                  time.time() - start_time)
 
     try:
-      self._file_system.delete([init_result])
+      FileSystems.delete([init_result])
     except IOError:
       # May have already been removed.
       pass

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 591d0b0..db38858 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -465,7 +465,8 @@ class FileSystem(object):
     raise NotImplementedError
 
   @abc.abstractmethod
-  def create(self, path, mime_type, compression_type):
+  def create(self, path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
     """Returns a write channel for the given file path.
 
     Args:
@@ -478,7 +479,8 @@ class FileSystem(object):
     raise NotImplementedError
 
   @abc.abstractmethod
-  def open(self, path, mime_type, compression_type):
+  def open(self, path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
     """Returns a read channel for the given file path.
 
     Args:

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
new file mode 100644
index 0000000..07fc684
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""FileSystems interface class for accessing the correct filesystem"""
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems_util import get_filesystem
+
+
+class FileSystems(object):
+  """A class that defines the functions that can be performed on a filesystem.
+  All methods are static and access the underlying registered filesystems.
+  """
+
+  @staticmethod
+  def get_filesystem(path):
+    """Get the correct filesystem for the specified path
+    """
+    try:
+      return get_filesystem(path)
+    except Exception as e:
+      raise BeamIOError('Enable to get the Filesystem', {path: e})
+
+  @staticmethod
+  def join(basepath, *paths):
+    """Join two or more pathname components for the filesystem
+
+    Args:
+      basepath: string path of the first component of the path
+      paths: path components to be added
+
+    Returns: full path after combining all the passed components
+    """
+    filesystem = FileSystems.get_filesystem(basepath)
+    return filesystem.join(basepath, *paths)
+
+  @staticmethod
+  def mkdirs(path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if leaf directory already exists.
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.mkdirs(path)
+
+  @staticmethod
+  def match(patterns, limits=None):
+    """Find all matching paths to the patterns provided.
+
+    Args:
+      patterns: list of string for the file path pattern to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if len(patterns) == 0:
+      return []
+    filesystem = FileSystems.get_filesystem(patterns[0])
+    return filesystem.match(patterns, limits)
+
+  @staticmethod
+  def create(path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    """Returns a write channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object. See
+        ``CompressionTypes`` for possible values.
+
+    Returns: file handle with a ``close`` function for the user to use.
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.create(path, mime_type, compression_type)
+
+  @staticmethod
+  def open(path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    """Returns a read channel for the given file path.
+
+    Args:
+      path: string path of the file object to be read from the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object. See
+        ``CompressionTypes`` for possible values.
+
+    Returns: file handle with a ``close`` function for the user to use.
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.open(path, mime_type, compression_type)
+
+  @staticmethod
+  def copy(source_file_names, destination_file_names):
+    """Recursively copy the file list from the source to the destination
+
+    Args:
+      source_file_names: list of source file objects that needs to be copied
+      destination_file_names: list of destination of the new object
+
+    Raises:
+      ``BeamIOError`` if any of the copy operations fail
+    """
+    if len(source_file_names) == 0:
+      return
+    filesystem = FileSystems.get_filesystem(source_file_names[0])
+    return filesystem.copy(source_file_names, destination_file_names)
+
+  @staticmethod
+  def rename(source_file_names, destination_file_names):
+    """Rename the files at the source list to the destination list.
+    Source and destination lists should be of the same size.
+
+    Args:
+      source_file_names: List of file paths that need to be moved
+      destination_file_names: List of destination_file_names for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    if len(source_file_names) == 0:
+      return
+    filesystem = FileSystems.get_filesystem(source_file_names[0])
+    return filesystem.rename(source_file_names, destination_file_names)
+
+  @staticmethod
+  def exists(path):
+    """Check if the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.exists(path)
+
+  @staticmethod
+  def delete(paths):
+    """Deletes files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths that give the file objects to be deleted
+
+    Raises:
+      ``BeamIOError`` if any of the delete operations fail
+    """
+    if len(paths) == 0:
+      return
+    filesystem = FileSystems.get_filesystem(paths[0])
+    return filesystem.delete(paths)
+
+  @staticmethod
+  def get_chunk_size(path):
+    """Get the correct chunk size for the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: integer size for parallelization in the FS operations.
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.CHUNK_SIZE

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filesystems_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py
new file mode 100644
index 0000000..9165586
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystems_test.py
@@ -0,0 +1,224 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for FileSystems."""
+
+import unittest
+
+import filecmp
+import os
+import shutil
+import tempfile
+import mock
+
+from apache_beam.io import localfilesystem
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystems import FileSystems
+
+
+def _gen_fake_join(separator):
+  """Returns a callable that joins paths with the given separator."""
+
+  def _join(first_path, *paths):
+    return separator.join((first_path.rstrip(separator),) + paths)
+
+  return _join
+
+
+class FileSystemsTest(unittest.TestCase):
+
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_unix_path_join(self, *unused_mocks):
+    # Test joining of Unix paths.
+    localfilesystem.os.path.join.side_effect = _gen_fake_join('/')
+    self.assertEqual('/tmp/path/to/file',
+                     FileSystems.join('/tmp/path', 'to', 'file'))
+    self.assertEqual('/tmp/path/to/file',
+                     FileSystems.join('/tmp/path', 'to/file'))
+    self.assertEqual('/tmp/path/to/file',
+                     FileSystems.join('/', 'tmp/path', 'to/file'))
+    self.assertEqual('/tmp/path/to/file',
+                     FileSystems.join('/tmp/', 'path', 'to/file'))
+
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_windows_path_join(self, *unused_mocks):
+    # Test joining of Windows paths.
+    localfilesystem.os.path.join.side_effect = _gen_fake_join('\\')
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     FileSystems.join(r'C:\tmp\path', 'to', 'file'))
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     FileSystems.join(r'C:\tmp\path', r'to\file'))
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     FileSystems.join(r'C:\tmp\path\\', 'to', 'file'))
+
+  def test_mkdirs(self):
+    path = os.path.join(self.tmpdir, 't1/t2')
+    FileSystems.mkdirs(path)
+    self.assertTrue(os.path.isdir(path))
+
+  def test_mkdirs_failed(self):
+    path = os.path.join(self.tmpdir, 't1/t2')
+    FileSystems.mkdirs(path)
+
+    # Check IOError if existing directory is created
+    with self.assertRaises(IOError):
+      FileSystems.mkdirs(path)
+
+    with self.assertRaises(IOError):
+      FileSystems.mkdirs(os.path.join(self.tmpdir, 't1'))
+
+  def test_match_file(self):
+    path = os.path.join(self.tmpdir, 'f1')
+    open(path, 'a').close()
+
+    # Match files in the temp directory
+    result = FileSystems.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [path])
+
+  def test_match_file_empty(self):
+    path = os.path.join(self.tmpdir, 'f2')  # Does not exist
+
+    # Match files in the temp directory
+    result = FileSystems.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [])
+
+  def test_match_file_exception(self):
+    # Match files with None so that it throws an exception
+    with self.assertRaises(BeamIOError) as error:
+      FileSystems.match([None])
+    self.assertTrue(
+        error.exception.message.startswith('Enable to get the Filesystem'))
+    self.assertEqual(error.exception.exception_details.keys(), [None])
+
+  def test_match_glob(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    open(path1, 'a').close()
+    open(path2, 'a').close()
+
+    # Glob both files; sort so the check does not rely on match order
+    path = os.path.join(self.tmpdir, '*')
+    result = FileSystems.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(sorted(files), [path1, path2])
+
+  def test_match_directory(self):
+    result = FileSystems.match([self.tmpdir])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [self.tmpdir])
+
+  def test_copy(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    FileSystems.copy([path1], [path2])
+    self.assertTrue(filecmp.cmp(path1, path2))
+
+  def test_copy_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      FileSystems.copy([path1], [path2])
+    self.assertTrue(
+        error.exception.message.startswith('Copy operation failed'))
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_copy_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    FileSystems.mkdirs(path_t1)
+    FileSystems.mkdirs(path_t2)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    FileSystems.copy([path_t1], [path_t2])
+    self.assertTrue(filecmp.cmp(path1, path2))
+
+  def test_rename(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    FileSystems.rename([path1], [path2])
+    self.assertTrue(FileSystems.exists(path2))
+    self.assertFalse(FileSystems.exists(path1))
+
+  def test_rename_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      FileSystems.rename([path1], [path2])
+    self.assertTrue(
+        error.exception.message.startswith('Rename operation failed'))
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_rename_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    FileSystems.mkdirs(path_t1)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    FileSystems.rename([path_t1], [path_t2])
+    self.assertTrue(FileSystems.exists(path_t2))
+    self.assertFalse(FileSystems.exists(path_t1))
+    self.assertTrue(FileSystems.exists(path2))
+    self.assertFalse(FileSystems.exists(path1))
+
+  def test_exists(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+    self.assertTrue(FileSystems.exists(path1))
+    self.assertFalse(FileSystems.exists(path2))
+
+  def test_delete(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.assertTrue(FileSystems.exists(path1))
+    FileSystems.delete([path1])
+    self.assertFalse(FileSystems.exists(path1))
+
+  def test_delete_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    with self.assertRaises(BeamIOError) as error:
+      FileSystems.delete([path1])
+    self.assertTrue(
+        error.exception.message.startswith('Delete operation failed'))
+    self.assertEqual(error.exception.exception_details.keys(), [path1])

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 3fe308d..df6eb61 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -26,15 +26,15 @@ import shutil
 import tempfile
 import mock
 
-from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io import localfilesystem
+from apache_beam.io.filesystem import BeamIOError
 
 
 def _gen_fake_join(separator):
   """Returns a callable that joins paths with the given separator."""
 
   def _join(first_path, *paths):
-    return separator.join((first_path,) + paths)
+    return separator.join((first_path.rstrip(separator),) + paths)
 
   return _join
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index d95b33f..0270cbe 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -32,7 +32,7 @@ from apitools.base.py import exceptions
 
 from apache_beam.internal.gcp.auth import get_service_credentials
 from apache_beam.internal.gcp.json_value import to_json_value
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
@@ -336,10 +336,9 @@ class Job(object):
     # for GCS staging locations where the potential for such clashes is high.
     if self.google_cloud_options.staging_location.startswith('gs://'):
       path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
-      filesystem = get_filesystem(self.google_cloud_options.staging_location)
-      self.google_cloud_options.staging_location = filesystem.join(
+      self.google_cloud_options.staging_location = FileSystems.join(
           self.google_cloud_options.staging_location, path_suffix)
-      self.google_cloud_options.temp_location = filesystem.join(
+      self.google_cloud_options.temp_location = FileSystems.join(
           self.google_cloud_options.temp_location, path_suffix)
 
     self.proto = dataflow.Job(name=self.google_cloud_options.job_name)

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index bb490f3..f64f9b2 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -63,7 +63,7 @@ import tempfile
 
 from apache_beam import version as beam_version
 from apache_beam.internal import pickler
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.utils import processes
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -157,7 +157,6 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
       name patterns.
   """
   resources = []
-  staging_filesystem = get_filesystem(staging_location)
   staging_temp_dir = None
   local_packages = []
   for package in extra_packages:
@@ -190,14 +189,13 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
       local_packages.append(package)
 
   if staging_temp_dir:
-    temp_fs = get_filesystem(staging_temp_dir)
     local_packages.extend(
-        [temp_fs.join(staging_temp_dir, f) for f in os.listdir(
+        [FileSystems.join(staging_temp_dir, f) for f in os.listdir(
             staging_temp_dir)])
 
   for package in local_packages:
     basename = os.path.basename(package)
-    staged_path = staging_filesystem.join(staging_location, basename)
+    staged_path = FileSystems.join(staging_location, basename)
     file_copy(package, staged_path)
     resources.append(basename)
   # Create a file containing the list of extra packages and stage it.
@@ -210,7 +208,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir,
   with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
     for package in local_packages:
       f.write('%s\n' % os.path.basename(package))
-  staged_path = staging_filesystem.join(staging_location, EXTRA_PACKAGES_FILE)
+  staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE)
   # Note that the caller of this function is responsible for deleting the
   # temporary folder where all temp files are created, including this one.
   file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
@@ -285,16 +283,14 @@ def stage_job_resources(
     raise RuntimeError(
         'The --temp_location option must be specified.')
 
-  filesystem = get_filesystem(google_cloud_options.staging_location)
-
   # Stage a requirements file if present.
   if setup_options.requirements_file is not None:
     if not os.path.isfile(setup_options.requirements_file):
       raise RuntimeError('The file %s cannot be found. It was specified in the '
                          '--requirements_file command line option.' %
                          setup_options.requirements_file)
-    staged_path = filesystem.join(google_cloud_options.staging_location,
-                                  REQUIREMENTS_FILE)
+    staged_path = FileSystems.join(google_cloud_options.staging_location,
+                                   REQUIREMENTS_FILE)
     file_copy(setup_options.requirements_file, staged_path)
     resources.append(REQUIREMENTS_FILE)
     requirements_cache_path = (
@@ -308,8 +304,8 @@ def stage_job_resources(
     populate_requirements_cache(
         setup_options.requirements_file, requirements_cache_path)
     for pkg in  glob.glob(os.path.join(requirements_cache_path, '*')):
-      file_copy(pkg, filesystem.join(google_cloud_options.staging_location,
-                                     os.path.basename(pkg)))
+      file_copy(pkg, FileSystems.join(google_cloud_options.staging_location,
+                                      os.path.basename(pkg)))
       resources.append(os.path.basename(pkg))
 
   # Handle a setup file if present.
@@ -327,8 +323,8 @@ def stage_job_resources(
           'setup.py instead of %s' % setup_options.setup_file)
     tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
                                         build_setup_args)
-    staged_path = filesystem.join(google_cloud_options.staging_location,
-                                  WORKFLOW_TARBALL_FILE)
+    staged_path = FileSystems.join(google_cloud_options.staging_location,
+                                   WORKFLOW_TARBALL_FILE)
     file_copy(tarball_file, staged_path)
     resources.append(WORKFLOW_TARBALL_FILE)
 
@@ -347,8 +343,8 @@ def stage_job_resources(
     pickled_session_file = os.path.join(temp_dir,
                                         names.PICKLED_MAIN_SESSION_FILE)
     pickler.dump_session(pickled_session_file)
-    staged_path = filesystem.join(google_cloud_options.staging_location,
-                                  names.PICKLED_MAIN_SESSION_FILE)
+    staged_path = FileSystems.join(google_cloud_options.staging_location,
+                                   names.PICKLED_MAIN_SESSION_FILE)
     file_copy(pickled_session_file, staged_path)
     resources.append(names.PICKLED_MAIN_SESSION_FILE)
 
@@ -362,8 +358,8 @@ def stage_job_resources(
     else:
       stage_tarball_from_remote_location = False
 
-    staged_path = filesystem.join(google_cloud_options.staging_location,
-                                  names.DATAFLOW_SDK_TARBALL_FILE)
+    staged_path = FileSystems.join(google_cloud_options.staging_location,
+                                   names.DATAFLOW_SDK_TARBALL_FILE)
     if stage_tarball_from_remote_location:
       # If --sdk_location is not specified then the appropriate package
       # will be obtained from PyPI (https://pypi.python.org) based on the

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
index 24f65d0..1ff087b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
@@ -23,7 +23,7 @@ import shutil
 import tempfile
 import unittest
 
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -241,9 +241,8 @@ class SetupTest(unittest.TestCase):
     def file_copy(from_path, to_path):
       if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
         self.assertEqual(expected_from_path, from_path)
-        filesystem = get_filesystem(expected_to_dir)
-        self.assertEqual(filesystem.join(expected_to_dir,
-                                         names.DATAFLOW_SDK_TARBALL_FILE),
+        self.assertEqual(FileSystems.join(expected_to_dir,
+                                          names.DATAFLOW_SDK_TARBALL_FILE),
                          to_path)
       if from_path.startswith('gs://') or to_path.startswith('gs://'):
         logging.info('Faking file_copy(%s, %s)', from_path, to_path)

http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 51302b0..df05054 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -27,7 +27,7 @@ import time
 
 from hamcrest.core.base_matcher import BaseMatcher
 
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.runners.runner import PipelineState
 from apache_beam.tests import test_utils as utils
 from apache_beam.utils import retry
@@ -99,7 +99,6 @@ class FileChecksumMatcher(BaseMatcher):
       self.sleep_secs = None
 
     self.file_path = file_path
-    self.file_system = get_filesystem(self.file_path)
     self.expected_checksum = expected_checksum
 
   @retry.with_exponential_backoff(
@@ -108,7 +107,7 @@ class FileChecksumMatcher(BaseMatcher):
   def _read_with_retry(self):
     """Read path with retry if I/O failed"""
     read_lines = []
-    match_result = self.file_system.match([self.file_path])[0]
+    match_result = FileSystems.match([self.file_path])[0]
     matched_path = [f.path for f in match_result.metadata_list]
     if not matched_path:
       raise IOError('No such file or directory: %s' % self.file_path)
@@ -116,7 +115,7 @@ class FileChecksumMatcher(BaseMatcher):
     logging.info('Find %d files in %s: \n%s',
                  len(matched_path), self.file_path, '\n'.join(matched_path))
     for path in matched_path:
-      with self.file_system.open(path, 'r') as f:
+      with FileSystems.open(path, 'r') as f:
         for line in f:
           read_lines.append(line)
     return read_lines


[2/2] beam git commit: This closes #2665

Posted by ch...@apache.org.
This closes #2665


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8c568f2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8c568f2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8c568f2

Branch: refs/heads/master
Commit: b8c568f2950d333f44b72b57c24f59464a0c9836
Parents: 0d69611 ad6dcf4
Author: Chamikara Jayalath <ch...@google.com>
Authored: Tue Apr 25 16:22:26 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Tue Apr 25 16:22:26 2017 -0700

----------------------------------------------------------------------
 sdks/python/.pylintrc                           |   1 +
 sdks/python/apache_beam/io/filebasedsource.py   |  20 +-
 sdks/python/apache_beam/io/fileio.py            |  25 +--
 sdks/python/apache_beam/io/filesystem.py        |   6 +-
 sdks/python/apache_beam/io/filesystems.py       | 186 +++++++++++++++
 sdks/python/apache_beam/io/filesystems_test.py  | 224 +++++++++++++++++++
 .../apache_beam/io/localfilesystem_test.py      |   4 +-
 .../runners/dataflow/internal/apiclient.py      |   7 +-
 .../runners/dataflow/internal/dependency.py     |  32 ++-
 .../dataflow/internal/dependency_test.py        |   7 +-
 .../apache_beam/tests/pipeline_verifiers.py     |   7 +-
 11 files changed, 452 insertions(+), 67 deletions(-)
----------------------------------------------------------------------