Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 18:54:01 UTC
[2/2] incubator-beam git commit: Add support for reading compressed files.
Add support for reading compressed files.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22aba413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22aba413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22aba413
Branch: refs/heads/python-sdk
Commit: 22aba413cb16519a31fe1d72572d069d006c3666
Parents: 95656d1
Author: Slaven Bilac <sl...@google.com>
Authored: Thu Sep 1 14:09:54 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 11:53:55 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 42 +++++++++++--
.../apache_beam/io/filebasedsource_test.py | 63 ++++++++++++++++++++
2 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
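In short, the patch teaches FileBasedSource to decompress its input transparently: when a compression_type is passed, open_file() wraps the raw channel in a decompressing reader and splitting is disabled for that source. Below is a minimal sketch of the user-facing behavior, assuming the patched module is importable and that the wrapper returned by open_file() exposes an ordinary file-like read(size); the wrapper's exact interface is not part of this diff.

import gzip
import tempfile

from apache_beam.io import filebasedsource
from apache_beam.io import fileio

# Write a small gzip fixture, mirroring test_read_gzip_file below.
lines = ['line %d' % i for i in range(10)]
path = tempfile.NamedTemporaryFile(delete=False, suffix='.gz').name
with gzip.GzipFile(path, 'wb') as f:
  f.write('\n'.join(lines))

# FileBasedSource is normally subclassed (see LineSource in the tests);
# it is instantiated directly here only to exercise open_file(). With a
# compression_type set, the source turns off splitting on its own and
# open_file() returns a decompressing wrapper instead of the raw channel.
source = filebasedsource.FileBasedSource(
    path, compression_type=fileio.CompressionTypes.GZIP)
reader = source.open_file(path)
assert reader.read(10 * 1024) == '\n'.join(lines)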
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22aba413/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 4c9cd95..7b9fe47 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,12 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
from multiprocessing.pool import ThreadPool
-import range_trackers
from apache_beam.io import fileio
from apache_beam.io import iobase
+import range_trackers
+
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
@@ -91,18 +92,45 @@ class _ConcatSource(iobase.BoundedSource):
class FileBasedSource(iobase.BoundedSource):
"""A ``BoundedSource`` for reading a file glob of a given type."""
- def __init__(self, file_pattern, min_bundle_size=0, splittable=True):
+ def __init__(self,
+ file_pattern,
+ min_bundle_size=0,
+ # TODO(BEAM-614)
+ compression_type=fileio.CompressionTypes.NO_COMPRESSION,
+ splittable=True):
"""Initializes ``FileBasedSource``.
Args:
file_pattern: the file glob to read.
min_bundle_size: minimum size of bundles that should be generated when
performing initial splitting on this source.
+ compression_type: compression type of the files being read. Must be a
+ valid fileio.CompressionTypes value.
+ splittable: whether FileBasedSource should try to logically split a single
+ file into data ranges so that different parts of the same file
+ can be read in parallel. If set to False, FileBasedSource will
+ prevent both initial and dynamic splitting of sources for
+ single files. File patterns that represent multiple files may
+ still get split into sources for individual files. Even if set
+ to True by the user, FileBasedSource may choose to not split
+ the file, for example, for compressed files where currently
+ it is not possible to efficiently read a data range without
+ decompressing the whole file.
+ Raises:
+ TypeError: when compression_type is not valid.
"""
self._pattern = file_pattern
self._concat_source = None
self._min_bundle_size = min_bundle_size
- self._splittable = splittable
+ if not fileio.CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be CompressionType object but '
+ 'was %s' % type(compression_type))
+ self._compression_type = compression_type
+ if compression_type != fileio.CompressionTypes.NO_COMPRESSION:
+ # We can't split compressed files efficiently so turn off splitting.
+ self._splittable = False
+ else:
+ self._splittable = splittable
def _get_concat_source(self):
if self._concat_source is None:
@@ -124,8 +152,14 @@ class FileBasedSource(iobase.BoundedSource):
return self._concat_source
def open_file(self, file_name):
- return fileio.ChannelFactory.open(
+ raw_file = fileio.ChannelFactory.open(
file_name, 'rb', 'application/octet-stream')
+ if self._compression_type == fileio.CompressionTypes.NO_COMPRESSION:
+ return raw_file
+ else:
+ return fileio._CompressedFile( # pylint: disable=protected-access
+ fileobj=raw_file,
+ compression_type=self._compression_type)
@staticmethod
def _estimate_sizes_in_parallel(file_names):
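The fileio._CompressedFile wrapper that open_file() now returns is defined in fileio.py and is not part of this diff. As a rough, hypothetical stand-in for the kind of object it is, and as an illustration of why splitting gets turned off, here is a simple streaming decompressor: it can only hand back bytes in order, after inflating everything before them, so there is no efficient way to start reading at an arbitrary offset of a compressed file.

import StringIO
import zlib


class StreamingDecompressor(object):
  """Hypothetical file-like reader that inflates a compressed stream lazily.

  wbits=zlib.MAX_WBITS expects a bare zlib stream (CompressionTypes.ZLIB);
  wbits=16 + zlib.MAX_WBITS expects a gzip container (CompressionTypes.GZIP).
  """

  def __init__(self, fileobj, wbits=zlib.MAX_WBITS, read_size=16 * 1024):
    self._file = fileobj
    self._decompressor = zlib.decompressobj(wbits)
    self._read_size = read_size
    self._buffer = ''

  def read(self, size):
    # Strictly sequential: producing the next `size` decompressed bytes means
    # inflating the stream from wherever we left off. There is no way to seek
    # to a byte range without doing the work for everything before it, which
    # is why FileBasedSource forces _splittable to False for compressed input.
    while len(self._buffer) < size:
      chunk = self._file.read(self._read_size)
      if not chunk:
        break
      self._buffer += self._decompressor.decompress(chunk)
    result, self._buffer = self._buffer[:size], self._buffer[size:]
    return result


# Round-trip a zlib-compressed payload through the sketch above.
payload = '\n'.join('record %d' % i for i in xrange(100))
reader = StreamingDecompressor(StringIO.StringIO(zlib.compress(payload)))
assert reader.read(len(payload)) == payload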
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22aba413/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index ed67346..725fd34 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -15,14 +15,17 @@
# limitations under the License.
#
+import gzip
import logging
import math
import os
import tempfile
import unittest
+import zlib
import apache_beam as beam
from apache_beam.io import filebasedsource
+from apache_beam.io import fileio
from apache_beam.io import iobase
from apache_beam.io import range_trackers
@@ -69,6 +72,20 @@ def _write_data(num_lines, directory=None, prefix=tempfile.template):
return f.name, all_data
+def _write_prepared_data(data, directory=None, prefix=tempfile.template):
+ with tempfile.NamedTemporaryFile(
+ delete=False, dir=directory, prefix=prefix) as f:
+ f.write(data)
+ return f.name
+
+
+def _write_prepared_pattern(data):
+ temp_dir = tempfile.mkdtemp()
+ for d in data:
+ file_name = _write_prepared_data(d, temp_dir, prefix='mytemp')
+ return file_name[:file_name.rfind(os.path.sep)] + os.path.sep + 'mytemp*'
+
+
def _write_pattern(lines_per_file):
temp_dir = tempfile.mkdtemp()
@@ -285,6 +302,52 @@ class TestFileBasedSource(unittest.TestCase):
assert len(expected_data) == 200
self._run_dataflow_test(pattern, expected_data, False)
+ def test_read_gzip_file(self):
+ _, lines = _write_data(10)
+ filename = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template).name
+ with gzip.GzipFile(filename, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ filename,
+ splittable=False,
+ compression_type=fileio.CompressionTypes.GZIP))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_zlib_file(self):
+ _, lines = _write_data(10)
+ compressobj = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
+ compressed = compressobj.compress('\n'.join(lines)) + compressobj.flush()
+ filename = _write_prepared_data(compressed)
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ filename,
+ splittable=False,
+ compression_type=fileio.CompressionTypes.ZLIB))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_zlib_pattern(self):
+ _, lines = _write_data(200)
+ splits = [0, 34, 100, 140, 164, 188, 200]
+ chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
+ compressed_chunks = []
+ for c in chunks:
+ compressobj = zlib.compressobj(
+ zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
+ compressed_chunks.append(
+ compressobj.compress('\n'.join(c)) + compressobj.flush())
+ file_pattern = _write_prepared_pattern(compressed_chunks)
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ file_pattern,
+ splittable=False,
+ compression_type=fileio.CompressionTypes.ZLIB))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
class TestSingleFileSource(unittest.TestCase):
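For reference, the two fixture formats exercised above differ only in their framing: GzipFile writes a full gzip container, while zlib.compressobj(..., zlib.MAX_WBITS) emits a bare zlib stream, which is what the CompressionTypes.ZLIB path is expected to consume. A standalone check (my own illustration, not part of the patch):

import gzip
import StringIO
import zlib

payload = '\n'.join('line %d' % i for i in xrange(10))

# Gzip container, as written by test_read_gzip_file.
buf = StringIO.StringIO()
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
  f.write(payload)
assert buf.getvalue()[:2] == '\x1f\x8b'  # gzip magic bytes

# Bare zlib stream, as written by test_read_zlib_file / test_read_zlib_pattern.
compressobj = zlib.compressobj(
    zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
zlib_bytes = compressobj.compress(payload) + compressobj.flush()
assert zlib.decompress(zlib_bytes) == payload  # no gzip header to strip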