Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/11 18:24:00 UTC

[1/4] incubator-beam git commit: Closes #1007

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 240f8f673 -> eb57b974c


Closes #1007


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46110e4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46110e4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46110e4a

Branch: refs/heads/python-sdk
Commit: 46110e4accaf3ddad5d2f7b64da3f23092b112a9
Parents: 240f8f6 e223929
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 11 11:15:07 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      |  33 ++++--
 sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++-
 2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[2/4] incubator-beam git commit: implement code review feedback

Posted by ro...@apache.org.
implement code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e223929c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e223929c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e223929c

Branch: refs/heads/python-sdk
Commit: e223929c68233853ea4d3b589078dbeaff0f6111
Parents: a9a00ba
Author: tim1357 <se...@gmail.com>
Authored: Wed Oct 5 13:36:09 2016 -0400
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      | 10 +++---
 sdks/python/apache_beam/io/fileio_test.py | 42 +++++++++++++-------------
 2 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e223929c/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 574295e..2670470 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -18,6 +18,7 @@
 
 from __future__ import absolute_import
 
+import bz2
 import glob
 import logging
 from multiprocessing.pool import ThreadPool
@@ -27,7 +28,6 @@ import shutil
 import threading
 import time
 import zlib
-import bz2
 import weakref
 
 from apache_beam import coders
@@ -97,7 +97,7 @@ class CompressionTypes(object):
     mime_types_by_compression_type = {
         cls.GZIP: 'application/x-gzip',
         cls.ZLIB: 'application/octet-stream',
-        cls.BZIP2: 'application/octet-stream'
+        cls.BZIP2: 'application/x-bz2',
     }
     return mime_types_by_compression_type.get(compression_type, default)
 
@@ -606,14 +606,14 @@ class _CompressedFile(object):
       if self._compression_type == CompressionTypes.BZIP2:
         self._decompressor = bz2.BZ2Decompressor()
       else:
-        self._decompressor = zlib.decompressobj(\
-                            self._type_mask[compression_type])
+        self._decompressor = zlib.decompressobj(
+            self._type_mask[compression_type])
     else:
       self._decompressor = None
 
     if self._writeable():
       if self._compression_type == CompressionTypes.BZIP2:
-        self._compressor = bz2.BZ2Compressor(9)
+        self._compressor = bz2.BZ2Compressor()
       else:
         self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                             zlib.DEFLATED,

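The review fixes above give BZIP2 its own MIME type ('application/x-bz2') and drop the explicit compression level from bz2.BZ2Compressor(9). Dropping the argument is behavior-preserving, since the standard library's default compresslevel is already 9; a minimal standalone check (not part of the patch) is:

    import bz2

    # Sanity check, not from the patch: bz2.BZ2Compressor() defaults to
    # compresslevel=9, so removing the explicit argument leaves the
    # compressed output byte-for-byte identical.
    payload = b'The quick brown fox jumps over the lazy dog\n' * 1000

    def one_shot(compressor):
        return compressor.compress(payload) + compressor.flush()

    assert one_shot(bz2.BZ2Compressor()) == one_shot(bz2.BZ2Compressor(9))
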
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e223929c/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 1ff9acd..42ea9ff 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -18,6 +18,7 @@
 
 """Unit tests for local and GCS sources and sinks."""
 
+import bz2
 import glob
 import gzip
 import logging
@@ -25,7 +26,6 @@ import os
 import tempfile
 import unittest
 import zlib
-import bz2
 
 import apache_beam as beam
 from apache_beam import coders
@@ -150,10 +150,10 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_read_entire_file_bzip(self):
+  def test_read_entire_file_bzip2(self):
     lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor(8)
-    data = compressor.compress('\n'.join(lines)) +compressor.flush()
+    compressor = bz2.BZ2Compressor()
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
         compression_type=fileio.CompressionTypes.BZIP2)
@@ -163,9 +163,9 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_read_entire_file_bzip_auto(self):
+  def test_read_entire_file_bzip2_auto(self):
     lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor(8)
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(file_path=self.create_temp_file(
         data, suffix='.bz2'))
@@ -175,8 +175,8 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_read_entire_file_bzip_empty(self):
-    compressor = bz2.BZ2Compressor(8)
+  def test_read_entire_file_bzip2_empty(self):
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('') + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
@@ -187,9 +187,9 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, [])
 
-  def test_read_entire_file_bzip_large(self):
+  def test_read_entire_file_bzip2_large(self):
     lines = ['Line %d' % d for d in range(10 * 1000)]
-    compressor = bz2.BZ2Compressor(8)
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
@@ -264,9 +264,9 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, [])
 
-  def test_skip_entire_file_bzip(self):
+  def test_skip_entire_file_bzip2(self):
     lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor(8)
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
@@ -306,9 +306,9 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_consume_entire_file_bzip(self):
+  def test_consume_entire_file_bzip2(self):
     lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor(8)
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
@@ -368,9 +368,9 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(len(progress_record), 3)
     self.assertEqual(progress_record, [0, 6, 13])
 
-  def test_progress_entire_file_bzip(self):
+  def test_progress_entire_file_bzip2(self):
     lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor(8)
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
@@ -451,9 +451,9 @@ class TestTextFileSource(unittest.TestCase):
                 dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
-  def test_bzip_file_unsplittable(self):
+  def test_bzip2_file_unsplittable(self):
     lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
-    compressor = bz2.BZ2Compressor(8)
+    compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
@@ -787,7 +787,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
-  def test_write_text_bzip_file(self):
+  def test_write_text_bzip2_file(self):
     sink = fileio.NativeTextFileSink(
         self.path, compression_type=fileio.CompressionTypes.BZIP2)
     self._write_lines(sink, self.lines)
@@ -795,7 +795,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
-  def test_write_text_bzip_file_auto(self):
+  def test_write_text_bzip2_file_auto(self):
     self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
     sink = fileio.NativeTextFileSink(self.path)
     self._write_lines(sink, self.lines)
@@ -803,7 +803,7 @@ class TestNativeTextFileSink(unittest.TestCase):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
-  def test_write_text_bzip_file_empty(self):
+  def test_write_text_bzip2_file_empty(self):
     sink = fileio.NativeTextFileSink(
         self.path, compression_type=fileio.CompressionTypes.BZIP2)
     self._write_lines(sink, [])


[4/4] incubator-beam git commit: Post-merge fixup.

Posted by ro...@apache.org.
Post-merge fixup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb57b974
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb57b974
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb57b974

Branch: refs/heads/python-sdk
Commit: eb57b974c1ca9d231e614debaf40f686da9e4e02
Parents: 46110e4
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 11 11:23:27 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:23:27 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio_test.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb57b974/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 42ea9ff..45435e6 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -466,8 +466,8 @@ class TestTextFileSource(unittest.TestCase):
       for percent_complete in percents_complete:
         self.try_splitting_reader_at(
             reader,
-            iobase.DynamicSplitRequest(
-                iobase.ReaderProgress(percent_complete=percent_complete)),
+            dataflow_io.DynamicSplitRequest(
+                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
       # Cursor passed beginning of file.
@@ -477,8 +477,8 @@ class TestTextFileSource(unittest.TestCase):
       for percent_complete in percents_complete:
         self.try_splitting_reader_at(
             reader,
-            iobase.DynamicSplitRequest(
-                iobase.ReaderProgress(percent_complete=percent_complete)),
+            dataflow_io.DynamicSplitRequest(
+                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
   def test_zlib_file_unsplittable(self):


[3/4] incubator-beam git commit: Add support for bz2 compression

Posted by ro...@apache.org.
Add support for bz2 compression


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a00bae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a00bae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a00bae

Branch: refs/heads/python-sdk
Commit: a9a00bae2f25237ffb58049d5ea72f532da24d78
Parents: 240f8f6
Author: tim1357 <se...@gmail.com>
Authored: Mon Sep 26 15:15:51 2016 -0400
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      |  33 ++++--
 sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++-
 2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c248f12..574295e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -27,6 +27,7 @@ import shutil
 import threading
 import time
 import zlib
+import bz2
 import weakref
 
 from apache_beam import coders
@@ -76,6 +77,9 @@ class CompressionTypes(object):
   # ZLIB compression (deflate with ZLIB headers).
   ZLIB = _CompressionType('zlib')
 
+  # BZIP2 compression.
+  BZIP2 = _CompressionType('bz2')
+
   # Uncompressed (i.e., may be split).
   UNCOMPRESSED = _CompressionType('uncompressed')
 
@@ -92,14 +96,17 @@ class CompressionTypes(object):
   def mime_type(cls, compression_type, default='application/octet-stream'):
     mime_types_by_compression_type = {
         cls.GZIP: 'application/x-gzip',
-        cls.ZLIB: 'application/octet-stream'
+        cls.ZLIB: 'application/octet-stream',
+        cls.BZIP2: 'application/octet-stream'
     }
     return mime_types_by_compression_type.get(compression_type, default)
 
   @classmethod
   def detect_compression_type(cls, file_path):
     """Returns the compression type of a file (based on its suffix)"""
-    compression_types_by_suffix = {'.gz': cls.GZIP, '.z': cls.ZLIB}
+    compression_types_by_suffix = {'.gz': cls.GZIP,
+                                   '.z': cls.ZLIB,
+                                   '.bz2': cls.BZIP2}
     lowercased_path = file_path.lower()
     for suffix, compression_type in compression_types_by_suffix.iteritems():
       if lowercased_path.endswith(suffix):
@@ -596,14 +603,21 @@ class _CompressedFile(object):
     self._compression_type = compression_type
 
     if self._readable():
-      self._decompressor = zlib.decompressobj(self._type_mask[compression_type])
+      if self._compression_type == CompressionTypes.BZIP2:
+        self._decompressor = bz2.BZ2Decompressor()
+      else:
+        self._decompressor = zlib.decompressobj(\
+                            self._type_mask[compression_type])
     else:
       self._decompressor = None
 
     if self._writeable():
-      self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
-                                          zlib.DEFLATED,
-                                          self._type_mask[compression_type])
+      if self._compression_type == CompressionTypes.BZIP2:
+        self._compressor = bz2.BZ2Compressor(9)
+      else:
+        self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+                                            zlib.DEFLATED,
+                                            self._type_mask[compression_type])
     else:
       self._compressor = None
 
@@ -638,10 +652,13 @@ class _CompressedFile(object):
       buf = self._file.read(self._read_size)
       if buf:
         self._data += self._decompressor.decompress(buf)
-      else:
-        # EOF reached, flush.
+      elif self._compression_type != CompressionTypes.BZIP2:
+        # EOF reached, flush. The BZIP2 decompressor does not have a flush
+        # method because of its block-oriented nature.
         self._data += self._decompressor.flush()
         return
+      else:
+        return
 
   def _read_from_internal_buffer(self, num_bytes):
     """Read up to num_bytes from the internal buffer."""

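The hunk above carries the substance of the change: detect_compression_type() now maps the '.bz2' suffix to BZIP2, _CompressedFile constructs bz2.BZ2Decompressor / bz2.BZ2Compressor when the compression type is BZIP2, and the read loop returns at EOF without flushing for bzip2, since bz2.BZ2Decompressor has no flush() method. A simplified, standard-library-only sketch of that selection pattern (the names below are illustrative, not the Beam _CompressedFile code):

    import bz2
    import zlib

    def make_decompressor(compression_type):
        # bzip2 gets the block-oriented BZ2Decompressor; gzip and zlib share
        # zlib.decompressobj, with wbits selecting gzip or plain zlib framing
        # (the same masks the tests use: MAX_WBITS | 16 vs. MAX_WBITS).
        if compression_type == 'bz2':
            return bz2.BZ2Decompressor()
        wbits = zlib.MAX_WBITS | 16 if compression_type == 'gzip' else zlib.MAX_WBITS
        return zlib.decompressobj(wbits)

    def decompress_chunks(chunks, compression_type):
        decompressor = make_decompressor(compression_type)
        data = b''
        for chunk in chunks:
            data += decompressor.decompress(chunk)
        if compression_type != 'bz2':
            # zlib decompress objects buffer internally and must be flushed
            # at EOF; bz2.BZ2Decompressor has no flush() method.
            data += decompressor.flush()
        return data

    assert decompress_chunks([bz2.compress(b'hello')], 'bz2') == b'hello'
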
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b518b97..1ff9acd 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -25,6 +25,7 @@ import os
 import tempfile
 import unittest
 import zlib
+import bz2
 
 import apache_beam as beam
 from apache_beam import coders
@@ -149,6 +150,56 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
+  def test_read_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) +compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
+  def test_read_entire_file_bzip_auto(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(file_path=self.create_temp_file(
+        data, suffix='.bz2'))
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
+  def test_read_entire_file_bzip_empty(self):
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('') + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, [])
+
+  def test_read_entire_file_bzip_large(self):
+    lines = ['Line %d' % d for d in range(10 * 1000)]
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
   def test_read_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -199,7 +250,7 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_skip_entire_file_gzip(self):
+  def test_skip_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -213,7 +264,21 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, [])
 
-  def test_skip_entire_file_zlib(self):
+  def test_skip_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        start_offset=1,  # Anything other than 0 should lead to a null-read.
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, [])
+
+  def test_skip_entire_file_gzip(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -241,6 +306,20 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
+  def test_consume_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        end_offset=1,  # Any end_offset should effectively be ignored.
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
   def test_consume_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -289,6 +368,25 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(len(progress_record), 3)
     self.assertEqual(progress_record, [0, 6, 13])
 
+  def test_progress_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    progress_record = []
+    with source.reader() as reader:
+      self.assertEqual(-1, reader.get_progress().position.byte_offset)
+      for line in reader:
+        self.assertIsNotNone(line)
+        progress_record.append(reader.get_progress().position.byte_offset)
+      self.assertEqual(18,  # Reading the entire contents before we decide EOF.
+                       reader.get_progress().position.byte_offset)
+
+    self.assertEqual(len(progress_record), 3)
+    self.assertEqual(progress_record, [0, 6, 13])
+
   def test_progress_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -353,6 +451,36 @@ class TestTextFileSource(unittest.TestCase):
                 dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
+  def test_bzip_file_unsplittable(self):
+    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+
+    with source.reader() as reader:
+      percents_complete = [x / 100.0 for x in range(101)]
+
+      # Cursor at beginning of file.
+      for percent_complete in percents_complete:
+        self.try_splitting_reader_at(
+            reader,
+            iobase.DynamicSplitRequest(
+                iobase.ReaderProgress(percent_complete=percent_complete)),
+            None)
+
+      # Cursor passed beginning of file.
+      reader_iter = iter(reader)
+      next(reader_iter)
+      next(reader_iter)
+      for percent_complete in percents_complete:
+        self.try_splitting_reader_at(
+            reader,
+            iobase.DynamicSplitRequest(
+                iobase.ReaderProgress(percent_complete=percent_complete)),
+            None)
+
   def test_zlib_file_unsplittable(self):
     lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -659,6 +787,30 @@ class TestNativeTextFileSink(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  def test_write_text_bzip_file(self):
+    sink = fileio.NativeTextFileSink(
+        self.path, compression_type=fileio.CompressionTypes.BZIP2)
+    self._write_lines(sink, self.lines)
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), self.lines)
+
+  def test_write_text_bzip_file_auto(self):
+    self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
+    sink = fileio.NativeTextFileSink(self.path)
+    self._write_lines(sink, self.lines)
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), self.lines)
+
+  def test_write_text_bzip_file_empty(self):
+    sink = fileio.NativeTextFileSink(
+        self.path, compression_type=fileio.CompressionTypes.BZIP2)
+    self._write_lines(sink, [])
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), [])
+
   def test_write_text_zlib_file(self):
     sink = fileio.NativeTextFileSink(
         self.path, compression_type=fileio.CompressionTypes.ZLIB)
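
The new sink tests exercise a full write/read round trip through bzip2: lines written via NativeTextFileSink with CompressionTypes.BZIP2 (or a '.bz2' path) must come back unchanged when read with bz2.BZ2File. A standard-library-only sketch of the same invariant (paths and values here are illustrative, not taken from the test file):

    import bz2
    import tempfile

    # Illustrative round trip mirroring what the new sink tests verify.
    lines = ['Line %d' % d for d in range(1000)]
    path = tempfile.NamedTemporaryFile(suffix='.bz2', delete=False).name

    with bz2.BZ2File(path, 'w') as f:
        f.write('\n'.join(lines).encode('ascii'))

    with bz2.BZ2File(path, 'r') as f:
        assert f.read().decode('ascii').splitlines() == lines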