You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/04 20:00:45 UTC
[1/2] incubator-beam git commit: Updates filebasedsource to support
CompressionTypes.AUTO.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 731a77152 -> 3a69db0c5
Updates filebasedsource to support CompressionTypes.AUTO.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2126a34c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2126a34c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2126a34c
Branch: refs/heads/python-sdk
Commit: 2126a34c06d74c5ad44fbec8dd4e278f99ed473a
Parents: 731a771
Author: chamikara@google.com <ch...@google.com>
Authored: Sun Sep 25 21:44:34 2016 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Mon Oct 3 10:20:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 44 ++++----
.../apache_beam/io/filebasedsource_test.py | 104 +++++++++++++++++--
2 files changed, 121 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2126a34c/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 8ff69ca..e067833 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -42,8 +42,7 @@ class FileBasedSource(iobase.BoundedSource):
def __init__(self,
file_pattern,
min_bundle_size=0,
- # TODO(BEAM-614)
- compression_type=fileio.CompressionTypes.UNCOMPRESSED,
+ compression_type=fileio.CompressionTypes.AUTO,
splittable=True):
"""Initializes ``FileBasedSource``.
@@ -72,13 +71,6 @@ class FileBasedSource(iobase.BoundedSource):
'%s: file_pattern must be a string; got %r instead' %
(self.__class__.__name__, file_pattern))
- if compression_type == fileio.CompressionTypes.AUTO:
- raise ValueError('FileBasedSource currently does not support '
- 'CompressionTypes.AUTO. Please explicitly specify the '
- 'compression type or use '
- 'CompressionTypes.UNCOMPRESSED if file is '
- 'uncompressed.')
-
self._pattern = file_pattern
self._concat_source = None
self._min_bundle_size = min_bundle_size
@@ -86,11 +78,12 @@ class FileBasedSource(iobase.BoundedSource):
raise TypeError('compression_type must be CompressionType object but '
'was %s' % type(compression_type))
self._compression_type = compression_type
- if compression_type != fileio.CompressionTypes.UNCOMPRESSED:
+ if compression_type in (fileio.CompressionTypes.UNCOMPRESSED,
+ fileio.CompressionTypes.AUTO):
+ self._splittable = splittable
+ else:
# We can't split compressed files efficiently so turn off splitting.
self._splittable = False
- else:
- self._splittable = splittable
def _get_concat_source(self):
if self._concat_source is None:
@@ -102,11 +95,21 @@ class FileBasedSource(iobase.BoundedSource):
if sizes[index] == 0:
continue # Ignoring empty file.
+ # We determine splittability of this specific file.
+ splittable = self.splittable
+ if (splittable and
+ self._compression_type == fileio.CompressionTypes.AUTO):
+ compression_type = fileio.CompressionTypes.detect_compression_type(
+ file_name)
+ if compression_type != fileio.CompressionTypes.UNCOMPRESSED:
+ splittable = False
+
single_file_source = _SingleFileSource(
self, file_name,
0,
sizes[index],
- min_bundle_size=self._min_bundle_size)
+ min_bundle_size=self._min_bundle_size,
+ splittable=splittable)
single_file_sources.append(single_file_source)
self._concat_source = concat_source.ConcatSource(single_file_sources)
return self._concat_source
@@ -173,7 +176,7 @@ class _SingleFileSource(iobase.BoundedSource):
"""Denotes a source for a specific file type."""
def __init__(self, file_based_source, file_name, start_offset, stop_offset,
- min_bundle_size=0):
+ min_bundle_size=0, splittable=True):
if not isinstance(start_offset, (int, long)):
raise TypeError(
'start_offset must be a number. Received: %r' % start_offset)
@@ -193,6 +196,7 @@ class _SingleFileSource(iobase.BoundedSource):
self._stop_offset = stop_offset
self._min_bundle_size = min_bundle_size
self._file_based_source = file_based_source
+ self._splittable = splittable
def split(self, desired_bundle_size, start_offset=None, stop_offset=None):
if start_offset is None:
@@ -200,7 +204,7 @@ class _SingleFileSource(iobase.BoundedSource):
if stop_offset is None:
stop_offset = self._stop_offset
- if self._file_based_source.splittable:
+ if self._splittable:
bundle_size = max(desired_bundle_size, self._min_bundle_size)
bundle_start = start_offset
@@ -214,7 +218,8 @@ class _SingleFileSource(iobase.BoundedSource):
self._file_name,
bundle_start,
bundle_stop,
- min_bundle_size=self._min_bundle_size),
+ min_bundle_size=self._min_bundle_size,
+ splittable=self._splittable),
bundle_start,
bundle_stop)
bundle_start = bundle_stop
@@ -230,7 +235,8 @@ class _SingleFileSource(iobase.BoundedSource):
self._file_name,
start_offset,
range_trackers.OffsetRangeTracker.OFFSET_INFINITY,
- min_bundle_size=self._min_bundle_size
+ min_bundle_size=self._min_bundle_size,
+ splittable=self._splittable
),
start_offset,
range_trackers.OffsetRangeTracker.OFFSET_INFINITY
@@ -248,12 +254,12 @@ class _SingleFileSource(iobase.BoundedSource):
# file as end offset will be wrong for certain unsplittable source, for
# e.g., compressed sources.
stop_position = (
- self._stop_offset if self._file_based_source.splittable
+ self._stop_offset if self._splittable
else range_trackers.OffsetRangeTracker.OFFSET_INFINITY)
range_tracker = range_trackers.OffsetRangeTracker(
start_position, stop_position)
- if not self._file_based_source.splittable:
+ if not self._splittable:
range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
return range_tracker
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2126a34c/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index d45a6f9..91b1ffb 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -19,6 +19,7 @@ import gzip
import logging
import math
import os
+import StringIO
import tempfile
import unittest
import zlib
@@ -44,18 +45,23 @@ class LineSource(FileBasedSource):
f = self.open_file(file_name)
try:
start = range_tracker.start_position()
- f.seek(start)
if start > 0:
- f.seek(-1, os.SEEK_CUR)
+ # Any line that starts after 'start' does not belong to the current
+ # bundle. Seeking to (start - 1) and skipping a line moves the current
+ # position to the starting position of the first line that belongs to
+ # the current bundle.
start -= 1
+ f.seek(start)
line = f.readline()
start += len(line)
current = start
- for line in f:
+ line = f.readline()
+ while line:
if not range_tracker.try_claim(current):
return
yield line.rstrip('\n')
current += len(line)
+ line = f.readline()
finally:
f.close()
@@ -94,17 +100,22 @@ def write_data(
return f.name, all_data
-def _write_prepared_data(data, directory=None, prefix=tempfile.template):
+def _write_prepared_data(data, directory=None,
+ prefix=tempfile.template, suffix=''):
with tempfile.NamedTemporaryFile(
- delete=False, dir=directory, prefix=prefix) as f:
+ delete=False, dir=directory, prefix=prefix, suffix=suffix) as f:
f.write(data)
return f.name
-def write_prepared_pattern(data):
+def write_prepared_pattern(data, suffixes=None):
+ if suffixes is None:
+ suffixes = [''] * len(data)
temp_dir = tempfile.mkdtemp()
- for d in data:
- file_name = _write_prepared_data(d, temp_dir, prefix='mytemp')
+ assert len(data) > 0
+ for i, d in enumerate(data):
+ file_name = _write_prepared_data(d, temp_dir, prefix='mytemp',
+ suffix=suffixes[i])
return file_name[:file_name.rfind(os.path.sep)] + os.path.sep + 'mytemp*'
@@ -337,6 +348,7 @@ class TestFileBasedSource(unittest.TestCase):
splittable=False,
compression_type=fileio.CompressionTypes.GZIP))
assert_that(pcoll, equal_to(lines))
+ pipeline.run()
def test_read_zlib_file(self):
_, lines = write_data(10)
@@ -351,6 +363,7 @@ class TestFileBasedSource(unittest.TestCase):
splittable=False,
compression_type=fileio.CompressionTypes.ZLIB))
assert_that(pcoll, equal_to(lines))
+ pipeline.run()
def test_read_zlib_pattern(self):
_, lines = write_data(200)
@@ -369,6 +382,81 @@ class TestFileBasedSource(unittest.TestCase):
splittable=False,
compression_type=fileio.CompressionTypes.ZLIB))
assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_gzip_pattern(self):
+ _, lines = write_data(200)
+ splits = [0, 34, 100, 140, 164, 188, 200]
+ chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
+ compressed_chunks = []
+ for c in chunks:
+ out = StringIO.StringIO()
+ with gzip.GzipFile(fileobj=out, mode="w") as f:
+ f.write('\n'.join(c))
+ compressed_chunks.append(out.getvalue())
+ file_pattern = write_prepared_pattern(compressed_chunks)
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ file_pattern,
+ splittable=False,
+ compression_type=fileio.CompressionTypes.GZIP))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_auto_single_file(self):
+ _, lines = write_data(10)
+ filename = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template, suffix='.gz').name
+ with gzip.GzipFile(filename, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ filename,
+ compression_type=fileio.CompressionTypes.AUTO))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_auto_pattern(self):
+ _, lines = write_data(200)
+ splits = [0, 34, 100, 140, 164, 188, 200]
+ chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
+ compressed_chunks = []
+ for c in chunks:
+ out = StringIO.StringIO()
+ with gzip.GzipFile(fileobj=out, mode="w") as f:
+ f.write('\n'.join(c))
+ compressed_chunks.append(out.getvalue())
+ file_pattern = write_prepared_pattern(
+ compressed_chunks, suffixes=['.gz']*len(chunks))
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ file_pattern,
+ compression_type=fileio.CompressionTypes.AUTO))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_auto_pattern_compressed_and_uncompressed(self):
+ _, lines = write_data(200)
+ splits = [0, 34, 100, 140, 164, 188, 200]
+ chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
+ chunks_to_write = []
+ for i, c in enumerate(chunks):
+ if i%2 == 0:
+ out = StringIO.StringIO()
+ with gzip.GzipFile(fileobj=out, mode="w") as f:
+ f.write('\n'.join(c))
+ chunks_to_write.append(out.getvalue())
+ else:
+ chunks_to_write.append('\n'.join(c))
+ file_pattern = write_prepared_pattern(chunks_to_write,
+ suffixes=(['.gz', '']*3))
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ file_pattern,
+ compression_type=fileio.CompressionTypes.AUTO))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
class TestSingleFileSource(unittest.TestCase):
[2/2] incubator-beam git commit: Closes #1002
Posted by ro...@apache.org.
Closes #1002
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a69db0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a69db0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a69db0c
Branch: refs/heads/python-sdk
Commit: 3a69db0c5bb9321c2082831b5c00d778ddf1b1d7
Parents: 731a771 2126a34
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 4 13:00:21 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 4 13:00:21 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 44 ++++----
.../apache_beam/io/filebasedsource_test.py | 104 +++++++++++++++++--
2 files changed, 121 insertions(+), 27 deletions(-)
----------------------------------------------------------------------