You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/07 21:23:28 UTC

[1/2] incubator-beam git commit: Add support for ZLIB and DEFLATE compression

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 9bc04b750 -> a580b31ac


Add support for ZLIB and DEFLATE compression

Code originally contributed by Slaven Bilac.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1b3ac30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1b3ac30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1b3ac30

Branch: refs/heads/python-sdk
Commit: e1b3ac30b5dd5b14058247fad73b6b235e618094
Parents: 9bc04b7
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 7 13:40:18 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 7 13:40:18 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      | 168 +++++++++++++++++++++----
 sdks/python/apache_beam/io/fileio_test.py |  30 ++++-
 2 files changed, 173 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1b3ac30/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 6475a34..31b6a93 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -20,7 +20,6 @@
 from __future__ import absolute_import
 
 import glob
-import gzip
 import logging
 from multiprocessing.pool import ThreadPool
 import os
@@ -28,6 +27,7 @@ import re
 import shutil
 import tempfile
 import time
+import zlib
 
 from apache_beam import coders
 from apache_beam.io import iobase
@@ -269,13 +269,129 @@ class _CompressionType(object):
 class CompressionTypes(object):
   """Enum-like class representing known compression types."""
   NO_COMPRESSION = _CompressionType(1)  # No compression.
-  DEFLATE = _CompressionType(2)  # 'Deflate' ie gzip compression.
+  DEFLATE = _CompressionType(2)  # 'Deflate' compression (without headers).
+  GZIP = _CompressionType(3)  # gzip compression (deflate with gzip headers).
+  ZLIB = _CompressionType(4)  # zlib compression (deflate with zlib headers).
 
   @staticmethod
-  def valid_compression_type(compression_type):
+  def is_valid_compression_type(compression_type):
     """Returns true for valid compression types, false otherwise."""
     return isinstance(compression_type, _CompressionType)
 
+  @staticmethod
+  def mime_type(compression_type, default='application/octet-stream'):
+    if compression_type == CompressionTypes.GZIP:
+      return 'application/x-gzip'
+    elif compression_type == CompressionTypes.ZLIB:
+      return 'application/octet-stream'
+    elif compression_type == CompressionTypes.DEFLATE:
+      return 'application/octet-stream'
+    else:
+      return default
+
+
+class _CompressedFile(object):
+  """Minimal file wrapper that (de)compresses data on write and read."""
+  _type_mask = {
+      CompressionTypes.ZLIB: zlib.MAX_WBITS,  # zlib header/trailer.
+      CompressionTypes.GZIP: zlib.MAX_WBITS | 16,  # gzip header/trailer.
+      CompressionTypes.DEFLATE: -zlib.MAX_WBITS,  # raw deflate, no header.
+  }
+
+  def __init__(self,
+               fileobj=None,
+               compression_type=CompressionTypes.ZLIB,
+               read_size=16384):
+    self._validate_compression_type(compression_type)
+    if not fileobj:
+      raise ValueError('fileobj must be opened file but was %s' % fileobj)
+    self.fileobj = fileobj
+    self.data = ''
+    self.read_size = read_size
+    self.compression_type = compression_type
+    self.decompressor = (self._create_decompressor(self.compression_type)
+                         if self._readable() else None)
+    self.compressor = (self._create_compressor(self.compression_type)
+                       if self._writeable() else None)
+
+  def _validate_compression_type(self, compression_type):
+    if not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    if compression_type == CompressionTypes.NO_COMPRESSION:
+      raise ValueError('cannot create object with no compression')
+
+  def _create_compressor(self, compression_type):
+    self._validate_compression_type(compression_type)
+    return zlib.compressobj(9, zlib.DEFLATED,
+                            self._type_mask[compression_type])
+
+  def _create_decompressor(self, compression_type):
+    self._validate_compression_type(compression_type)
+    return zlib.decompressobj(self._type_mask[compression_type])
+
+  def _readable(self):
+    mode = self.fileobj.mode
+    return 'r' in mode or 'a' in mode
+
+  def _writeable(self):
+    mode = self.fileobj.mode
+    return 'w' in mode or 'a' in mode
+
+  def write(self, data):
+    """Compresses data and writes whatever output zlib has ready."""
+    if not self.compressor:
+      raise ValueError('compressor not initialized')
+    compressed = self.compressor.compress(data)
+    if compressed:
+      self.fileobj.write(compressed)
+
+  def _read(self, num_bytes):
+    """Returns up to num_bytes decompressed bytes (all if None or < 0)."""
+    read_all = num_bytes is None or num_bytes < 0
+    while read_all or len(self.data) < num_bytes:
+      buf = self.fileobj.read(self.read_size)
+      if not buf:
+        # EOF reached: drain whatever the decompressor still buffers.
+        self.data += self.decompressor.flush()
+        break
+      self.data += self.decompressor.decompress(buf)
+    end = len(self.data) if read_all else num_bytes
+    result, self.data = self.data[:end], self.data[end:]
+    return result
+
+  def read(self, num_bytes):
+    """Reads up to num_bytes decompressed bytes; None reads to EOF."""
+    if not self.decompressor:
+      raise ValueError('decompressor not initialized')
+    return self._read(num_bytes)
+
+  @property
+  def closed(self):
+    """True once close() ran or the wrapped file reports itself closed."""
+    return not self.fileobj or self.fileobj.closed
+
+  def close(self):
+    """Finishes the compressed stream (if writing) and closes the file."""
+    if self.fileobj is None:
+      return
+    if self._writeable():
+      self.fileobj.write(self.compressor.flush())
+    self.fileobj.close()
+    self.fileobj = None  # Makes a second close() a no-op.
+
+  def flush(self):
+    """Flushes pending output without terminating the compressed stream."""
+    if self._writeable():
+      # Z_SYNC_FLUSH keeps the stream open; close() still sends Z_FINISH.
+      self.fileobj.write(self.compressor.flush(zlib.Z_SYNC_FLUSH))
+    self.fileobj.flush()
+
+  # TODO(slaven): Add support for seeking to a file position.
+  @property
+  def seekable(self):
+    return False
+
 
 class FileSink(iobase.Sink):
   """A sink to a GCS or local files.
@@ -302,7 +418,8 @@ class FileSink(iobase.Sink):
                file_name_suffix='',
                num_shards=0,
                shard_name_template=None,
-               mime_type='application/octet-stream'):
+               mime_type='application/octet-stream',
+               compression_type=CompressionTypes.NO_COMPRESSION):
     if shard_name_template is None:
       shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
     elif shard_name_template is '':
@@ -311,8 +428,12 @@ class FileSink(iobase.Sink):
     self.file_name_suffix = file_name_suffix
     self.num_shards = num_shards
     self.coder = coder
-    self.mime_type = mime_type
     self.shard_name_format = self._template_to_format(shard_name_template)
+    if not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    self.compression_type = compression_type
+    self.mime_type = CompressionTypes.mime_type(compression_type, mime_type)
 
   def open(self, temp_path):
     """Opens ``temp_path``, returning an opaque file handle object.
@@ -320,7 +441,12 @@ class FileSink(iobase.Sink):
     The returned file handle is passed to ``write_[encoded_]record`` and
     ``close``.
     """
-    return ChannelFactory.open(temp_path, 'wb', self.mime_type)
+    raw_file = ChannelFactory.open(temp_path, 'wb', self.mime_type)
+    if self.compression_type == CompressionTypes.NO_COMPRESSION:
+      return raw_file
+    else:
+      return _CompressedFile(fileobj=raw_file,
+                             compression_type=self.compression_type)
 
   def write_record(self, file_handle, value):
     """Writes a single record go the file handle returned by ``open()``.
@@ -508,36 +634,34 @@ class TextFileSink(FileSink):
           'TextFileSink: file_name_suffix must be a string; got %r instead' %
           file_name_suffix)
 
-    if not CompressionTypes.valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-    if compression_type == CompressionTypes.DEFLATE:
-      mime_type = 'application/x-gzip'
-    else:
-      mime_type = 'text/plain'
-
     super(TextFileSink, self).__init__(file_path_prefix,
                                        file_name_suffix=file_name_suffix,
                                        num_shards=num_shards,
                                        shard_name_template=shard_name_template,
                                        coder=coder,
-                                       mime_type=mime_type)
+                                       mime_type='text/plain',
+                                       compression_type=compression_type)
 
     self.compression_type = compression_type
     self.append_trailing_newlines = append_trailing_newlines
 
-  def open(self, temp_path):
-    """Opens ''temp_path'', returning a writeable file object."""
-    fobj = ChannelFactory.open(temp_path, 'wb', self.mime_type)
-    if self.compression_type == CompressionTypes.DEFLATE:
-      return gzip.GzipFile(fileobj=fobj)
-    return fobj
-
   def write_encoded_record(self, file_handle, encoded_value):
+    """Writes a single encoded record."""
     file_handle.write(encoded_value)
     if self.append_trailing_newlines:
       file_handle.write('\n')
 
+  def close(self, file_handle):
+    """Finalize and close the file handle returned from ``open()``.
+
+    Args:
+      file_handle: file handle to be closed; ``None`` is ignored.
+    Raises:
+      Any exception raised by ``file_handle.close()`` propagates unchanged.
+    """
+    if file_handle is not None:
+      file_handle.close()
+
 
 class NativeTextFileSink(iobase.NativeSink):
   """A sink to a GCS or local text file or files."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1b3ac30/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b77e1c1..e71ba6d 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -24,6 +24,7 @@ import logging
 import os
 import tempfile
 import unittest
+import zlib
 
 import apache_beam as beam
 from apache_beam import coders
@@ -347,7 +348,7 @@ class NativeTestTextFileSink(unittest.TestCase):
       self.assertEqual(f.read().splitlines(), lines)
 
 
-class TestPureTextFileSink(unittest.TestCase):
+class TestTextFileSink(unittest.TestCase):
 
   def setUp(self):
     self.lines = ['Line %d' % d for d in range(100)]
@@ -366,14 +367,37 @@ class TestPureTextFileSink(unittest.TestCase):
     with open(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  def test_write_deflate_file(self):
+    sink = fileio.TextFileSink(self.path,
+                               compression_type=fileio.CompressionTypes.DEFLATE)
+    self._write_lines(sink, self.lines)
+
+    with open(self.path, 'rb') as f:
+      content = f.read()
+      self.assertEqual(
+          zlib.decompress(content, -zlib.MAX_WBITS).splitlines(), self.lines)
+
   def test_write_gzip_file(self):
-    sink = fileio.TextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.DEFLATE)
+    sink = fileio.TextFileSink(self.path,
+                               compression_type=fileio.CompressionTypes.GZIP)
     self._write_lines(sink, self.lines)
 
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), self.lines)
 
+  def test_write_zlib_file(self):
+    sink = fileio.TextFileSink(self.path,
+                               compression_type=fileio.CompressionTypes.ZLIB)
+    self._write_lines(sink, self.lines)
+
+    with open(self.path, 'rb') as f:
+      content = f.read()
+      # Plain MAX_WBITS accepts only a zlib header (no gzip auto-detection),
+      # so this fails if the sink wrote some other framing.
+      self.assertEqual(
+          zlib.decompress(content, zlib.MAX_WBITS).splitlines(),
+          self.lines)
+
 
 class MyFileSink(fileio.FileSink):
 


[2/2] incubator-beam git commit: Closes #604

Posted by ro...@apache.org.
Closes #604


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a580b31a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a580b31a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a580b31a

Branch: refs/heads/python-sdk
Commit: a580b31ac5480318e632c3d8e33ea0bcfe5177d3
Parents: 9bc04b7 e1b3ac3
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 7 14:22:22 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 7 14:22:22 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      | 168 +++++++++++++++++++++----
 sdks/python/apache_beam/io/fileio_test.py |  30 ++++-
 2 files changed, 173 insertions(+), 25 deletions(-)
----------------------------------------------------------------------