You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/05/02 20:57:17 UTC
[1/2] beam git commit: [BEAM-539] Fixes several issues of FileSink.
Repository: beam
Updated Branches:
refs/heads/master 0ce01b63f -> 87a12af6c
[BEAM-539] Fixes several issues of FileSink.
(1) Updates FileSink to fail for file name prefixes that only contain a single component (for example GCS buckets).
For example, currently FileSink fails for 'gs://aaa' while passing for 'gs://aaa/'. This change makes FileSink fail for both cases (and makes the behaviour consistent with Java).
(2) Updates the name of the temporary directory created by FileSink.
Currently, for a filename prefix 'gs://aaa/bbb', the temp path would be of the form 'gs://aaa/bbb-temp-...'.
This is error-prone since a user pattern 'gs://aaa/bbb*' would match temp files. This change makes the temp path format 'gs://aaa/beam-temp-bbb-...' instead.
To achieve the above, this adds a method 'split()' to the FileSystem interface
that is analogous to Python's 'os.path.split()' and has the
opposite effect of the current method FileSystem.join().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ec48c58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ec48c58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ec48c58
Branch: refs/heads/master
Commit: 5ec48c58c0e32891224598db61ebb63e8731e9fb
Parents: 0ce01b6
Author: Chamikara Jayalath <ch...@google.com>
Authored: Fri Apr 28 14:38:35 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue May 2 13:56:00 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 20 +++++--
sdks/python/apache_beam/io/fileio_test.py | 56 ++++++++++++++++++++
sdks/python/apache_beam/io/filesystem.py | 17 ++++++
sdks/python/apache_beam/io/filesystems.py | 18 +++++++
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 34 +++++++++++-
.../apache_beam/io/gcp/gcsfilesystem_test.py | 12 +++++
sdks/python/apache_beam/io/localfilesystem.py | 13 +++++
.../apache_beam/io/localfilesystem_test.py | 35 ++++++++++++
8 files changed, 200 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index bb77bfe..49562f7 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -139,12 +139,26 @@ class FileSink(iobase.Sink):
@check_accessible(['file_path_prefix', 'file_name_suffix'])
def initialize_write(self):
file_path_prefix = self.file_path_prefix.get()
- file_name_suffix = self.file_name_suffix.get()
- tmp_dir = file_path_prefix + file_name_suffix + time.strftime(
- '-temp-%Y-%m-%d_%H-%M-%S')
+
+ tmp_dir = self._create_temp_dir(file_path_prefix)
FileSystems.mkdirs(tmp_dir)
return tmp_dir
+ def _create_temp_dir(self, file_path_prefix):
+ base_path, last_component = FileSystems.split(file_path_prefix)
+ if not last_component:
+ # Trying to re-split the base_path to check if it's a root.
+ new_base_path, _ = FileSystems.split(base_path)
+ if base_path == new_base_path:
+ raise ValueError('Cannot create a temporary directory for root path '
+ 'prefix %s. Please specify a file path prefix with '
+ 'at least two components.',
+ file_path_prefix)
+ path_components = [base_path,
+ 'beam-temp-' + last_component + time.strftime(
+ '-%Y-%m-%d_%H-%M-%S')]
+ return FileSystems.join(*path_components)
+
@check_accessible(['file_path_prefix', 'file_name_suffix'])
def open_writer(self, init_result, uid):
# A proper suffix is needed for AUTO compression detection.
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 2409873..13778d5 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -26,10 +26,12 @@ import tempfile
import unittest
import hamcrest as hc
+import mock
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
+from apache_beam.io.filesystem import BeamIOError
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -184,6 +186,60 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
self.assertTrue('][a][' in concat, concat)
self.assertTrue('][b][' in concat, concat)
+ # Not using 'test' in name so that 'nose' doesn't pick this as a test.
+ def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path,
+ dir_root_path, prefix, separator):
+ def _get_temp_dir(file_path_prefix):
+ sink = MyFileSink(
+ file_path_prefix, file_name_suffix='.output',
+ coder=coders.ToStringCoder())
+ return sink.initialize_write()
+
+ temp_dir = _get_temp_dir(no_dir_path)
+ self.assertTrue(temp_dir.startswith(prefix))
+ last_sep = temp_dir.rfind(separator)
+ self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+ temp_dir = _get_temp_dir(dir_path)
+ self.assertTrue(temp_dir.startswith(prefix))
+ last_sep = temp_dir.rfind(separator)
+ self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+ with self.assertRaises(ValueError):
+ _get_temp_dir(no_dir_root_path)
+
+ with self.assertRaises(ValueError):
+ _get_temp_dir(dir_root_path)
+
+ def test_temp_dir_gcs(self):
+ try:
+ self.run_temp_dir_check(
+ 'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
+ '/')
+ except BeamIOError:
+ logging.debug('Ignoring test since GCP module is not installed')
+
+ @mock.patch('apache_beam.io.localfilesystem.os')
+ def test_temp_dir_local(self, filesystem_os_mock):
+ # Here we test a unix-like mock file-system
+ # (not really testing Unix or Windows since we mock the function of 'os'
+ # module).
+
+ def _fake_unix_split(path):
+ sep = path.rfind('/')
+ if sep < 0:
+ raise ValueError('Path must contain a separator')
+ return (path[:sep], path[sep + 1:])
+
+ def _fake_unix_join(base, path):
+ return base + '/' + path
+
+ filesystem_os_mock.path.abspath = lambda a: a
+ filesystem_os_mock.path.split.side_effect = _fake_unix_split
+ filesystem_os_mock.path.join.side_effect = _fake_unix_join
+ self.run_temp_dir_check(
+ '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/')
+
def test_file_sink_multi_shards(self):
temp_path = os.path.join(self._new_tempdir(), 'multishard')
sink = MyFileSink(
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index db38858..ff8af03 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -438,6 +438,23 @@ class FileSystem(object):
raise NotImplementedError
@abc.abstractmethod
+ def split(self, path):
+ """Splits the given path into two parts.
+
+ Splits the path into a pair (head, tail) such that tail contains the last
+ component of the path and head contains everything up to that.
+
+ For file-systems other than the local file-system, head should include the
+ prefix.
+
+ Args:
+ path: path as a string
+ Returns:
+ a pair of path components as strings.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
def mkdirs(self, path):
"""Recursively create directories for the provided path.
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 07fc684..225a857 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -50,6 +50,24 @@ class FileSystems(object):
return filesystem.join(basepath, *paths)
@staticmethod
+ def split(path):
+ """Splits the given path into two parts.
+
+ Splits the path into a pair (head, tail) such that tail contains the last
+ component of the path and head contains everything up to that.
+
+ For file-systems other than the local file-system, head should include the
+ prefix.
+
+ Args:
+ path: path as a string
+ Returns:
+ a pair of path components as strings.
+ """
+ filesystem = FileSystems.get_filesystem(path)
+ return filesystem.split(path)
+
+ @staticmethod
def mkdirs(path):
"""Recursively create directories for the provided path.
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index fdc4757..ba00f50 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -32,6 +32,7 @@ class GCSFileSystem(FileSystem):
"""
CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations
+ GCS_PREFIX = 'gs://'
def join(self, basepath, *paths):
"""Join two or more pathname components for the filesystem
@@ -42,13 +43,42 @@ class GCSFileSystem(FileSystem):
Returns: full path after combining all the passed components
"""
- if not basepath.startswith('gs://'):
+ if not basepath.startswith(GCSFileSystem.GCS_PREFIX):
raise ValueError('Basepath %r must be GCS path.', basepath)
path = basepath
for p in paths:
path = path.rstrip('/') + '/' + p.lstrip('/')
return path
+ def split(self, path):
+ """Splits the given path into two parts.
+
+ Splits the path into a pair (head, tail) such that tail contains the last
+ component of the path and head contains everything up to that.
+
+ Head will include the GCS prefix ('gs://').
+
+ Args:
+ path: path as a string
+ Returns:
+ a pair of path components as strings.
+ """
+ path = path.strip()
+ if not path.startswith(GCSFileSystem.GCS_PREFIX):
+ raise ValueError('Path %r must be GCS path.', path)
+
+ prefix_len = len(GCSFileSystem.GCS_PREFIX)
+ last_sep = path[prefix_len:].rfind('/')
+ if last_sep >= 0:
+ last_sep += prefix_len
+
+ if last_sep > 0:
+ return (path[:last_sep], path[last_sep + 1:])
+ elif last_sep < 0:
+ return (path, '')
+ else:
+ raise ValueError('Invalid path: %s', path)
+
def mkdirs(self, path):
"""Recursively create directories for the provided path.
@@ -154,7 +184,7 @@ class GCSFileSystem(FileSystem):
def _copy_path(source, destination):
"""Recursively copy the file tree from the source to the destination
"""
- if not destination.startswith('gs://'):
+ if not destination.startswith(GCSFileSystem.GCS_PREFIX):
raise ValueError('Destination %r must be GCS path.', destination)
# Use copy_tree if the path ends with / as it is a directory
if source.endswith('/'):
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 0669bf2..5fb9a62 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -53,6 +53,18 @@ class GCSFileSystemTest(unittest.TestCase):
with self.assertRaises(ValueError):
file_system.join('/bucket/path/', '/to/file')
+ def test_split(self):
+ file_system = gcsfilesystem.GCSFileSystem()
+ self.assertEqual(('gs://foo/bar', 'baz'),
+ file_system.split('gs://foo/bar/baz'))
+ self.assertEqual(('gs://foo', ''),
+ file_system.split('gs://foo/'))
+ self.assertEqual(('gs://foo', ''),
+ file_system.split('gs://foo'))
+
+ with self.assertRaises(ValueError):
+ file_system.split('/no/gcs/prefix')
+
@mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
def test_match_multiples(self, mock_gcsio):
# Prepare mocks.
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 8b2bda9..bc7c576 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -45,6 +45,19 @@ class LocalFileSystem(FileSystem):
"""
return os.path.join(basepath, *paths)
+ def split(self, path):
+ """Splits the given path into two parts.
+
+ Splits the path into a pair (head, tail) such that tail contains the last
+ component of the path and head contains everything up to that.
+
+ Args:
+ path: path as a string
+ Returns:
+ a pair of path components as strings.
+ """
+ return os.path.split(os.path.abspath(path))
+
def mkdirs(self, path):
"""Recursively create directories for the provided path.
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index df6eb61..986dcaf 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -39,6 +39,19 @@ def _gen_fake_join(separator):
return _join
+def _gen_fake_split(separator):
+ """Returns a callable that splits a with the given separator."""
+
+ def _split(path):
+ sep_index = path.rfind(separator)
+ if sep_index >= 0:
+ return (path[:sep_index], path[sep_index + 1:])
+ else:
+ return (path, '')
+
+ return _split
+
+
class LocalFileSystemTest(unittest.TestCase):
def setUp(self):
@@ -66,6 +79,28 @@ class LocalFileSystemTest(unittest.TestCase):
self.assertEqual(r'C:\tmp\path\to\file',
self.fs.join(r'C:\tmp\path', r'to\file'))
+ @mock.patch('apache_beam.io.localfilesystem.os')
+ def test_unix_path_split(self, os_mock):
+ os_mock.path.abspath.side_effect = lambda a: a
+ os_mock.path.split.side_effect = _gen_fake_split('/')
+ self.assertEqual(('/tmp/path/to', 'file'),
+ self.fs.split('/tmp/path/to/file'))
+ # Actual os.path.split will split following to '/' and 'tmp' when run in
+ # Unix.
+ self.assertEqual(('', 'tmp'),
+ self.fs.split('/tmp'))
+
+ @mock.patch('apache_beam.io.localfilesystem.os')
+ def test_windows_path_split(self, os_mock):
+ os_mock.path.abspath = lambda a: a
+ os_mock.path.split.side_effect = _gen_fake_split('\\')
+ self.assertEqual((r'C:\tmp\path\to', 'file'),
+ self.fs.split(r'C:\tmp\path\to\file'))
+ # Actual os.path.split will split following to 'C:\' and 'tmp' when run in
+ # Windows.
+ self.assertEqual((r'C:', 'tmp'),
+ self.fs.split(r'C:\tmp'))
+
def test_mkdirs(self):
path = os.path.join(self.tmpdir, 't1/t2')
self.fs.mkdirs(path)
[2/2] beam git commit: This closes #2770
Posted by ch...@apache.org.
This closes #2770
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87a12af6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87a12af6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87a12af6
Branch: refs/heads/master
Commit: 87a12af6c948ea4678df91821d98902aacaa6b3b
Parents: 0ce01b6 5ec48c5
Author: chamikara@google.com <ch...@google.com>
Authored: Tue May 2 13:56:57 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue May 2 13:56:57 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 20 +++++--
sdks/python/apache_beam/io/fileio_test.py | 56 ++++++++++++++++++++
sdks/python/apache_beam/io/filesystem.py | 17 ++++++
sdks/python/apache_beam/io/filesystems.py | 18 +++++++
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 34 +++++++++++-
.../apache_beam/io/gcp/gcsfilesystem_test.py | 12 +++++
sdks/python/apache_beam/io/localfilesystem.py | 13 +++++
.../apache_beam/io/localfilesystem_test.py | 35 ++++++++++++
8 files changed, 200 insertions(+), 5 deletions(-)
----------------------------------------------------------------------