You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/11 18:24:02 UTC
[3/4] incubator-beam git commit: Add support for bz2 compression
Add support for bz2 compression
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a00bae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a00bae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a00bae
Branch: refs/heads/python-sdk
Commit: a9a00bae2f25237ffb58049d5ea72f532da24d78
Parents: 240f8f6
Author: tim1357 <se...@gmail.com>
Authored: Mon Sep 26 15:15:51 2016 -0400
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 33 ++++--
sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++-
2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c248f12..574295e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -27,6 +27,7 @@ import shutil
import threading
import time
import zlib
+import bz2
import weakref
from apache_beam import coders
@@ -76,6 +77,9 @@ class CompressionTypes(object):
# ZLIB compression (deflate with ZLIB headers).
ZLIB = _CompressionType('zlib')
+  # BZIP2 compression (Burrows-Wheeler block compression; not deflate-based).
+ BZIP2 = _CompressionType('bz2')
+
# Uncompressed (i.e., may be split).
UNCOMPRESSED = _CompressionType('uncompressed')
@@ -92,14 +96,17 @@ class CompressionTypes(object):
def mime_type(cls, compression_type, default='application/octet-stream'):
mime_types_by_compression_type = {
cls.GZIP: 'application/x-gzip',
- cls.ZLIB: 'application/octet-stream'
+ cls.ZLIB: 'application/octet-stream',
+ cls.BZIP2: 'application/octet-stream'
}
return mime_types_by_compression_type.get(compression_type, default)
@classmethod
def detect_compression_type(cls, file_path):
"""Returns the compression type of a file (based on its suffix)"""
- compression_types_by_suffix = {'.gz': cls.GZIP, '.z': cls.ZLIB}
+ compression_types_by_suffix = {'.gz': cls.GZIP,
+ '.z': cls.ZLIB,
+ '.bz2': cls.BZIP2}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.iteritems():
if lowercased_path.endswith(suffix):
@@ -596,14 +603,21 @@ class _CompressedFile(object):
self._compression_type = compression_type
if self._readable():
- self._decompressor = zlib.decompressobj(self._type_mask[compression_type])
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._decompressor = bz2.BZ2Decompressor()
+ else:
+ self._decompressor = zlib.decompressobj(\
+ self._type_mask[compression_type])
else:
self._decompressor = None
if self._writeable():
- self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED,
- self._type_mask[compression_type])
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._compressor = bz2.BZ2Compressor(9)
+ else:
+ self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+ zlib.DEFLATED,
+ self._type_mask[compression_type])
else:
self._compressor = None
@@ -638,10 +652,13 @@ class _CompressedFile(object):
buf = self._file.read(self._read_size)
if buf:
self._data += self._decompressor.decompress(buf)
- else:
- # EOF reached, flush.
+ elif self._compression_type != CompressionTypes.BZIP2:
+      # EOF reached, flush. BZIP2 does not have this flush method,
+      # because of its block-based nature.
self._data += self._decompressor.flush()
return
+ else:
+ return
def _read_from_internal_buffer(self, num_bytes):
"""Read up to num_bytes from the internal buffer."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b518b97..1ff9acd 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -25,6 +25,7 @@ import os
import tempfile
import unittest
import zlib
+import bz2
import apache_beam as beam
from apache_beam import coders
@@ -149,6 +150,56 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
+ def test_read_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
+ def test_read_entire_file_bzip_auto(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(file_path=self.create_temp_file(
+ data, suffix='.bz2'))
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
+ def test_read_entire_file_bzip_empty(self):
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('') + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, [])
+
+ def test_read_entire_file_bzip_large(self):
+ lines = ['Line %d' % d for d in range(10 * 1000)]
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
def test_read_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -199,7 +250,7 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_skip_entire_file_gzip(self):
+ def test_skip_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -213,7 +264,21 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, [])
- def test_skip_entire_file_zlib(self):
+ def test_skip_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ start_offset=1, # Anything other than 0 should lead to a null-read.
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, [])
+
+ def test_skip_entire_file_gzip(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -241,6 +306,20 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
+ def test_consume_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ end_offset=1, # Any end_offset should effectively be ignored.
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
def test_consume_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -289,6 +368,25 @@ class TestTextFileSource(unittest.TestCase):
self.assertEqual(len(progress_record), 3)
self.assertEqual(progress_record, [0, 6, 13])
+ def test_progress_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ progress_record = []
+ with source.reader() as reader:
+ self.assertEqual(-1, reader.get_progress().position.byte_offset)
+ for line in reader:
+ self.assertIsNotNone(line)
+ progress_record.append(reader.get_progress().position.byte_offset)
+ self.assertEqual(18, # Reading the entire contents before we decide EOF.
+ reader.get_progress().position.byte_offset)
+
+ self.assertEqual(len(progress_record), 3)
+ self.assertEqual(progress_record, [0, 6, 13])
+
def test_progress_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -353,6 +451,36 @@ class TestTextFileSource(unittest.TestCase):
dataflow_io.ReaderProgress(percent_complete=percent_complete)),
None)
+ def test_bzip_file_unsplittable(self):
+ lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+
+ with source.reader() as reader:
+ percents_complete = [x / 100.0 for x in range(101)]
+
+ # Cursor at beginning of file.
+ for percent_complete in percents_complete:
+ self.try_splitting_reader_at(
+ reader,
+ iobase.DynamicSplitRequest(
+ iobase.ReaderProgress(percent_complete=percent_complete)),
+ None)
+
+ # Cursor passed beginning of file.
+ reader_iter = iter(reader)
+ next(reader_iter)
+ next(reader_iter)
+ for percent_complete in percents_complete:
+ self.try_splitting_reader_at(
+ reader,
+ iobase.DynamicSplitRequest(
+ iobase.ReaderProgress(percent_complete=percent_complete)),
+ None)
+
def test_zlib_file_unsplittable(self):
lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -659,6 +787,30 @@ class TestNativeTextFileSink(unittest.TestCase):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
+ def test_write_text_bzip_file(self):
+ sink = fileio.NativeTextFileSink(
+ self.path, compression_type=fileio.CompressionTypes.BZIP2)
+ self._write_lines(sink, self.lines)
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), self.lines)
+
+ def test_write_text_bzip_file_auto(self):
+ self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
+ sink = fileio.NativeTextFileSink(self.path)
+ self._write_lines(sink, self.lines)
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), self.lines)
+
+ def test_write_text_bzip_file_empty(self):
+ sink = fileio.NativeTextFileSink(
+ self.path, compression_type=fileio.CompressionTypes.BZIP2)
+ self._write_lines(sink, [])
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), [])
+
def test_write_text_zlib_file(self):
sink = fileio.NativeTextFileSink(
self.path, compression_type=fileio.CompressionTypes.ZLIB)