Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/11 18:24:02 UTC

[3/4] incubator-beam git commit: Add support for bz2 compression

Add support for bz2 compression
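
For reference, a minimal usage sketch mirroring the tests added below (the
file path is hypothetical; TextFileSource and CompressionTypes.BZIP2 come
from the diff):

  import apache_beam.io.fileio as fileio

  # Read a bzip2-compressed text file. The compression type is passed
  # explicitly here; a '.bz2' suffix would also be auto-detected.
  source = fileio.TextFileSource(
      file_path='/tmp/data.bz2',  # hypothetical path
      compression_type=fileio.CompressionTypes.BZIP2)

  read_lines = []
  with source.reader() as reader:
    for line in reader:
      read_lines.append(line)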


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a00bae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a00bae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a00bae

Branch: refs/heads/python-sdk
Commit: a9a00bae2f25237ffb58049d5ea72f532da24d78
Parents: 240f8f6
Author: tim1357 <se...@gmail.com>
Authored: Mon Sep 26 15:15:51 2016 -0400
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Oct 11 11:15:07 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      |  33 ++++--
 sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++-
 2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c248f12..574295e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -27,6 +27,7 @@ import shutil
 import threading
 import time
 import zlib
+import bz2
 import weakref
 
 from apache_beam import coders
@@ -76,6 +77,9 @@ class CompressionTypes(object):
   # ZLIB compression (deflate with ZLIB headers).
   ZLIB = _CompressionType('zlib')
 
+  # BZIP2 compression.
+  BZIP2 = _CompressionType('bz2')
+
   # Uncompressed (i.e., may be split).
   UNCOMPRESSED = _CompressionType('uncompressed')
 
@@ -92,14 +96,17 @@ class CompressionTypes(object):
   def mime_type(cls, compression_type, default='application/octet-stream'):
     mime_types_by_compression_type = {
         cls.GZIP: 'application/x-gzip',
-        cls.ZLIB: 'application/octet-stream'
+        cls.ZLIB: 'application/octet-stream',
+        cls.BZIP2: 'application/octet-stream'
     }
     return mime_types_by_compression_type.get(compression_type, default)
 
   @classmethod
   def detect_compression_type(cls, file_path):
     """Returns the compression type of a file (based on its suffix)"""
-    compression_types_by_suffix = {'.gz': cls.GZIP, '.z': cls.ZLIB}
+    compression_types_by_suffix = {'.gz': cls.GZIP,
+                                   '.z': cls.ZLIB,
+                                   '.bz2': cls.BZIP2}
     lowercased_path = file_path.lower()
     for suffix, compression_type in compression_types_by_suffix.iteritems():
       if lowercased_path.endswith(suffix):
@@ -596,14 +603,21 @@ class _CompressedFile(object):
     self._compression_type = compression_type
 
     if self._readable():
-      self._decompressor = zlib.decompressobj(self._type_mask[compression_type])
+      if self._compression_type == CompressionTypes.BZIP2:
+        self._decompressor = bz2.BZ2Decompressor()
+      else:
+        self._decompressor = zlib.decompressobj(
+            self._type_mask[compression_type])
     else:
       self._decompressor = None
 
     if self._writeable():
-      self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
-                                          zlib.DEFLATED,
-                                          self._type_mask[compression_type])
+      if self._compression_type == CompressionTypes.BZIP2:
+        self._compressor = bz2.BZ2Compressor(9)
+      else:
+        self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+                                            zlib.DEFLATED,
+                                            self._type_mask[compression_type])
     else:
       self._compressor = None
 
@@ -638,10 +652,13 @@ class _CompressedFile(object):
       buf = self._file.read(self._read_size)
       if buf:
         self._data += self._decompressor.decompress(buf)
-      else:
-        # EOF reached, flush.
+      elif self._compression_type != CompressionTypes.BZIP2:
+        # EOF reached, flush. BZ2Decompressor has no flush() method
+        # because of bzip2's block-based format.
         self._data += self._decompressor.flush()
         return
+      else:
+        return
 
   def _read_from_internal_buffer(self, num_bytes):
     """Read up to num_bytes from the internal buffer."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b518b97..1ff9acd 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -25,6 +25,7 @@ import os
 import tempfile
 import unittest
 import zlib
+import bz2
 
 import apache_beam as beam
 from apache_beam import coders
@@ -149,6 +150,56 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
+  def test_read_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
+  def test_read_entire_file_bzip_auto(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(file_path=self.create_temp_file(
+        data, suffix='.bz2'))
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
+  def test_read_entire_file_bzip_empty(self):
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('') + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, [])
+
+  def test_read_entire_file_bzip_large(self):
+    lines = ['Line %d' % d for d in range(10 * 1000)]
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
   def test_read_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -199,7 +250,7 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_skip_entire_file_gzip(self):
+  def test_skip_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -213,7 +264,21 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, [])
 
-  def test_skip_entire_file_zlib(self):
+  def test_skip_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        start_offset=1,  # Anything other than 0 should lead to a null-read.
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, [])
+
+  def test_skip_entire_file_gzip(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
@@ -241,6 +306,20 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
+  def test_consume_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        end_offset=1,  # Any end_offset should effectively be ignored.
+        compression_type=fileio.CompressionTypes.BZIP2)
+    read_lines = []
+    with source.reader() as reader:
+      for line in reader:
+        read_lines.append(line)
+    self.assertEqual(read_lines, lines)
+
   def test_consume_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -289,6 +368,25 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(len(progress_record), 3)
     self.assertEqual(progress_record, [0, 6, 13])
 
+  def test_progress_entire_file_bzip(self):
+    lines = ['First', 'Second', 'Third']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+    progress_record = []
+    with source.reader() as reader:
+      self.assertEqual(-1, reader.get_progress().position.byte_offset)
+      for line in reader:
+        self.assertIsNotNone(line)
+        progress_record.append(reader.get_progress().position.byte_offset)
+      self.assertEqual(18,  # Reading the entire contents before we decide EOF.
+                       reader.get_progress().position.byte_offset)
+
+    self.assertEqual(len(progress_record), 3)
+    self.assertEqual(progress_record, [0, 6, 13])
+
   def test_progress_entire_file_zlib(self):
     lines = ['First', 'Second', 'Third']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -353,6 +451,36 @@ class TestTextFileSource(unittest.TestCase):
                 dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
+  def test_bzip_file_unsplittable(self):
+    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
+    compressor = bz2.BZ2Compressor(8)
+    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    source = fileio.TextFileSource(
+        file_path=self.create_temp_file(data),
+        compression_type=fileio.CompressionTypes.BZIP2)
+
+    with source.reader() as reader:
+      percents_complete = [x / 100.0 for x in range(101)]
+
+      # Cursor at beginning of file.
+      for percent_complete in percents_complete:
+        self.try_splitting_reader_at(
+            reader,
+            iobase.DynamicSplitRequest(
+                iobase.ReaderProgress(percent_complete=percent_complete)),
+            None)
+
+      # Cursor has passed the beginning of the file.
+      reader_iter = iter(reader)
+      next(reader_iter)
+      next(reader_iter)
+      for percent_complete in percents_complete:
+        self.try_splitting_reader_at(
+            reader,
+            iobase.DynamicSplitRequest(
+                iobase.ReaderProgress(percent_complete=percent_complete)),
+            None)
+
   def test_zlib_file_unsplittable(self):
     lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
@@ -659,6 +787,30 @@ class TestNativeTextFileSink(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  def test_write_text_bzip_file(self):
+    sink = fileio.NativeTextFileSink(
+        self.path, compression_type=fileio.CompressionTypes.BZIP2)
+    self._write_lines(sink, self.lines)
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), self.lines)
+
+  def test_write_text_bzip_file_auto(self):
+    self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
+    sink = fileio.NativeTextFileSink(self.path)
+    self._write_lines(sink, self.lines)
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), self.lines)
+
+  def test_write_text_bzip_file_empty(self):
+    sink = fileio.NativeTextFileSink(
+        self.path, compression_type=fileio.CompressionTypes.BZIP2)
+    self._write_lines(sink, [])
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), [])
+
   def test_write_text_zlib_file(self):
     sink = fileio.NativeTextFileSink(
         self.path, compression_type=fileio.CompressionTypes.ZLIB)