You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/21 19:35:31 UTC
[3/4] incubator-beam git commit: A few improvements to Apache Beam
Python's FileIO.
A few improvements to Apache Beam Python's FileIO.
- Ensuring that AUTO compression works properly for FileSinks.
- Introducing __enter__ and __exit__ in _CompressedFile to allow use
of "with", and updating textio accordingly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85f67a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85f67a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85f67a1
Branch: refs/heads/python-sdk
Commit: e85f67a1a467a26259a849bd20c42e89f165828e
Parents: c1440f7
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Fri Nov 18 18:31:20 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Nov 21 11:29:07 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 10 +++++-
sdks/python/apache_beam/io/fileio_test.py | 48 +++++++++++++++++++++++---
sdks/python/apache_beam/io/textio.py | 6 +---
sdks/python/apache_beam/io/textio_test.py | 26 ++++++++++++++
4 files changed, 80 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 4d0eea6..1dcd622 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -749,6 +749,12 @@ class _CompressedFile(object):
def seekable(self):
return False
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ self.close()
+
class FileSink(iobase.Sink):
"""A sink to a GCS or local files.
@@ -855,7 +861,9 @@ class FileSink(iobase.Sink):
return tmp_dir
def open_writer(self, init_result, uid):
- return FileSinkWriter(self, os.path.join(init_result, uid))
+ # A proper suffix is needed for AUTO compression detection.
+ suffix = os.path.basename(self.file_path_prefix) + self.file_name_suffix
+ return FileSinkWriter(self, os.path.join(init_result, uid) + suffix)
def finalize_write(self, init_result, writer_results):
writer_results = sorted(writer_results)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 9d1e424..098ace1 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -38,10 +38,7 @@ from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
# TODO: Add tests for file patterns (ie not just individual files) for both
-# uncompressed
-
-# TODO: Update code to not use NamedTemporaryFile (or to use it in a way that
-# doesn't violate its assumptions).
+# compressed and uncompressed files.
class TestTextFileSource(unittest.TestCase):
@@ -721,6 +718,49 @@ class TestNativeTextFileSink(unittest.TestCase):
with bz2.BZ2File(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
+ def test_write_dataflow(self):
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.core.Create('Create', self.lines)
+ pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned
+ pipeline.run()
+
+ read_result = []
+ for file_name in glob.glob(self.path + '*'):
+ with open(file_name, 'r') as f:
+ read_result.extend(f.read().splitlines())
+
+ self.assertEqual(read_result, self.lines)
+
+ def test_write_dataflow_auto_compression(self):
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.core.Create('Create', self.lines)
+ pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
+ fileio.NativeTextFileSink(
+ self.path, file_name_suffix='.gz'))
+ pipeline.run()
+
+ read_result = []
+ for file_name in glob.glob(self.path + '*'):
+ with gzip.GzipFile(file_name, 'r') as f:
+ read_result.extend(f.read().splitlines())
+
+ self.assertEqual(read_result, self.lines)
+
+ def test_write_dataflow_auto_compression_unsharded(self):
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.core.Create('Create', self.lines)
+ pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
+ fileio.NativeTextFileSink(
+ self.path + '.gz', shard_name_template=''))
+ pipeline.run()
+
+ read_result = []
+ for file_name in glob.glob(self.path + '*'):
+ with gzip.GzipFile(file_name, 'r') as f:
+ read_result.extend(f.read().splitlines())
+
+ self.assertEqual(read_result, self.lines)
+
class MyFileSink(fileio.FileSink):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9c89b68..bb664d1 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -85,11 +85,9 @@ class _TextSource(filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
start_offset = range_tracker.start_position()
-
read_buffer = _TextSource.ReadBuffer('', 0)
- file_to_read = self.open_file(file_name)
- try:
+ with self.open_file(file_name) as file_to_read:
if start_offset > 0:
# Seeking to one position before the start index and ignoring the
# current line. If start_position is at beginning of the line, that line
@@ -116,8 +114,6 @@ class _TextSource(filebasedsource.FileBasedSource):
if num_bytes_to_next_record < 0:
break
next_record_start_position += num_bytes_to_next_record
- finally:
- file_to_read.close()
def _find_separator_bounds(self, file_to_read, read_buffer):
# Determines the start and end positions within 'read_buffer.data' of the
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index acdac47..b1d3fb0 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -491,6 +491,32 @@ class TextSinkTest(unittest.TestCase):
self.assertEqual(read_result, self.lines)
+ def test_write_dataflow_auto_compression(self):
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.core.Create('Create', self.lines)
+ pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned
+ pipeline.run()
+
+ read_result = []
+ for file_name in glob.glob(self.path + '*'):
+ with gzip.GzipFile(file_name, 'r') as f:
+ read_result.extend(f.read().splitlines())
+
+ self.assertEqual(read_result, self.lines)
+
+ def test_write_dataflow_auto_compression_unsharded(self):
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | beam.core.Create('Create', self.lines)
+ pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned
+ pipeline.run()
+
+ read_result = []
+ for file_name in glob.glob(self.path + '*'):
+ with gzip.GzipFile(file_name, 'r') as f:
+ read_result.extend(f.read().splitlines())
+
+ self.assertEqual(read_result, self.lines)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)