You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/11 18:24:00 UTC
[1/4] incubator-beam git commit: Closes #1007
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 240f8f673 -> eb57b974c
Closes #1007
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46110e4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46110e4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46110e4a
Branch: refs/heads/python-sdk
Commit: 46110e4accaf3ddad5d2f7b64da3f23092b112a9
Parents: 240f8f6 e223929
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 11 11:15:07 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 33 ++++--
sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++-
2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-beam git commit: implement code review feedback
Posted by ro...@apache.org.
implement code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e223929c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e223929c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e223929c
Branch: refs/heads/python-sdk
Commit: e223929c68233853ea4d3b589078dbeaff0f6111
Parents: a9a00ba
Author: tim1357 <se...@gmail.com>
Authored: Wed Oct 5 13:36:09 2016 -0400
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 10 +++---
sdks/python/apache_beam/io/fileio_test.py | 42 +++++++++++++-------------
2 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e223929c/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 574295e..2670470 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -18,6 +18,7 @@
from __future__ import absolute_import
+import bz2
import glob
import logging
from multiprocessing.pool import ThreadPool
@@ -27,7 +28,6 @@ import shutil
import threading
import time
import zlib
-import bz2
import weakref
from apache_beam import coders
@@ -97,7 +97,7 @@ class CompressionTypes(object):
mime_types_by_compression_type = {
cls.GZIP: 'application/x-gzip',
cls.ZLIB: 'application/octet-stream',
- cls.BZIP2: 'application/octet-stream'
+ cls.BZIP2: 'application/x-bz2',
}
return mime_types_by_compression_type.get(compression_type, default)
@@ -606,14 +606,14 @@ class _CompressedFile(object):
if self._compression_type == CompressionTypes.BZIP2:
self._decompressor = bz2.BZ2Decompressor()
else:
- self._decompressor = zlib.decompressobj(\
- self._type_mask[compression_type])
+ self._decompressor = zlib.decompressobj(
+ self._type_mask[compression_type])
else:
self._decompressor = None
if self._writeable():
if self._compression_type == CompressionTypes.BZIP2:
- self._compressor = bz2.BZ2Compressor(9)
+ self._compressor = bz2.BZ2Compressor()
else:
self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e223929c/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 1ff9acd..42ea9ff 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -18,6 +18,7 @@
"""Unit tests for local and GCS sources and sinks."""
+import bz2
import glob
import gzip
import logging
@@ -25,7 +26,6 @@ import os
import tempfile
import unittest
import zlib
-import bz2
import apache_beam as beam
from apache_beam import coders
@@ -150,10 +150,10 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_read_entire_file_bzip(self):
+ def test_read_entire_file_bzip2(self):
lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor(8)
- data = compressor.compress('\n'.join(lines)) +compressor.flush()
+ compressor = bz2.BZ2Compressor()
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
compression_type=fileio.CompressionTypes.BZIP2)
@@ -163,9 +163,9 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_read_entire_file_bzip_auto(self):
+ def test_read_entire_file_bzip2_auto(self):
lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor(8)
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(file_path=self.create_temp_file(
data, suffix='.bz2'))
@@ -175,8 +175,8 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_read_entire_file_bzip_empty(self):
- compressor = bz2.BZ2Compressor(8)
+ def test_read_entire_file_bzip2_empty(self):
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('') + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
@@ -187,9 +187,9 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, [])
- def test_read_entire_file_bzip_large(self):
+ def test_read_entire_file_bzip2_large(self):
lines = ['Line %d' % d for d in range(10 * 1000)]
- compressor = bz2.BZ2Compressor(8)
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
@@ -264,9 +264,9 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, [])
- def test_skip_entire_file_bzip(self):
+ def test_skip_entire_file_bzip2(self):
lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor(8)
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
@@ -306,9 +306,9 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_consume_entire_file_bzip(self):
+ def test_consume_entire_file_bzip2(self):
lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor(8)
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
@@ -368,9 +368,9 @@ class TestTextFileSource(unittest.TestCase):
self.assertEqual(len(progress_record), 3)
self.assertEqual(progress_record, [0, 6, 13])
- def test_progress_entire_file_bzip(self):
+ def test_progress_entire_file_bzip2(self):
lines = ['First', 'Second', 'Third']
- compressor = bz2.BZ2Compressor(8)
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
@@ -451,9 +451,9 @@ class TestTextFileSource(unittest.TestCase):
dataflow_io.ReaderProgress(percent_complete=percent_complete)),
None)
- def test_bzip_file_unsplittable(self):
+ def test_bzip2_file_unsplittable(self):
lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
- compressor = bz2.BZ2Compressor(8)
+ compressor = bz2.BZ2Compressor()
data = compressor.compress('\n'.join(lines)) + compressor.flush()
source = fileio.TextFileSource(
file_path=self.create_temp_file(data),
@@ -787,7 +787,7 @@ class TestNativeTextFileSink(unittest.TestCase):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
- def test_write_text_bzip_file(self):
+ def test_write_text_bzip2_file(self):
sink = fileio.NativeTextFileSink(
self.path, compression_type=fileio.CompressionTypes.BZIP2)
self._write_lines(sink, self.lines)
@@ -795,7 +795,7 @@ class TestNativeTextFileSink(unittest.TestCase):
with bz2.BZ2File(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
- def test_write_text_bzip_file_auto(self):
+ def test_write_text_bzip2_file_auto(self):
self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
sink = fileio.NativeTextFileSink(self.path)
self._write_lines(sink, self.lines)
@@ -803,7 +803,7 @@ class TestNativeTextFileSink(unittest.TestCase):
with bz2.BZ2File(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
- def test_write_text_bzip_file_empty(self):
+ def test_write_text_bzip2_file_empty(self):
sink = fileio.NativeTextFileSink(
self.path, compression_type=fileio.CompressionTypes.BZIP2)
self._write_lines(sink, [])
[4/4] incubator-beam git commit: Post-merge fixup.
Posted by ro...@apache.org.
Post-merge fixup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb57b974
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb57b974
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb57b974
Branch: refs/heads/python-sdk
Commit: eb57b974c1ca9d231e614debaf40f686da9e4e02
Parents: 46110e4
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 11 11:23:27 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:23:27 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio_test.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb57b974/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 42ea9ff..45435e6 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -466,8 +466,8 @@ class TestTextFileSource(unittest.TestCase):
for percent_complete in percents_complete:
self.try_splitting_reader_at(
reader,
- iobase.DynamicSplitRequest(
- iobase.ReaderProgress(percent_complete=percent_complete)),
+ dataflow_io.DynamicSplitRequest(
+ dataflow_io.ReaderProgress(percent_complete=percent_complete)),
None)
# Cursor passed beginning of file.
@@ -477,8 +477,8 @@ class TestTextFileSource(unittest.TestCase):
for percent_complete in percents_complete:
self.try_splitting_reader_at(
reader,
- iobase.DynamicSplitRequest(
- iobase.ReaderProgress(percent_complete=percent_complete)),
+ dataflow_io.DynamicSplitRequest(
+ dataflow_io.ReaderProgress(percent_complete=percent_complete)),
None)
def test_zlib_file_unsplittable(self):
[3/4] incubator-beam git commit: Add support for bz2 compression
Posted by ro...@apache.org.
Add support for bz2 compression
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a00bae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a00bae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a00bae
Branch: refs/heads/python-sdk
Commit: a9a00bae2f25237ffb58049d5ea72f532da24d78
Parents: 240f8f6
Author: tim1357 <se...@gmail.com>
Authored: Mon Sep 26 15:15:51 2016 -0400
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 33 ++++--
sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++-
2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c248f12..574295e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -27,6 +27,7 @@ import shutil
import threading
import time
import zlib
+import bz2
import weakref
from apache_beam import coders
@@ -76,6 +77,9 @@ class CompressionTypes(object):
# ZLIB compression (deflate with ZLIB headers).
ZLIB = _CompressionType('zlib')
+ # BZIP2 compression (block-based, not DEFLATE; uses BZIP2 headers)
+ BZIP2 = _CompressionType('bz2')
+
# Uncompressed (i.e., may be split).
UNCOMPRESSED = _CompressionType('uncompressed')
@@ -92,14 +96,17 @@ class CompressionTypes(object):
def mime_type(cls, compression_type, default='application/octet-stream'):
mime_types_by_compression_type = {
cls.GZIP: 'application/x-gzip',
- cls.ZLIB: 'application/octet-stream'
+ cls.ZLIB: 'application/octet-stream',
+ cls.BZIP2: 'application/octet-stream'
}
return mime_types_by_compression_type.get(compression_type, default)
@classmethod
def detect_compression_type(cls, file_path):
"""Returns the compression type of a file (based on its suffix)"""
- compression_types_by_suffix = {'.gz': cls.GZIP, '.z': cls.ZLIB}
+ compression_types_by_suffix = {'.gz': cls.GZIP,
+ '.z': cls.ZLIB,
+ '.bz2': cls.BZIP2}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.iteritems():
if lowercased_path.endswith(suffix):
@@ -596,14 +603,21 @@ class _CompressedFile(object):
self._compression_type = compression_type
if self._readable():
- self._decompressor = zlib.decompressobj(self._type_mask[compression_type])
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._decompressor = bz2.BZ2Decompressor()
+ else:
+ self._decompressor = zlib.decompressobj(\
+ self._type_mask[compression_type])
else:
self._decompressor = None
if self._writeable():
- self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED,
- self._type_mask[compression_type])
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._compressor = bz2.BZ2Compressor(9)
+ else:
+ self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+ zlib.DEFLATED,
+ self._type_mask[compression_type])
else:
self._compressor = None
@@ -638,10 +652,13 @@ class _CompressedFile(object):
buf = self._file.read(self._read_size)
if buf:
self._data += self._decompressor.decompress(buf)
- else:
- # EOF reached, flush.
+ elif self._compression_type != CompressionTypes.BZIP2:
+ # EOF reached, flush. BZIP2 does not have this flush method,
+ # because of its block-based nature
self._data += self._decompressor.flush()
return
+ else:
+ return
def _read_from_internal_buffer(self, num_bytes):
"""Read up to num_bytes from the internal buffer."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b518b97..1ff9acd 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -25,6 +25,7 @@ import os
import tempfile
import unittest
import zlib
+import bz2
import apache_beam as beam
from apache_beam import coders
@@ -149,6 +150,56 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
+ def test_read_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) +compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
+ def test_read_entire_file_bzip_auto(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(file_path=self.create_temp_file(
+ data, suffix='.bz2'))
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
+ def test_read_entire_file_bzip_empty(self):
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('') + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, [])
+
+ def test_read_entire_file_bzip_large(self):
+ lines = ['Line %d' % d for d in range(10 * 1000)]
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
def test_read_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -199,7 +250,7 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
- def test_skip_entire_file_gzip(self):
+ def test_skip_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -213,7 +264,21 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, [])
- def test_skip_entire_file_zlib(self):
+ def test_skip_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ start_offset=1, # Anything other than 0 should lead to a null-read.
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, [])
+
+ def test_skip_entire_file_gzip(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -241,6 +306,20 @@ class TestTextFileSource(unittest.TestCase):
read_lines.append(line)
self.assertEqual(read_lines, lines)
+ def test_consume_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ end_offset=1, # Any end_offset should effectively be ignored.
+ compression_type=fileio.CompressionTypes.BZIP2)
+ read_lines = []
+ with source.reader() as reader:
+ for line in reader:
+ read_lines.append(line)
+ self.assertEqual(read_lines, lines)
+
def test_consume_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -289,6 +368,25 @@ class TestTextFileSource(unittest.TestCase):
self.assertEqual(len(progress_record), 3)
self.assertEqual(progress_record, [0, 6, 13])
+ def test_progress_entire_file_bzip(self):
+ lines = ['First', 'Second', 'Third']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+ progress_record = []
+ with source.reader() as reader:
+ self.assertEqual(-1, reader.get_progress().position.byte_offset)
+ for line in reader:
+ self.assertIsNotNone(line)
+ progress_record.append(reader.get_progress().position.byte_offset)
+ self.assertEqual(18, # Reading the entire contents before we decide EOF.
+ reader.get_progress().position.byte_offset)
+
+ self.assertEqual(len(progress_record), 3)
+ self.assertEqual(progress_record, [0, 6, 13])
+
def test_progress_entire_file_zlib(self):
lines = ['First', 'Second', 'Third']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -353,6 +451,36 @@ class TestTextFileSource(unittest.TestCase):
dataflow_io.ReaderProgress(percent_complete=percent_complete)),
None)
+ def test_bzip_file_unsplittable(self):
+ lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
+ compressor = bz2.BZ2Compressor(8)
+ data = compressor.compress('\n'.join(lines)) + compressor.flush()
+ source = fileio.TextFileSource(
+ file_path=self.create_temp_file(data),
+ compression_type=fileio.CompressionTypes.BZIP2)
+
+ with source.reader() as reader:
+ percents_complete = [x / 100.0 for x in range(101)]
+
+ # Cursor at beginning of file.
+ for percent_complete in percents_complete:
+ self.try_splitting_reader_at(
+ reader,
+ iobase.DynamicSplitRequest(
+ iobase.ReaderProgress(percent_complete=percent_complete)),
+ None)
+
+ # Cursor passed beginning of file.
+ reader_iter = iter(reader)
+ next(reader_iter)
+ next(reader_iter)
+ for percent_complete in percents_complete:
+ self.try_splitting_reader_at(
+ reader,
+ iobase.DynamicSplitRequest(
+ iobase.ReaderProgress(percent_complete=percent_complete)),
+ None)
+
def test_zlib_file_unsplittable(self):
lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -659,6 +787,30 @@ class TestNativeTextFileSink(unittest.TestCase):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
+ def test_write_text_bzip_file(self):
+ sink = fileio.NativeTextFileSink(
+ self.path, compression_type=fileio.CompressionTypes.BZIP2)
+ self._write_lines(sink, self.lines)
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), self.lines)
+
+ def test_write_text_bzip_file_auto(self):
+ self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
+ sink = fileio.NativeTextFileSink(self.path)
+ self._write_lines(sink, self.lines)
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), self.lines)
+
+ def test_write_text_bzip_file_empty(self):
+ sink = fileio.NativeTextFileSink(
+ self.path, compression_type=fileio.CompressionTypes.BZIP2)
+ self._write_lines(sink, [])
+
+ with bz2.BZ2File(self.path, 'r') as f:
+ self.assertEqual(f.read().splitlines(), [])
+
def test_write_text_zlib_file(self):
sink = fileio.NativeTextFileSink(
self.path, compression_type=fileio.CompressionTypes.ZLIB)