You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/05/02 20:57:17 UTC

[1/2] beam git commit: [BEAM-539] Fixes several issues of FileSink.

Repository: beam
Updated Branches:
  refs/heads/master 0ce01b63f -> 87a12af6c


[BEAM-539] Fixes several issues of FileSink.

(1) Updates FileSink to fail for file name prefixes that only contain a single component (for example GCS buckets).

For example, currently FileSink fails for 'gs://aaa' while passing for 'gs://aaa/'. This change makes FileSink fail for both cases (and makes the behaviour consistent with Java).

(2) Updates the name of the temporary directory created by FileSink

Currently, for a filename prefix 'gs://aaa/bbb', the temp path would be of the form 'gs://aaa/bbb-temp-...'.
This is error-prone since a user pattern 'gs://aaa/bbb*' would match temp files. This change makes the temp path format 'gs://aaa/beam-temp-bbb-...' instead.

To achieve the above, this adds a method 'split()' to the FileSystem interface
that is analogous to Python's 'os.path.split()' and has the
opposite effect of the existing method FileSystem.join().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ec48c58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ec48c58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ec48c58

Branch: refs/heads/master
Commit: 5ec48c58c0e32891224598db61ebb63e8731e9fb
Parents: 0ce01b6
Author: Chamikara Jayalath <ch...@google.com>
Authored: Fri Apr 28 14:38:35 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue May 2 13:56:00 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py            | 20 +++++--
 sdks/python/apache_beam/io/fileio_test.py       | 56 ++++++++++++++++++++
 sdks/python/apache_beam/io/filesystem.py        | 17 ++++++
 sdks/python/apache_beam/io/filesystems.py       | 18 +++++++
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 34 +++++++++++-
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 12 +++++
 sdks/python/apache_beam/io/localfilesystem.py   | 13 +++++
 .../apache_beam/io/localfilesystem_test.py      | 35 ++++++++++++
 8 files changed, 200 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index bb77bfe..49562f7 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -139,12 +139,26 @@ class FileSink(iobase.Sink):
   @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def initialize_write(self):
     file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
-    tmp_dir = file_path_prefix + file_name_suffix + time.strftime(
-        '-temp-%Y-%m-%d_%H-%M-%S')
+
+    tmp_dir = self._create_temp_dir(file_path_prefix)
     FileSystems.mkdirs(tmp_dir)
     return tmp_dir
 
+  def _create_temp_dir(self, file_path_prefix):
+    base_path, last_component = FileSystems.split(file_path_prefix)
+    if not last_component:
+      # Trying to re-split the base_path to check if it's a root.
+      new_base_path, _ = FileSystems.split(base_path)
+      if base_path == new_base_path:
+        raise ValueError('Cannot create a temporary directory for root path '
+                         'prefix %s. Please specify a file path prefix with '
+                         'at least two components.',
+                         file_path_prefix)
+    path_components = [base_path,
+                       'beam-temp-' + last_component + time.strftime(
+                           '-%Y-%m-%d_%H-%M-%S')]
+    return FileSystems.join(*path_components)
+
   @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def open_writer(self, init_result, uid):
     # A proper suffix is needed for AUTO compression detection.

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 2409873..13778d5 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -26,10 +26,12 @@ import tempfile
 import unittest
 
 import hamcrest as hc
+import mock
 
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
+from apache_beam.io.filesystem import BeamIOError
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -184,6 +186,60 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     self.assertTrue('][a][' in concat, concat)
     self.assertTrue('][b][' in concat, concat)
 
+  # Not using 'test' in name so that 'nose' doesn't pick this as a test.
+  def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path,
+                         dir_root_path, prefix, separator):
+    def _get_temp_dir(file_path_prefix):
+      sink = MyFileSink(
+          file_path_prefix, file_name_suffix='.output',
+          coder=coders.ToStringCoder())
+      return sink.initialize_write()
+
+    temp_dir = _get_temp_dir(no_dir_path)
+    self.assertTrue(temp_dir.startswith(prefix))
+    last_sep = temp_dir.rfind(separator)
+    self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+    temp_dir = _get_temp_dir(dir_path)
+    self.assertTrue(temp_dir.startswith(prefix))
+    last_sep = temp_dir.rfind(separator)
+    self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+    with self.assertRaises(ValueError):
+      _get_temp_dir(no_dir_root_path)
+
+    with self.assertRaises(ValueError):
+      _get_temp_dir(dir_root_path)
+
+  def test_temp_dir_gcs(self):
+    try:
+      self.run_temp_dir_check(
+          'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
+          '/')
+    except BeamIOError:
+      logging.debug('Ignoring test since GCP module is not installed')
+
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_temp_dir_local(self, filesystem_os_mock):
+    # Here we test a unix-like mock file-system
+    # (not really testing Unix or Windows since we mock the function of 'os'
+    # module).
+
+    def _fake_unix_split(path):
+      sep = path.rfind('/')
+      if sep < 0:
+        raise ValueError('Path must contain a separator')
+      return (path[:sep], path[sep + 1:])
+
+    def _fake_unix_join(base, path):
+      return base + '/' + path
+
+    filesystem_os_mock.path.abspath = lambda a: a
+    filesystem_os_mock.path.split.side_effect = _fake_unix_split
+    filesystem_os_mock.path.join.side_effect = _fake_unix_join
+    self.run_temp_dir_check(
+        '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/')
+
   def test_file_sink_multi_shards(self):
     temp_path = os.path.join(self._new_tempdir(), 'multishard')
     sink = MyFileSink(

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index db38858..ff8af03 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -438,6 +438,23 @@ class FileSystem(object):
     raise NotImplementedError
 
   @abc.abstractmethod
+  def split(self, path):
+    """Splits the given path into two parts.
+
+    Splits the path into a pair (head, tail) such that tail contains the last
+    component of the path and head contains everything up to that.
+
+    For file-systems other than the local file-system, head should include the
+    prefix.
+
+    Args:
+      path: path as a string
+    Returns:
+      a pair of path components as strings.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 07fc684..225a857 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -50,6 +50,24 @@ class FileSystems(object):
     return filesystem.join(basepath, *paths)
 
   @staticmethod
+  def split(path):
+    """Splits the given path into two parts.
+
+    Splits the path into a pair (head, tail) such that tail contains the last
+    component of the path and head contains everything up to that.
+
+    For file-systems other than the local file-system, head should include the
+    prefix.
+
+    Args:
+      path: path as a string
+    Returns:
+      a pair of path components as strings.
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.split(path)
+
+  @staticmethod
   def mkdirs(path):
     """Recursively create directories for the provided path.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index fdc4757..ba00f50 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -32,6 +32,7 @@ class GCSFileSystem(FileSystem):
   """
 
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
+  GCS_PREFIX = 'gs://'
 
   def join(self, basepath, *paths):
     """Join two or more pathname components for the filesystem
@@ -42,13 +43,42 @@ class GCSFileSystem(FileSystem):
 
     Returns: full path after combining all the passed components
     """
-    if not basepath.startswith('gs://'):
+    if not basepath.startswith(GCSFileSystem.GCS_PREFIX):
       raise ValueError('Basepath %r must be GCS path.', basepath)
     path = basepath
     for p in paths:
       path = path.rstrip('/') + '/' + p.lstrip('/')
     return path
 
+  def split(self, path):
+    """Splits the given path into two parts.
+
+    Splits the path into a pair (head, tail) such that tail contains the last
+    component of the path and head contains everything up to that.
+
+    Head will include the GCS prefix ('gs://').
+
+    Args:
+      path: path as a string
+    Returns:
+      a pair of path components as strings.
+    """
+    path = path.strip()
+    if not path.startswith(GCSFileSystem.GCS_PREFIX):
+      raise ValueError('Path %r must be GCS path.', path)
+
+    prefix_len = len(GCSFileSystem.GCS_PREFIX)
+    last_sep = path[prefix_len:].rfind('/')
+    if last_sep >= 0:
+      last_sep += prefix_len
+
+    if last_sep > 0:
+      return (path[:last_sep], path[last_sep + 1:])
+    elif last_sep < 0:
+      return (path, '')
+    else:
+      raise ValueError('Invalid path: %s', path)
+
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 
@@ -154,7 +184,7 @@ class GCSFileSystem(FileSystem):
     def _copy_path(source, destination):
       """Recursively copy the file tree from the source to the destination
       """
-      if not destination.startswith('gs://'):
+      if not destination.startswith(GCSFileSystem.GCS_PREFIX):
         raise ValueError('Destination %r must be GCS path.', destination)
       # Use copy_tree if the path ends with / as it is a directory
       if source.endswith('/'):

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 0669bf2..5fb9a62 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -53,6 +53,18 @@ class GCSFileSystemTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       file_system.join('/bucket/path/', '/to/file')
 
+  def test_split(self):
+    file_system = gcsfilesystem.GCSFileSystem()
+    self.assertEqual(('gs://foo/bar', 'baz'),
+                     file_system.split('gs://foo/bar/baz'))
+    self.assertEqual(('gs://foo', ''),
+                     file_system.split('gs://foo/'))
+    self.assertEqual(('gs://foo', ''),
+                     file_system.split('gs://foo'))
+
+    with self.assertRaises(ValueError):
+      file_system.split('/no/gcs/prefix')
+
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples(self, mock_gcsio):
     # Prepare mocks.

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 8b2bda9..bc7c576 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -45,6 +45,19 @@ class LocalFileSystem(FileSystem):
     """
     return os.path.join(basepath, *paths)
 
+  def split(self, path):
+    """Splits the given path into two parts.
+
+    Splits the path into a pair (head, tail) such that tail contains the last
+    component of the path and head contains everything up to that.
+
+    Args:
+      path: path as a string
+    Returns:
+      a pair of path components as strings.
+    """
+    return os.path.split(os.path.abspath(path))
+
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index df6eb61..986dcaf 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -39,6 +39,19 @@ def _gen_fake_join(separator):
   return _join
 
 
+def _gen_fake_split(separator):
+  """Returns a callable that splits a with the given separator."""
+
+  def _split(path):
+    sep_index = path.rfind(separator)
+    if sep_index >= 0:
+      return (path[:sep_index], path[sep_index + 1:])
+    else:
+      return (path, '')
+
+  return _split
+
+
 class LocalFileSystemTest(unittest.TestCase):
 
   def setUp(self):
@@ -66,6 +79,28 @@ class LocalFileSystemTest(unittest.TestCase):
     self.assertEqual(r'C:\tmp\path\to\file',
                      self.fs.join(r'C:\tmp\path', r'to\file'))
 
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_unix_path_split(self, os_mock):
+    os_mock.path.abspath.side_effect = lambda a: a
+    os_mock.path.split.side_effect = _gen_fake_split('/')
+    self.assertEqual(('/tmp/path/to', 'file'),
+                     self.fs.split('/tmp/path/to/file'))
+    # Actual os.path.split will split following to '/' and 'tmp' when run in
+    # Unix.
+    self.assertEqual(('', 'tmp'),
+                     self.fs.split('/tmp'))
+
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_windows_path_split(self, os_mock):
+    os_mock.path.abspath = lambda a: a
+    os_mock.path.split.side_effect = _gen_fake_split('\\')
+    self.assertEqual((r'C:\tmp\path\to', 'file'),
+                     self.fs.split(r'C:\tmp\path\to\file'))
+    # Actual os.path.split will split following to 'C:\' and 'tmp' when run in
+    # Windows.
+    self.assertEqual((r'C:', 'tmp'),
+                     self.fs.split(r'C:\tmp'))
+
   def test_mkdirs(self):
     path = os.path.join(self.tmpdir, 't1/t2')
     self.fs.mkdirs(path)


[2/2] beam git commit: This closes #2770

Posted by ch...@apache.org.
This closes #2770


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87a12af6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87a12af6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87a12af6

Branch: refs/heads/master
Commit: 87a12af6c948ea4678df91821d98902aacaa6b3b
Parents: 0ce01b6 5ec48c5
Author: chamikara@google.com <ch...@google.com>
Authored: Tue May 2 13:56:57 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue May 2 13:56:57 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py            | 20 +++++--
 sdks/python/apache_beam/io/fileio_test.py       | 56 ++++++++++++++++++++
 sdks/python/apache_beam/io/filesystem.py        | 17 ++++++
 sdks/python/apache_beam/io/filesystems.py       | 18 +++++++
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 34 +++++++++++-
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 12 +++++
 sdks/python/apache_beam/io/localfilesystem.py   | 13 +++++
 .../apache_beam/io/localfilesystem_test.py      | 35 ++++++++++++
 8 files changed, 200 insertions(+), 5 deletions(-)
----------------------------------------------------------------------