Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/15 20:35:30 UTC
[1/3] incubator-beam git commit: Enhancements and deprecation cleanups related to file compression.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 56ab1a4d5 -> 24b7bcc26
Enhancements and deprecation cleanups related to file compression.
- Getting rid of CompressionTypes.ZLIB and
CompressionTypes.NO_COMPRESSION.
- Introducing BZIP2 compression in analogy to Dataflow Java's BZIP2,
towards resolution of https://issues.apache.org/jira/browse/BEAM-570.
- Introducing SNAPPY codec support for Avro, for more compact files and
to fully resolve https://issues.apache.org/jira/browse/BEAM-570.
- Moving avroio from compression_type to codec as per various
discussions.
- A few cleanups in avroio.
- Making textio more DRY and doing a few cleanups.
- Raising an exception when splitting is requested for a compressed
source, since that should never happen (the service guarantees it
for the supported compression types).
- Using cStringIO instead of StringIO in various places as
decided in some other discussions.
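For orientation, a minimal usage sketch of how the reworked APIs fit together
after this change (not from the commit itself; RECORDS and SCHEMA are
hypothetical placeholders and the paths are illustrative):

    import apache_beam as beam
    from apache_beam.io import avroio
    from apache_beam.io.textio import ReadFromText

    p = beam.Pipeline('DirectPipelineRunner')

    # avroio now takes an Avro codec string ('null', 'deflate', 'snappy')
    # instead of a fileio.CompressionTypes value.
    # pylint: disable=expression-not-assigned
    p | beam.Create(RECORDS) | avroio.WriteToAvro(
        '/tmp/avro_out', SCHEMA, codec='deflate')

    # textio now defaults to CompressionTypes.AUTO, so a .gz or .bz2
    # extension on the input path selects the decompression automatically.
    p | 'ReadBz2' >> ReadFromText('/tmp/input.txt.bz2')

    p.run()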
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/88461ab8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/88461ab8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/88461ab8
Branch: refs/heads/python-sdk
Commit: 88461ab8161839f2fd8bd47a548201ed7bffb2a3
Parents: 56ab1a4
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Tue Oct 4 19:41:07 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Oct 15 13:28:27 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio.py | 54 ++---
sdks/python/apache_beam/io/avroio_test.py | 20 ++
.../apache_beam/io/filebasedsource_test.py | 47 ++--
sdks/python/apache_beam/io/fileio.py | 123 +++++-----
sdks/python/apache_beam/io/fileio_test.py | 238 ++-----------------
sdks/python/apache_beam/io/textio.py | 71 +++---
sdks/python/apache_beam/io/textio_test.py | 86 ++++---
7 files changed, 233 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index fdf8dae..7de00df 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -16,8 +16,8 @@
#
"""Implements a source for reading Avro files."""
+import cStringIO as StringIO
import os
-import StringIO
import zlib
import avro
@@ -73,15 +73,10 @@ class ReadFromAvro(PTransform):
**kwargs: Additional keyword arguments to be passed to the base class.
"""
super(ReadFromAvro, self).__init__()
+ self._args = (file_pattern, min_bundle_size)
- self._file_pattern = file_pattern
- self._min_bundle_size = min_bundle_size
-
- def apply(self, pcoll):
- return pcoll.pipeline | Read(
- _AvroSource(
- file_pattern=self._file_pattern,
- min_bundle_size=self._min_bundle_size))
+ def apply(self, pvalue):
+ return pvalue.pipeline | Read(_AvroSource(*self._args))
class _AvroUtils(object):
@@ -247,24 +242,17 @@ class _AvroSource(filebasedsource.FileBasedSource):
yield record
-_avro_codecs = {
- fileio.CompressionTypes.UNCOMPRESSED: 'null',
- fileio.CompressionTypes.ZLIB: 'deflate',
- # fileio.CompressionTypes.SNAPPY: 'snappy',
-}
-
-
class WriteToAvro(beam.transforms.PTransform):
"""A ``PTransform`` for writing avro files."""
def __init__(self,
file_path_prefix,
schema,
+ codec='deflate',
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- mime_type='application/x-avro',
- compression_type=fileio.CompressionTypes.ZLIB):
+ mime_type='application/x-avro'):
"""Initialize a WriteToAvro transform.
Args:
@@ -274,6 +262,8 @@ class WriteToAvro(beam.transforms.PTransform):
only this argument is specified and num_shards, shard_name_template, and
file_name_suffix use default values.
schema: The schema to use, as returned by avro.schema.parse
+ codec: The codec to use for block-level compression. Any string supported
+ by the Avro specification is accepted (for example 'null').
file_name_suffix: Suffix for the files written.
append_trailing_newlines: indicate whether this sink should write an
additional newline char after writing each element.
@@ -292,21 +282,15 @@ class WriteToAvro(beam.transforms.PTransform):
generated. The default pattern used is '-SSSSS-of-NNNNN'.
mime_type: The MIME type to use for the produced files, if the filesystem
supports specifying MIME types.
- compression_type: Used to handle compressed output files. Defaults to
- CompressionTypes.ZLIB
Returns:
A WriteToAvro transform usable for writing.
"""
- if compression_type not in _avro_codecs:
- raise ValueError(
- 'Compression type %s not supported by avro.' % compression_type)
- self.args = (file_path_prefix, schema, file_name_suffix, num_shards,
- shard_name_template, mime_type, compression_type)
+ self._args = (file_path_prefix, schema, codec, file_name_suffix, num_shards,
+ shard_name_template, mime_type)
def apply(self, pcoll):
- # pylint: disable=expression-not-assigned
- pcoll | beam.io.iobase.Write(_AvroSink(*self.args))
+ return pcoll | beam.io.iobase.Write(_AvroSink(*self._args))
class _AvroSink(fileio.FileSink):
@@ -315,11 +299,11 @@ class _AvroSink(fileio.FileSink):
def __init__(self,
file_path_prefix,
schema,
+ codec,
file_name_suffix,
num_shards,
shard_name_template,
- mime_type,
- compression_type):
+ mime_type):
super(_AvroSink, self).__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
@@ -327,16 +311,16 @@ class _AvroSink(fileio.FileSink):
shard_name_template=shard_name_template,
coder=None,
mime_type=mime_type,
- # Compression happens at the block level, not the file level.
+ # Compression happens at the block level using the supplied codec, and
+ # not at the file level.
compression_type=fileio.CompressionTypes.UNCOMPRESSED)
- self.schema = schema
- self.avro_compression_type = compression_type
+ self._schema = schema
+ self._codec = codec
def open(self, temp_path):
file_handle = super(_AvroSink, self).open(temp_path)
return avro.datafile.DataFileWriter(
- file_handle, avro.io.DatumWriter(), self.schema,
- _avro_codecs[self.avro_compression_type])
+ file_handle, avro.io.DatumWriter(), self._schema, self._codec)
def write_record(self, writer, value):
- writer.append(value)
+ writer.append(value)
\ No newline at end of file
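Since compression happens at the Avro block level via the codec, the files
_AvroSink writes are ordinary Avro containers; a sketch of reading one back
with stock Avro tooling (the shard file name below is hypothetical):

    import avro.datafile
    import avro.io

    # No outer decompression step is needed: the codec is recorded in the
    # container header and handled by DataFileReader per block.
    f = open('/tmp/avro_out-00000-of-00001', 'rb')
    reader = avro.datafile.DataFileReader(f, avro.io.DatumReader())
    for record in reader:
        print record
    reader.close()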
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 697bd2c..1c96d72 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -246,6 +246,26 @@ class TestAvro(unittest.TestCase):
readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
+ def test_sink_transform_snappy(self):
+ try:
+ import snappy # pylint: disable=unused-variable
+ with tempfile.NamedTemporaryFile() as dst:
+ path = dst.name
+ with beam.Pipeline('DirectPipelineRunner') as p:
+ # pylint: disable=expression-not-assigned
+ p | beam.Create(self.RECORDS) | avroio.WriteToAvro(
+ path,
+ self.SCHEMA,
+ codec='snappy')
+ with beam.Pipeline('DirectPipelineRunner') as p:
+ # json used for stable sortability
+ readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
+ assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
+ except ImportError:
+ logging.warning(
+ 'Skipped test_sink_transform_snappy since snappy appears to not be '
+ 'installed.')
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 91b1ffb..f1ac482 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -15,14 +15,14 @@
# limitations under the License.
#
+import bz2
+import cStringIO as StringIO
import gzip
import logging
import math
import os
-import StringIO
import tempfile
import unittest
-import zlib
import apache_beam as beam
from apache_beam.io import filebasedsource
@@ -335,44 +335,43 @@ class TestFileBasedSource(unittest.TestCase):
assert len(expected_data) == 200
self._run_dataflow_test(pattern, expected_data, False)
- def test_read_gzip_file(self):
+ def test_read_file_bzip2(self):
_, lines = write_data(10)
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template).name
- with gzip.GzipFile(filename, 'wb') as f:
+ with bz2.BZ2File(filename, 'wb') as f:
f.write('\n'.join(lines))
pipeline = beam.Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
- compression_type=fileio.CompressionTypes.GZIP))
+ compression_type=fileio.CompressionTypes.BZIP2))
assert_that(pcoll, equal_to(lines))
pipeline.run()
- def test_read_zlib_file(self):
+ def test_read_file_gzip(self):
_, lines = write_data(10)
- compressobj = zlib.compressobj(
- zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
- compressed = compressobj.compress('\n'.join(lines)) + compressobj.flush()
- filename = _write_prepared_data(compressed)
+ filename = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template).name
+ with gzip.GzipFile(filename, 'wb') as f:
+ f.write('\n'.join(lines))
pipeline = beam.Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
- compression_type=fileio.CompressionTypes.ZLIB))
+ compression_type=fileio.CompressionTypes.GZIP))
assert_that(pcoll, equal_to(lines))
pipeline.run()
- def test_read_zlib_pattern(self):
+ def test_read_pattern_bzip2(self):
_, lines = write_data(200)
splits = [0, 34, 100, 140, 164, 188, 200]
chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
compressed_chunks = []
for c in chunks:
- compressobj = zlib.compressobj(
- zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
+ compressobj = bz2.BZ2Compressor()
compressed_chunks.append(
compressobj.compress('\n'.join(c)) + compressobj.flush())
file_pattern = write_prepared_pattern(compressed_chunks)
@@ -380,11 +379,11 @@ class TestFileBasedSource(unittest.TestCase):
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
- compression_type=fileio.CompressionTypes.ZLIB))
+ compression_type=fileio.CompressionTypes.BZIP2))
assert_that(pcoll, equal_to(lines))
pipeline.run()
- def test_read_gzip_pattern(self):
+ def test_read_pattern_gzip(self):
_, lines = write_data(200)
splits = [0, 34, 100, 140, 164, 188, 200]
chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
@@ -403,7 +402,21 @@ class TestFileBasedSource(unittest.TestCase):
assert_that(pcoll, equal_to(lines))
pipeline.run()
- def test_read_auto_single_file(self):
+ def test_read_auto_single_file_bzip2(self):
+ _, lines = write_data(10)
+ filename = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template, suffix='.bz2').name
+ with gzip.GzipFile(filename, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ filename,
+ compression_type=fileio.CompressionTypes.AUTO))
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_auto_single_file_gzip(self):
_, lines = write_data(10)
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template, suffix='.gz').name
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 2670470..e6575b0 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -62,31 +62,23 @@ class _CompressionType(object):
class CompressionTypes(object):
"""Enum-like class representing known compression types."""
-
# Detect compression based on filename extension.
#
# The following extensions are currently recognized by auto-detection:
- # .gz (implies GZIP as described below)
- # .z (implies ZLIB as described below).
+ # .bz2 (implies BZIP2 as described below).
+ # .gz (implies GZIP as described below)
# Any non-recognized extension implies UNCOMPRESSED as described below.
AUTO = _CompressionType('auto')
+ # BZIP2 compression.
+ BZIP2 = _CompressionType('bzip2')
+
# GZIP compression (deflate with GZIP headers).
GZIP = _CompressionType('gzip')
- # ZLIB compression (deflate with ZLIB headers).
- ZLIB = _CompressionType('zlib')
-
- # BZIP2 compression (deflate with BZIP2 headers)
- BZIP2 = _CompressionType('bz2')
-
# Uncompressed (i.e., may be split).
UNCOMPRESSED = _CompressionType('uncompressed')
- # TODO: Remove this backwards-compatibility soon.
- # Deprecated. Use UNCOMPRESSED instead.
- NO_COMPRESSION = UNCOMPRESSED
-
@classmethod
def is_valid_compression_type(cls, compression_type):
"""Returns true for valid compression types, false otherwise."""
@@ -95,18 +87,15 @@ class CompressionTypes(object):
@classmethod
def mime_type(cls, compression_type, default='application/octet-stream'):
mime_types_by_compression_type = {
- cls.GZIP: 'application/x-gzip',
- cls.ZLIB: 'application/octet-stream',
cls.BZIP2: 'application/x-bz2',
+ cls.GZIP: 'application/x-gzip',
}
return mime_types_by_compression_type.get(compression_type, default)
@classmethod
def detect_compression_type(cls, file_path):
"""Returns the compression type of a file (based on its suffix)"""
- compression_types_by_suffix = {'.gz': cls.GZIP,
- '.z': cls.ZLIB,
- '.bz2': cls.BZIP2}
+ compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.iteritems():
if lowercased_path.endswith(suffix):
@@ -173,16 +162,10 @@ class NativeFileSource(dataflow_io.NativeSource):
self.mime_type = mime_type
def __eq__(self, other):
- # TODO: Remove this backwards-compatibility soon.
- def equiv_autos(lhs, rhs):
- return ((lhs == 'AUTO' and rhs == CompressionTypes.AUTO) or
- (lhs == CompressionTypes.AUTO and rhs == 'AUTO'))
-
return (self.file_path == other.file_path and
self.start_offset == other.start_offset and
self.end_offset == other.end_offset and
- (self.compression_type == other.compression_type or
- equiv_autos(self.compression_type, other.compression_type)) and
+ self.compression_type == other.compression_type and
self.coder == other.coder and self.mime_type == other.mime_type)
@property
@@ -234,12 +217,10 @@ class NativeFileSourceReader(dataflow_io.NativeSourceReader,
self.end_offset)
# Position to the appropriate start_offset.
- if self.start_offset > 0:
- if ChannelFactory.is_compressed(self.file):
- # TODO: Turns this warning into an exception soon.
- logging.warning(
- 'Encountered initial split starting at (%s) for compressed source.',
- self.start_offset)
+ if self.start_offset > 0 and ChannelFactory.is_compressed(self.file):
+ raise ValueError(
+ 'Unexpected positive start_offset (%s) for a compressed source: %s',
+ self.start_offset, self.source)
self.seek_to_true_start_offset()
return self
@@ -249,14 +230,11 @@ class NativeFileSourceReader(dataflow_io.NativeSourceReader,
def __iter__(self):
if self.current_offset > 0 and ChannelFactory.is_compressed(self.file):
- # When compression is enabled both initial and dynamic splitting should be
- # prevented. Here we prevent initial splitting by ignoring all splits
- # other than the split that starts at byte 0.
- #
- # TODO: Turns this warning into an exception soon.
- logging.warning('Ignoring split starting at (%s) for compressed source.',
- self.current_offset)
- return
+ # When compression is enabled, neither initial nor dynamic splitting
+ # is allowed.
+ raise ValueError(
+ 'Unexpected split starting at (%s) for compressed source: %s',
+ self.current_offset, self.source)
while True:
if not self.range_tracker.try_claim(record_start=self.current_offset):
@@ -318,10 +296,6 @@ class NativeFileSourceReader(dataflow_io.NativeSourceReader,
# When compression is enabled both initial and dynamic splitting should be
# prevented. Here we prevent dynamic splitting by ignoring all dynamic
# split requests at the reader.
- #
- # TODO: Turns this warning into an exception soon.
- logging.warning('FileBasedReader cannot be split since it is compressed. '
- 'Requested: %r', dynamic_split_request)
return
assert dynamic_split_request is not None
@@ -463,13 +437,9 @@ class ChannelFactory(object):
compression_type=CompressionTypes.AUTO):
if compression_type == CompressionTypes.AUTO:
compression_type = CompressionTypes.detect_compression_type(path)
- elif compression_type == 'AUTO':
- # TODO: Remove this backwards-compatibility soon.
- compression_type = CompressionTypes.detect_compression_type(path)
- else:
- if not CompressionTypes.is_valid_compression_type(compression_type):
- raise TypeError('compression_type must be CompressionType object but '
- 'was %s' % type(compression_type))
+ elif not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be CompressionType object but '
+ 'was %s' % type(compression_type))
if path.startswith('gs://'):
# pylint: disable=wrong-import-order, wrong-import-position
@@ -584,10 +554,10 @@ class ChannelFactory(object):
class _CompressedFile(object):
"""Somewhat limited file wrapper for easier handling of compressed files."""
- _type_mask = {
- CompressionTypes.GZIP: zlib.MAX_WBITS | 16,
- CompressionTypes.ZLIB: zlib.MAX_WBITS,
- }
+
+ # The bit mask to use for the wbits parameters of the GZIP compressor and
+ # decompressor objects.
+ _gzip_mask = zlib.MAX_WBITS | 16
def __init__(self,
fileobj,
@@ -606,8 +576,7 @@ class _CompressedFile(object):
if self._compression_type == CompressionTypes.BZIP2:
self._decompressor = bz2.BZ2Decompressor()
else:
- self._decompressor = zlib.decompressobj(
- self._type_mask[compression_type])
+ self._decompressor = zlib.decompressobj(self._gzip_mask)
else:
self._decompressor = None
@@ -616,8 +585,7 @@ class _CompressedFile(object):
self._compressor = bz2.BZ2Compressor()
else:
self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED,
- self._type_mask[compression_type])
+ zlib.DEFLATED, self._gzip_mask)
else:
self._compressor = None
@@ -625,10 +593,14 @@ class _CompressedFile(object):
if not CompressionTypes.is_valid_compression_type(compression_type):
raise TypeError('compression_type must be CompressionType object but '
'was %s' % type(compression_type))
- if (compression_type == CompressionTypes.AUTO or
- compression_type == CompressionTypes.UNCOMPRESSED):
+ if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
+ ):
+ raise ValueError(
+ 'Cannot create object with unspecified or no compression')
+ if compression_type not in (CompressionTypes.BZIP2, CompressionTypes.GZIP):
raise ValueError(
- 'cannot create object with unspecified or no compression')
+ 'compression_type %s not supported for whole-file compression',
+ compression_type)
def _readable(self):
mode = self._file.mode
@@ -652,12 +624,23 @@ class _CompressedFile(object):
buf = self._file.read(self._read_size)
if buf:
self._data += self._decompressor.decompress(buf)
- elif self._compression_type != CompressionTypes.BZIP2:
- # EOF reached, flush. BZIP2 does not have this flush method,
- # because of it's block nature
- self._data += self._decompressor.flush()
- return
else:
+ # EOF reached.
+ # Verify completeness and no corruption and flush (if needed by
+ # the underlying algorithm).
+ if self._compression_type == CompressionTypes.BZIP2:
+ # Having unused_data past end of stream would imply file corruption.
+ assert not self._decompressor.unused_data, 'Possible file corruption.'
+ try:
+ # EOF implies that the underlying BZIP2 stream must also have
+ # reached EFO. We expect this to raise an EOFError and we catch it
+ # below. Any other kind of error though would be problematic.
+ self._decompressor.decompress('dummy')
+ assert False, 'Possible file corruption.'
+ except EOFError:
+ pass # All is as expected!
+ else:
+ self._data += self._decompressor.flush()
return
def _read_from_internal_buffer(self, num_bytes):
@@ -697,8 +680,7 @@ class _CompressedFile(object):
if self._file is None:
return
- if self._writeable():
- self._file.write(self._compressor.flush())
+ self.flush()
self._file.close()
def flush(self):
@@ -1014,8 +996,9 @@ class NativeFileSink(dataflow_io.NativeSink):
self.file_name_suffix = file_name_suffix
self.num_shards = num_shards
# TODO: Update this when the service supports more patterns.
- self.shard_name_template = ('-SSSSS-of-NNNNN' if shard_name_template is None
- else shard_name_template)
+ self.shard_name_template = (DEFAULT_SHARD_NAME_TEMPLATE if
+ shard_name_template is None else
+ shard_name_template)
# TODO: Implement sink validation.
self.validate = validate
self.mime_type = mime_type
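Two details of the fileio changes above, illustrated standalone. First,
_gzip_mask relies on zlib's wbits convention, where MAX_WBITS | 16 selects
GZIP framing instead of raw ZLIB headers; a round-trip sketch:

    import zlib

    gzip_mask = zlib.MAX_WBITS | 16  # wbits | 16 means GZIP header/trailer.
    compressor = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, gzip_mask)
    data = compressor.compress('hello\nworld') + compressor.flush()
    assert zlib.decompressobj(gzip_mask).decompress(data) == 'hello\nworld'

Second, AUTO bottoms out in CompressionTypes.detect_compression_type, which
keys off the lower-cased path suffix; per the comment in the class, any
non-recognized extension implies UNCOMPRESSED:

    from apache_beam.io import fileio

    detect = fileio.CompressionTypes.detect_compression_type
    assert detect('logs.BZ2') == fileio.CompressionTypes.BZIP2
    assert detect('gs://bucket/data.gz') == fileio.CompressionTypes.GZIP
    assert detect('notes.txt') == fileio.CompressionTypes.UNCOMPRESSED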
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 45435e6..15daf04 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -138,7 +138,7 @@ class TestTextFileSource(unittest.TestCase):
self.assertEqual(read_lines, [])
def test_read_entire_file_gzip_large(self):
- lines = ['Line %d' % d for d in range(10 * 1000)]
+ lines = ['Line %d' % d for d in range(100 * 1000)]
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
@@ -188,7 +188,7 @@ class TestTextFileSource(unittest.TestCase):
self.assertEqual(read_lines, [])
def test_read_entire_file_bzip2_large(self):
- lines = ['Line %d' % d for d in range(10 * 1000)]
+ lines = ['Line %d' % d for d in range(100 * 1000)]
compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
@@ -200,140 +200,6 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_read_entire_file_zlib(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- compression_type=fileio.CompressionTypes.ZLIB)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, lines)
-
- def test_read_entire_file_zlib_auto(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(file_path=self.create_temp_file(
- data, suffix='.Z'))
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, lines)
-
- def test_read_entire_file_zlib_empty(self):
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('') + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- compression_type=fileio.CompressionTypes.ZLIB)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, [])
-
- def test_read_entire_file_zlib_large(self):
- lines = ['Line %d' % d for d in range(10 * 1000)]
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- compression_type=fileio.CompressionTypes.ZLIB)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, lines)
-
- def test_skip_entire_file_zlib(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- start_offset=1, # Anything other than 0 should lead to a null-read.
- compression_type=fileio.CompressionTypes.ZLIB)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, [])
-
- def test_skip_entire_file_bzip2(self):
- lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor()
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- start_offset=1, # Anything other than 0 should lead to a null-read.
- compression_type=fileio.CompressionTypes.BZIP2)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, [])
-
- def test_skip_entire_file_gzip(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- start_offset=1, # Anything other than 0 should lead to a null-read.
- compression_type=fileio.CompressionTypes.GZIP)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, [])
-
- def test_consume_entire_file_gzip(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- end_offset=1, # Any end_offset should effectively be ignored.
- compression_type=fileio.CompressionTypes.GZIP)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, lines)
-
- def test_consume_entire_file_bzip2(self):
- lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor()
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- end_offset=1, # Any end_offset should effectively be ignored.
- compression_type=fileio.CompressionTypes.BZIP2)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, lines)
-
- def test_consume_entire_file_zlib(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- end_offset=1, # Any end_offset should effectively be ignored.
- compression_type=fileio.CompressionTypes.ZLIB)
- read_lines = []
- with source.reader() as reader:
- for line in reader:
- read_lines.append(line)
- self.assertEqual(read_lines, lines)
-
def test_progress_entire_file(self):
lines = ['First', 'Second', 'Third']
source = fileio.TextFileSource(
@@ -387,25 +253,6 @@ class TestTextFileSource(unittest.TestCase):
self.assertEqual(len(progress_record), 3)
self.assertEqual(progress_record, [0, 6, 13])
- def test_progress_entire_file_zlib(self):
- lines = ['First', 'Second', 'Third']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- compression_type=fileio.CompressionTypes.ZLIB)
- progress_record = []
- with source.reader() as reader:
- self.assertEqual(-1, reader.get_progress().position.byte_offset)
- for line in reader:
- self.assertIsNotNone(line)
- progress_record.append(reader.get_progress().position.byte_offset)
- self.assertEqual(18, # Reading the entire contents before we decide EOF.
- reader.get_progress().position.byte_offset)
-
- self.assertEqual(len(progress_record), 3)
- self.assertEqual(progress_record, [0, 6, 13])
-
def try_splitting_reader_at(self, reader, split_request, expected_response):
actual_response = reader.request_dynamic_split(split_request)
@@ -421,10 +268,20 @@ class TestTextFileSource(unittest.TestCase):
return actual_response
- def test_gzip_file_unsplittable(self):
+ def test_file_unsplittable_gzip(self):
lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
data = compressor.compress('\n'.join(lines)) + compressor.flush()
+
+ with self.assertRaises(ValueError): # Unsplittable initially.
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.GZIP,
+ start_offset=1) # Anything other than 0 will do.
+ with source.reader():
+ pass
+
+ # Unsplittable dynamically.
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
compression_type=fileio.CompressionTypes.GZIP)
@@ -451,44 +308,23 @@ class TestTextFileSource(unittest.TestCase):
dataflow_io.ReaderProgress(percent_complete=percent_complete)),
None)
- def test_bzip2_file_unsplittable(self):
+ def test_file_unsplittable_bzip2(self):
lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
- source = fileio.TextFileSource(
- file_path=self.create_temp_file(data),
- compression_type=fileio.CompressionTypes.BZIP2)
- with source.reader() as reader:
- percents_complete = [x / 100.0 for x in range(101)]
+ with self.assertRaises(ValueError): # Unsplittable initially.
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2,
+ start_offset=1) # Anything other than 0 will do.
+ with source.reader():
+ pass
- # Cursor at beginning of file.
- for percent_complete in percents_complete:
- self.try_splitting_reader_at(
- reader,
- dataflow_io.DynamicSplitRequest(
- dataflow_io.ReaderProgress(percent_complete=percent_complete)),
- None)
-
- # Cursor passed beginning of file.
- reader_iter = iter(reader)
- next(reader_iter)
- next(reader_iter)
- for percent_complete in percents_complete:
- self.try_splitting_reader_at(
- reader,
- dataflow_io.DynamicSplitRequest(
- dataflow_io.ReaderProgress(percent_complete=percent_complete)),
- None)
-
- def test_zlib_file_unsplittable(self):
- lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
- compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
- data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ # Unsplittable dynamically.
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
- compression_type=fileio.CompressionTypes.ZLIB)
-
+ compression_type=fileio.CompressionTypes.BZIP2)
with source.reader() as reader:
percents_complete = [x / 100.0 for x in range(101)]
@@ -811,33 +647,6 @@ class TestNativeTextFileSink(unittest.TestCase):
with bz2.BZ2File(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
- def test_write_text_zlib_file(self):
- sink = fileio.NativeTextFileSink(
- self.path, compression_type=fileio.CompressionTypes.ZLIB)
- self._write_lines(sink, self.lines)
-
- with open(self.path, 'r') as f:
- self.assertEqual(
- zlib.decompress(f.read(), zlib.MAX_WBITS).splitlines(), self.lines)
-
- def test_write_text_zlib_file_auto(self):
- self.path = tempfile.NamedTemporaryFile(suffix='.Z').name
- sink = fileio.NativeTextFileSink(self.path)
- self._write_lines(sink, self.lines)
-
- with open(self.path, 'r') as f:
- self.assertEqual(
- zlib.decompress(f.read(), zlib.MAX_WBITS).splitlines(), self.lines)
-
- def test_write_text_zlib_file_empty(self):
- sink = fileio.NativeTextFileSink(
- self.path, compression_type=fileio.CompressionTypes.ZLIB)
- self._write_lines(sink, [])
-
- with open(self.path, 'r') as f:
- self.assertEqual(
- zlib.decompress(f.read(), zlib.MAX_WBITS).splitlines(), [])
-
class MyFileSink(fileio.FileSink):
@@ -982,6 +791,7 @@ class TestFileSink(unittest.TestCase):
with self.assertRaises(IOError):
list(sink.finalize_write(init_token, [res1, res2]))
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 6042576..f1f5a25 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -174,10 +174,14 @@ class ReadFromText(PTransform):
This implementation only supports reading text encoded using UTF-8 or ASCII.
This does not support other encodings such as UTF-16 or UTF-32."""
- def __init__(self, file_pattern=None, min_bundle_size=0,
- compression_type=fileio.CompressionTypes.UNCOMPRESSED,
- strip_trailing_newlines=True,
- coder=coders.StrUtf8Coder(), **kwargs):
+ def __init__(
+ self,
+ file_pattern=None,
+ min_bundle_size=0,
+ compression_type=fileio.CompressionTypes.AUTO,
+ strip_trailing_newlines=True,
+ coder=coders.StrUtf8Coder(),
+ **kwargs):
"""Initialize the ReadFromText transform.
Args:
@@ -187,8 +191,9 @@ class ReadFromText(PTransform):
min_bundle_size: Minimum size of bundles that should be generated when
splitting this source into bundles. See
``FileBasedSource`` for more details.
- compression_type: Used to handle compressed input files. Should be an
- object of type fileio.CompressionTypes.
+ compression_type: Used to handle compressed input files. Typical value
+ is CompressionTypes.AUTO, in which case the underlying file_path's
+ extension will be used to detect the compression.
strip_trailing_newlines: Indicates whether this source should remove
the newline char in each line it reads before
decoding that line.
@@ -196,33 +201,25 @@ class ReadFromText(PTransform):
"""
super(ReadFromText, self).__init__(**kwargs)
- self._file_pattern = file_pattern
- self._min_bundle_size = min_bundle_size
- self._compression_type = compression_type
- self._strip_trailing_newlines = strip_trailing_newlines
- self._coder = coder
+ self._args = (file_pattern, min_bundle_size, compression_type,
+ strip_trailing_newlines, coder)
- def apply(self, pcoll):
- return pcoll | Read(_TextSource(
- self._file_pattern,
- self._min_bundle_size,
- self._compression_type,
- self._strip_trailing_newlines,
- self._coder))
+ def apply(self, pvalue):
+ return pvalue.pipeline | Read(_TextSource(*self._args))
class WriteToText(PTransform):
"""A PTransform for writing to text files."""
- def __init__(self,
- file_path_prefix,
- file_name_suffix='',
- append_trailing_newlines=True,
- num_shards=0,
- shard_name_template=None,
- coder=coders.ToStringCoder(),
- compression_type=fileio.CompressionTypes.NO_COMPRESSION,
- ):
+ def __init__(
+ self,
+ file_path_prefix,
+ file_name_suffix='',
+ append_trailing_newlines=True,
+ num_shards=0,
+ shard_name_template=None,
+ coder=coders.ToStringCoder(),
+ compression_type=fileio.CompressionTypes.AUTO):
"""Initialize a WriteToText PTransform.
Args:
@@ -248,19 +245,15 @@ class WriteToText(PTransform):
case it behaves as if num_shards was set to 1 and only one file will be
generated. The default pattern used is '-SSSSS-of-NNNNN'.
coder: Coder used to encode each line.
- compression_type: Type of compression to use for this sink.
+ compression_type: Used to handle compressed output files. Typical value
+ is CompressionTypes.AUTO, in which case the final file path's
+ extension (as determined by file_path_prefix, file_name_suffix,
+ num_shards and shard_name_template) will be used to detect the
+ compression.
"""
- self._file_path_prefix = file_path_prefix
- self._file_name_suffix = file_name_suffix
- self._append_trailing_newlines = append_trailing_newlines
- self._num_shards = num_shards
- self._shard_name_template = shard_name_template
- self._coder = coder
- self._compression_type = compression_type
+ self._args = (file_path_prefix, file_name_suffix, append_trailing_newlines,
+ num_shards, shard_name_template, coder, compression_type)
def apply(self, pcoll):
- return pcoll | Write(_TextSink(
- self._file_path_prefix, self._file_name_suffix,
- self._append_trailing_newlines, self._num_shards,
- self._shard_name_template, self._coder, self._compression_type))
+ return pcoll | Write(_TextSink(*self._args))
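On the write side, AUTO means the compression is inferred from the final
shard path (file_path_prefix + shard_name_template + file_name_suffix), so a
'.gz' suffix yields gzipped shards with no explicit compression_type; a small
sketch (path and data are placeholders):

    import apache_beam as beam
    from apache_beam.io.textio import WriteToText

    p = beam.Pipeline('DirectPipelineRunner')
    # Shards are named like /tmp/out-00000-of-00001.gz, so the default
    # compression_type=AUTO resolves to GZIP from the '.gz' suffix.
    # pylint: disable=expression-not-assigned
    p | beam.Create(['one', 'two']) | WriteToText(
        '/tmp/out', file_name_suffix='.gz')
    p.run()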
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index fef2b79..d42de2b 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -17,13 +17,13 @@
"""Tests for textio module."""
+import bz2
import glob
import gzip
import logging
import os
import tempfile
import unittest
-import zlib
import apache_beam as beam
import apache_beam.io.source_test_utils as source_test_utils
@@ -261,6 +261,44 @@ class TextSourceTest(unittest.TestCase):
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
+ def test_read_auto_bzip2(self):
+ _, lines = write_data(15)
+ file_name = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template, suffix='.bz2').name
+ with bz2.BZFile(file_name, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> ReadFromText(file_name)
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_auto_gzip(self):
+ _, lines = write_data(15)
+ file_name = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template, suffix='.gz').name
+ with gzip.GzipFile(file_name, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> ReadFromText(file_name)
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
+ def test_read_bzip2(self):
+ _, lines = write_data(15)
+ file_name = tempfile.NamedTemporaryFile(
+ delete=False, prefix=tempfile.template).name
+ with bz2.BZFile(file_name, 'wb') as f:
+ f.write('\n'.join(lines))
+
+ pipeline = beam.Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | 'Read' >> ReadFromText(
+ file_name,
+ compression_type=CompressionTypes.BZIP2)
+ assert_that(pcoll, equal_to(lines))
+ pipeline.run()
+
def test_read_gzip(self):
_, lines = write_data(15)
file_name = tempfile.NamedTemporaryFile(
@@ -355,6 +393,22 @@ class TextSinkTest(unittest.TestCase):
with open(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
+ def test_write_bzip2_file(self):
+ sink = TextSink(
+ self.path, compression_type=CompressionTypes.BZIP2)
+ self._write_lines(sink, self.lines)
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), self.lines)
+
+ def test_write_bzip2_file_auto(self):
+ self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
+ sink = TextSink(self.path)
+ self._write_lines(sink, self.lines)
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), self.lines)
+
def test_write_gzip_file(self):
sink = TextSink(
self.path, compression_type=CompressionTypes.GZIP)
@@ -379,36 +433,6 @@ class TextSinkTest(unittest.TestCase):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
- def test_write_zlib_file(self):
- sink = TextSink(
- self.path, compression_type=CompressionTypes.ZLIB)
- self._write_lines(sink, self.lines)
-
- with open(self.path, 'r') as f:
- content = f.read()
- self.assertEqual(
- zlib.decompress(content, zlib.MAX_WBITS).splitlines(), self.lines)
-
- def test_write_zlib_file_auto(self):
- self.path = tempfile.NamedTemporaryFile(suffix='.Z').name
- sink = TextSink(self.path)
- self._write_lines(sink, self.lines)
-
- with open(self.path, 'r') as f:
- content = f.read()
- self.assertEqual(
- zlib.decompress(content, zlib.MAX_WBITS).splitlines(), self.lines)
-
- def test_write_zlib_file_empty(self):
- sink = TextSink(
- self.path, compression_type=CompressionTypes.ZLIB)
- self._write_lines(sink, [])
-
- with open(self.path, 'r') as f:
- content = f.read()
- self.assertEqual(
- zlib.decompress(content, zlib.MAX_WBITS).splitlines(), [])
-
def test_write_dataflow(self):
pipeline = beam.Pipeline('DirectPipelineRunner')
pcoll = pipeline | beam.core.Create('Create', self.lines)
[2/3] incubator-beam git commit: A few test fixes and other cleanups.
Posted by ro...@apache.org.
A few test fixes and other cleanups.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4575854
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4575854
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4575854
Branch: refs/heads/python-sdk
Commit: b4575854b2c4aa9e56905dcd8f7385042a438eac
Parents: 88461ab
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Thu Oct 6 09:55:38 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Oct 15 13:30:06 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio.py | 4 ++--
sdks/python/apache_beam/io/filebasedsource_test.py | 2 +-
sdks/python/apache_beam/io/fileio.py | 5 +++--
sdks/python/apache_beam/io/textio_test.py | 4 ++--
4 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 7de00df..53ed95a 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -287,7 +287,7 @@ class WriteToAvro(beam.transforms.PTransform):
A WriteToAvro transform usable for writing.
"""
self._args = (file_path_prefix, schema, codec, file_name_suffix, num_shards,
- shard_name_template, mime_type)
+ shard_name_template, mime_type)
def apply(self, pcoll):
return pcoll | beam.io.iobase.Write(_AvroSink(*self._args))
@@ -323,4 +323,4 @@ class _AvroSink(fileio.FileSink):
file_handle, avro.io.DatumWriter(), self._schema, self._codec)
def write_record(self, writer, value):
- writer.append(value)
\ No newline at end of file
+ writer.append(value)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index f1ac482..2c68d2e 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -406,7 +406,7 @@ class TestFileBasedSource(unittest.TestCase):
_, lines = write_data(10)
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template, suffix='.bz2').name
- with gzip.GzipFile(filename, 'wb') as f:
+ with bz2.BZ2File(filename, 'wb') as f:
f.write('\n'.join(lines))
pipeline = beam.Pipeline('DirectPipelineRunner')
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index e6575b0..f74ac9c 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -633,7 +633,7 @@ class _CompressedFile(object):
assert not self._decompressor.unused_data, 'Possible file corruption.'
try:
# EOF implies that the underlying BZIP2 stream must also have
- # reached EFO. We expect this to raise an EOFError and we catch it
+ # reached EOF. We expect this to raise an EOFError and we catch it
# below. Any other kind of error though would be problematic.
self._decompressor.decompress('dummy')
assert False, 'Possible file corruption.'
@@ -680,7 +680,8 @@ class _CompressedFile(object):
if self._file is None:
return
- self.flush()
+ if self._writeable():
+ self._file.write(self._compressor.flush())
self._file.close()
def flush(self):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index d42de2b..109506a 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -265,7 +265,7 @@ class TextSourceTest(unittest.TestCase):
_, lines = write_data(15)
file_name = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template, suffix='.bz2').name
- with bz2.BZFile(file_name, 'wb') as f:
+ with bz2.BZ2File(file_name, 'wb') as f:
f.write('\n'.join(lines))
pipeline = beam.Pipeline('DirectPipelineRunner')
@@ -289,7 +289,7 @@ class TextSourceTest(unittest.TestCase):
_, lines = write_data(15)
file_name = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template).name
- with bz2.BZFile(file_name, 'wb') as f:
+ with bz2.BZ2File(file_name, 'wb') as f:
f.write('\n'.join(lines))
pipeline = beam.Pipeline('DirectPipelineRunner')
[3/3] incubator-beam git commit: Closes #1053
Posted by ro...@apache.org.
Closes #1053
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24b7bcc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24b7bcc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24b7bcc2
Branch: refs/heads/python-sdk
Commit: 24b7bcc26c5d99d992dad51f58b14be7d1fbc551
Parents: 56ab1a4 b457585
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Oct 15 13:31:46 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Oct 15 13:31:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio.py | 52 ++--
sdks/python/apache_beam/io/avroio_test.py | 20 ++
.../apache_beam/io/filebasedsource_test.py | 47 ++--
sdks/python/apache_beam/io/fileio.py | 120 ++++------
sdks/python/apache_beam/io/fileio_test.py | 238 ++-----------------
sdks/python/apache_beam/io/textio.py | 71 +++---
sdks/python/apache_beam/io/textio_test.py | 86 ++++---
7 files changed, 231 insertions(+), 403 deletions(-)
----------------------------------------------------------------------