You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 18:54:01 UTC

[2/2] incubator-beam git commit: Add support for reading compressed files.

Add support for reading compressed files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22aba413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22aba413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22aba413

Branch: refs/heads/python-sdk
Commit: 22aba413cb16519a31fe1d72572d069d006c3666
Parents: 95656d1
Author: Slaven Bilac <sl...@google.com>
Authored: Thu Sep 1 14:09:54 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 11:53:55 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   | 42 +++++++++++--
 .../apache_beam/io/filebasedsource_test.py      | 63 ++++++++++++++++++++
 2 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22aba413/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 4c9cd95..7b9fe47 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,12 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
 from multiprocessing.pool import ThreadPool
-import range_trackers
 
 from apache_beam.io import fileio
 from apache_beam.io import iobase
 
+import range_trackers
+
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
 
@@ -91,18 +92,45 @@ class _ConcatSource(iobase.BoundedSource):
 class FileBasedSource(iobase.BoundedSource):
   """A ``BoundedSource`` for reading a file glob of a given type."""
 
-  def __init__(self, file_pattern, min_bundle_size=0, splittable=True):
+  def __init__(self,
+               file_pattern,
+               min_bundle_size=0,
+               # TODO(BEAM-614)
+               compression_type=fileio.CompressionTypes.NO_COMPRESSION,
+               splittable=True):
     """Initializes ``FileBasedSource``.
 
     Args:
       file_pattern: the file glob to read.
       min_bundle_size: minimum size of bundles that should be generated when
                        performing initial splitting on this source.
+      compression_type: a fileio.CompressionTypes value used to read the files.
+      splittable: whether FileBasedSource should try to logically split a single
+                  file into data ranges so that different parts of the same file
+                  can be read in parallel. If set to False, FileBasedSource will
+                  prevent both initial and dynamic splitting of sources for
+                  single files. File patterns that represent multiple files may
+                  still get split into sources for individual files. Even if set
+                  to True by the user, FileBasedSource may choose to not split
+                  the file, for example, for compressed files where currently
+                  it is not possible to efficiently read a data range without
+                  decompressing the whole file.
+    Raises:
+      TypeError: when compression_type is not valid. No error is raised for
+                 compressed input with splittable=True; splitting is disabled.
     """
     self._pattern = file_pattern
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
-    self._splittable = splittable
+    if not fileio.CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    self._compression_type = compression_type
+    if compression_type != fileio.CompressionTypes.NO_COMPRESSION:
+      # We can't split compressed files efficiently so turn off splitting.
+      self._splittable = False
+    else:
+      self._splittable = splittable
 
   def _get_concat_source(self):
     if self._concat_source is None:
@@ -124,8 +152,14 @@ class FileBasedSource(iobase.BoundedSource):
     return self._concat_source
 
   def open_file(self, file_name):
-    return fileio.ChannelFactory.open(
+    raw_file = fileio.ChannelFactory.open(
         file_name, 'rb', 'application/octet-stream')
+    if self._compression_type == fileio.CompressionTypes.NO_COMPRESSION:
+      return raw_file
+    else:
+      return fileio._CompressedFile(  # pylint: disable=protected-access
+          fileobj=raw_file,
+          compression_type=self._compression_type)
 
   @staticmethod
   def _estimate_sizes_in_parallel(file_names):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22aba413/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index ed67346..725fd34 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -15,14 +15,17 @@
 # limitations under the License.
 #
 
+import gzip
 import logging
 import math
 import os
 import tempfile
 import unittest
+import zlib
 
 import apache_beam as beam
 from apache_beam.io import filebasedsource
+from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 
@@ -69,6 +72,20 @@ def _write_data(num_lines, directory=None, prefix=tempfile.template):
     return f.name, all_data
 
 
+def _write_prepared_data(data, directory=None, prefix=tempfile.template):
+  with tempfile.NamedTemporaryFile(
+      delete=False, dir=directory, prefix=prefix) as f:
+    f.write(data)
+    return f.name
+
+
+def _write_prepared_pattern(data):
+  temp_dir = tempfile.mkdtemp()
+  for d in data:
+    _write_prepared_data(d, temp_dir, prefix='mytemp')
+  return os.path.join(temp_dir, 'mytemp*')
+
+
 def _write_pattern(lines_per_file):
   temp_dir = tempfile.mkdtemp()
 
@@ -285,6 +302,52 @@ class TestFileBasedSource(unittest.TestCase):
     assert len(expected_data) == 200
     self._run_dataflow_test(pattern, expected_data, False)
 
+  def test_read_gzip_file(self):
+    _, lines = _write_data(10)
+    filename = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template).name
+    with gzip.GzipFile(filename, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        filename,
+        splittable=False,
+        compression_type=fileio.CompressionTypes.GZIP))
+    assert_that(pcoll, equal_to(lines))
+
+  def test_read_zlib_file(self):
+    _, lines = _write_data(10)
+    compressobj = zlib.compressobj(
+        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
+    compressed = compressobj.compress('\n'.join(lines)) + compressobj.flush()
+    filename = _write_prepared_data(compressed)
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        filename,
+        splittable=False,
+        compression_type=fileio.CompressionTypes.ZLIB))
+    assert_that(pcoll, equal_to(lines))
+
+  def test_read_zlib_pattern(self):
+    _, lines = _write_data(200)
+    splits = [0, 34, 100, 140, 164, 188, 200]
+    chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
+    compressed_chunks = []
+    for c in chunks:
+      compressobj = zlib.compressobj(
+          zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
+      compressed_chunks.append(
+          compressobj.compress('\n'.join(c)) + compressobj.flush())
+    file_pattern = _write_prepared_pattern(compressed_chunks)
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        file_pattern,
+        splittable=False,
+        compression_type=fileio.CompressionTypes.ZLIB))
+    assert_that(pcoll, equal_to(lines))
+
 
 class TestSingleFileSource(unittest.TestCase):