Posted to commits@beam.apache.org by lc...@apache.org on 2016/11/21 19:35:31 UTC

[3/4] incubator-beam git commit: A few improvements to Apache Beam Python's FileIO.

A few improvements to Apache Beam Python's FileIO.

- Ensuring that AUTO compression works properly for FileSinks.
- Introducing __enter__ and __exit__ in _CompressedFile to allow use
  of "with", and updating textio accordingly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85f67a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85f67a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85f67a1

Branch: refs/heads/python-sdk
Commit: e85f67a1a467a26259a849bd20c42e89f165828e
Parents: c1440f7
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Fri Nov 18 18:31:20 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Nov 21 11:29:07 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      | 10 +++++-
 sdks/python/apache_beam/io/fileio_test.py | 48 +++++++++++++++++++++++---
 sdks/python/apache_beam/io/textio.py      |  6 +---
 sdks/python/apache_beam/io/textio_test.py | 26 ++++++++++++++
 4 files changed, 80 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 4d0eea6..1dcd622 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -749,6 +749,12 @@ class _CompressedFile(object):
   def seekable(self):
     return False
 
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    self.close()
+
 
 class FileSink(iobase.Sink):
   """A sink to a GCS or local files.
@@ -855,7 +861,9 @@ class FileSink(iobase.Sink):
     return tmp_dir
 
   def open_writer(self, init_result, uid):
-    return FileSinkWriter(self, os.path.join(init_result, uid))
+    # A proper suffix is needed for AUTO compression detection.
+    suffix = os.path.basename(self.file_path_prefix) + self.file_name_suffix
+    return FileSinkWriter(self, os.path.join(init_result, uid) + suffix)
 
   def finalize_write(self, init_result, writer_results):
     writer_results = sorted(writer_results)
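
The suffix matters because AUTO compression is inferred from the file
extension of the path actually being opened; the bare temporary shard
path os.path.join(init_result, uid) gave detection nothing to match. A
rough sketch of extension-based detection (detect_compression is
illustrative, not the exact CompressionTypes logic in fileio):

    import os

    def detect_compression(file_path):
      # Map the extension to a codec, defaulting to no compression.
      extension = os.path.splitext(file_path)[1]
      return {'.gz': 'gzip', '.bz2': 'bzip2'}.get(extension, 'uncompressed')

    detect_compression('/tmp/beam-temp/uid')         # 'uncompressed' (old path)
    detect_compression('/tmp/beam-temp/uidout.gz')   # 'gzip' (path + suffix)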

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 9d1e424..098ace1 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -38,10 +38,7 @@ from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 # TODO: Add tests for file patterns (i.e. not just individual files) for both
-# uncompressed
-
-# TODO: Update code to not use NamedTemporaryFile (or to use it in a way that
-# doesn't violate its assumptions).
+# compressed and uncompressed files.
 
 
 class TestTextFileSource(unittest.TestCase):
@@ -721,6 +718,49 @@ class TestNativeTextFileSink(unittest.TestCase):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  def test_write_dataflow(self):
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
+    pipeline.run()
+
+    read_result = []
+    for file_name in glob.glob(self.path + '*'):
+      with open(file_name, 'r') as f:
+        read_result.extend(f.read().splitlines())
+
+    self.assertEqual(read_result, self.lines)
+
+  def test_write_dataflow_auto_compression(self):
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
+        fileio.NativeTextFileSink(
+            self.path, file_name_suffix='.gz'))
+    pipeline.run()
+
+    read_result = []
+    for file_name in glob.glob(self.path + '*'):
+      with gzip.GzipFile(file_name, 'r') as f:
+        read_result.extend(f.read().splitlines())
+
+    self.assertEqual(read_result, self.lines)
+
+  def test_write_dataflow_auto_compression_unsharded(self):
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
+        fileio.NativeTextFileSink(
+            self.path + '.gz', shard_name_template=''))
+    pipeline.run()
+
+    read_result = []
+    for file_name in glob.glob(self.path + '*'):
+      with gzip.GzipFile(file_name, 'r') as f:
+        read_result.extend(f.read().splitlines())
+
+    self.assertEqual(read_result, self.lines)
+
 
 class MyFileSink(fileio.FileSink):
 
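A note on the unsharded variant above: shard_name_template='' drops the
shard components from the output name, so the sink writes a single file
named exactly file_path_prefix + file_name_suffix, leaving the trailing
'.gz' where AUTO detection can see it. A rough sketch of the naming rule
(illustrative; output_file_name is not the real fileio helper):

    def output_file_name(prefix, shard_template, shard, num_shards, suffix):
      if not shard_template:
        # Unsharded: one file, suffix kept intact at the end.
        return prefix + suffix
      # Default-style template, e.g. '-SSSSS-of-NNNNN'.
      return '%s-%05d-of-%05d%s' % (prefix, shard, num_shards, suffix)

    output_file_name('out', '', 0, 1, '.gz')                 # 'out.gz'
    output_file_name('out', '-SSSSS-of-NNNNN', 0, 3, '.gz')  # 'out-00000-of-00003.gz'
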

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9c89b68..bb664d1 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -85,11 +85,9 @@ class _TextSource(filebasedsource.FileBasedSource):
 
   def read_records(self, file_name, range_tracker):
     start_offset = range_tracker.start_position()
-
     read_buffer = _TextSource.ReadBuffer('', 0)
-    file_to_read = self.open_file(file_name)
 
-    try:
+    with self.open_file(file_name) as file_to_read:
       if start_offset > 0:
         # Seeking to one position before the start index and ignoring the
        # current line. If start_position is at the beginning of the line, that line
@@ -116,8 +114,6 @@ class _TextSource(filebasedsource.FileBasedSource):
         if num_bytes_to_next_record < 0:
           break
         next_record_start_position += num_bytes_to_next_record
-    finally:
-      file_to_read.close()
 
   def _find_separator_bounds(self, file_to_read, read_buffer):
     # Determines the start and end positions within 'read_buffer.data' of the
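
The hunk above swaps explicit try/finally cleanup for the context
manager; behavior is unchanged. Side by side (process stands in for the
record-yielding loop body):

    # Before: explicit cleanup on every exit path.
    file_to_read = self.open_file(file_name)
    try:
      process(file_to_read)
    finally:
      file_to_read.close()

    # After: _CompressedFile.__exit__ performs the same close().
    with self.open_file(file_name) as file_to_read:
      process(file_to_read)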

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index acdac47..b1d3fb0 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -491,6 +491,32 @@ class TextSinkTest(unittest.TestCase):
 
     self.assertEqual(read_result, self.lines)
 
+  def test_write_dataflow_auto_compression(self):
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz')  # pylint: disable=expression-not-assigned
+    pipeline.run()
+
+    read_result = []
+    for file_name in glob.glob(self.path + '*'):
+      with gzip.GzipFile(file_name, 'r') as f:
+        read_result.extend(f.read().splitlines())
+
+    self.assertEqual(read_result, self.lines)
+
+  def test_write_dataflow_auto_compression_unsharded(self):
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='')  # pylint: disable=expression-not-assigned
+    pipeline.run()
+
+    read_result = []
+    for file_name in glob.glob(self.path + '*'):
+      with gzip.GzipFile(file_name, 'r') as f:
+        read_result.extend(f.read().splitlines())
+
+    self.assertEqual(read_result, self.lines)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)