You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/07 21:23:28 UTC
[1/2] incubator-beam git commit: Add support for ZLIB and DEFLATE compression
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 9bc04b750 -> a580b31ac
Add support for ZLIB and DEFLATE compression
Code originally contributed by Slaven Bilac.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1b3ac30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1b3ac30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1b3ac30
Branch: refs/heads/python-sdk
Commit: e1b3ac30b5dd5b14058247fad73b6b235e618094
Parents: 9bc04b7
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 7 13:40:18 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 7 13:40:18 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 168 +++++++++++++++++++++----
sdks/python/apache_beam/io/fileio_test.py | 30 ++++-
2 files changed, 173 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1b3ac30/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 6475a34..31b6a93 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -20,7 +20,6 @@
from __future__ import absolute_import
import glob
-import gzip
import logging
from multiprocessing.pool import ThreadPool
import os
@@ -28,6 +27,7 @@ import re
import shutil
import tempfile
import time
+import zlib
from apache_beam import coders
from apache_beam.io import iobase
@@ -269,13 +269,129 @@ class _CompressionType(object):
class CompressionTypes(object):
"""Enum-like class representing known compression types."""
NO_COMPRESSION = _CompressionType(1) # No compression.
- DEFLATE = _CompressionType(2) # 'Deflate' ie gzip compression.
+ DEFLATE = _CompressionType(2) # 'Deflate' compression (without headers).
+ GZIP = _CompressionType(3) # gzip compression (deflate with gzip headers).
+ ZLIB = _CompressionType(4) # zlib compression (deflate with zlib headers).
@staticmethod
- def valid_compression_type(compression_type):
+ def is_valid_compression_type(compression_type):
"""Returns true for valid compression types, false otherwise."""
return isinstance(compression_type, _CompressionType)
+ @staticmethod
+ def mime_type(compression_type, default='application/octet-stream'):
+ if compression_type == CompressionTypes.GZIP:
+ return 'application/x-gzip'
+ elif compression_type == CompressionTypes.ZLIB:
+ return 'application/octet-stream'
+ elif compression_type == CompressionTypes.DEFLATE:
+ return 'application/octet-stream'
+ else:
+ return default
+
+
+class _CompressedFile(object):
+ """Somewhat limited file wrapper for easier handling of compressed files."""
+ _type_mask = {
+ CompressionTypes.ZLIB: zlib.MAX_WBITS,
+ CompressionTypes.GZIP: zlib.MAX_WBITS | 16,
+ CompressionTypes.DEFLATE: -zlib.MAX_WBITS,
+ }
+
+ def __init__(self,
+ fileobj=None,
+ compression_type=CompressionTypes.ZLIB,
+ read_size=16384):
+ self._validate_compression_type(compression_type)
+ if not fileobj:
+ raise ValueError('fileobj must be opened file but was %s' % fileobj)
+
+ self.fileobj = fileobj
+ self.data = ''
+ self.read_size = read_size
+ self.compression_type = compression_type
+ if self._readable():
+ self.decompressor = self._create_decompressor(self.compression_type)
+ else:
+ self.decompressor = None
+ if self._writeable():
+ self.compressor = self._create_compressor(self.compression_type)
+ else:
+ self.compressor = None
+
+ def _validate_compression_type(self, compression_type):
+ if not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be CompressionType object but '
+ 'was %s' % type(compression_type))
+ if compression_type == CompressionTypes.NO_COMPRESSION:
+ raise ValueError('cannot create object with no compression')
+
+ def _create_compressor(self, compression_type):
+ self._validate_compression_type(compression_type)
+ return zlib.compressobj(9, zlib.DEFLATED,
+ self._type_mask[compression_type])
+
+ def _create_decompressor(self, compression_type):
+ self._validate_compression_type(compression_type)
+ return zlib.decompressobj(self._type_mask[compression_type])
+
+ def _readable(self):
+ mode = self.fileobj.mode
+ return 'r' in mode or 'a' in mode
+
+ def _writeable(self):
+ mode = self.fileobj.mode
+ return 'w' in mode or 'a' in mode
+
+ def write(self, data):
+ """Write data to file."""
+ if not self.compressor:
+ raise ValueError('compressor not initialized')
+ compressed = self.compressor.compress(data)
+ if compressed:
+ self.fileobj.write(compressed)
+
+ def _read(self, num_bytes):
+ """Read num_bytes into internal buffer."""
+ while not num_bytes or len(self.data) < num_bytes:
+ buf = self.fileobj.read(self.read_size)
+ if not buf:
+ # EOF reached, flush.
+ self.data += self.decompressor.flush()
+ break
+
+ self.data += self.decompressor.decompress(buf)
+ result = self.data[:num_bytes]
+ self.data = self.data[num_bytes:]
+ return result
+
+ def read(self, num_bytes):
+ if not self.decompressor:
+ raise ValueError('decompressor not initialized')
+ return self._read(num_bytes)
+
+ @property
+ def closed(self):
+ return not self.fileobj or self.fileobj.closed()
+
+ def close(self):
+ if self.fileobj is None:
+ return
+
+ if self._writeable():
+ self.fileobj.write(self.compressor.flush())
+ self.fileobj.close()
+
+ def flush(self):
+ if self._writeable():
+ self.fileobj.write(self.compressor.flush())
+ self.fileobj.flush()
+
+ # TODO(slaven): Add support for seeking to a file position.
+ @property
+ def seekable(self):
+ return False
+
class FileSink(iobase.Sink):
"""A sink to a GCS or local files.
@@ -302,7 +418,8 @@ class FileSink(iobase.Sink):
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- mime_type='application/octet-stream'):
+ mime_type='application/octet-stream',
+ compression_type=CompressionTypes.NO_COMPRESSION):
if shard_name_template is None:
shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
elif shard_name_template is '':
@@ -311,8 +428,12 @@ class FileSink(iobase.Sink):
self.file_name_suffix = file_name_suffix
self.num_shards = num_shards
self.coder = coder
- self.mime_type = mime_type
self.shard_name_format = self._template_to_format(shard_name_template)
+ if not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be CompressionType object but '
+ 'was %s' % type(compression_type))
+ self.compression_type = compression_type
+ self.mime_type = CompressionTypes.mime_type(compression_type, mime_type)
def open(self, temp_path):
"""Opens ``temp_path``, returning an opaque file handle object.
@@ -320,7 +441,12 @@ class FileSink(iobase.Sink):
The returned file handle is passed to ``write_[encoded_]record`` and
``close``.
"""
- return ChannelFactory.open(temp_path, 'wb', self.mime_type)
+ raw_file = ChannelFactory.open(temp_path, 'wb', self.mime_type)
+ if self.compression_type == CompressionTypes.NO_COMPRESSION:
+ return raw_file
+ else:
+ return _CompressedFile(fileobj=raw_file,
+ compression_type=self.compression_type)
def write_record(self, file_handle, value):
"""Writes a single record go the file handle returned by ``open()``.
@@ -508,36 +634,34 @@ class TextFileSink(FileSink):
'TextFileSink: file_name_suffix must be a string; got %r instead' %
file_name_suffix)
- if not CompressionTypes.valid_compression_type(compression_type):
- raise TypeError('compression_type must be CompressionType object but '
- 'was %s' % type(compression_type))
- if compression_type == CompressionTypes.DEFLATE:
- mime_type = 'application/x-gzip'
- else:
- mime_type = 'text/plain'
-
super(TextFileSink, self).__init__(file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
- mime_type=mime_type)
+ mime_type='text/plain',
+ compression_type=compression_type)
self.compression_type = compression_type
self.append_trailing_newlines = append_trailing_newlines
- def open(self, temp_path):
- """Opens ''temp_path'', returning a writeable file object."""
- fobj = ChannelFactory.open(temp_path, 'wb', self.mime_type)
- if self.compression_type == CompressionTypes.DEFLATE:
- return gzip.GzipFile(fileobj=fobj)
- return fobj
-
def write_encoded_record(self, file_handle, encoded_value):
+ """Writes a single encoded record."""
file_handle.write(encoded_value)
if self.append_trailing_newlines:
file_handle.write('\n')
+ def close(self, file_handle):
+ """Finalize and close the file handle returned from ``open()``.
+
+ Args:
+ file_handle: file handle to be closed.
+ Raises:
+ ValueError: if file_handle is already closed.
+ """
+ if file_handle is not None:
+ file_handle.close()
+
class NativeTextFileSink(iobase.NativeSink):
"""A sink to a GCS or local text file or files."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1b3ac30/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index b77e1c1..e71ba6d 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -24,6 +24,7 @@ import logging
import os
import tempfile
import unittest
+import zlib
import apache_beam as beam
from apache_beam import coders
@@ -347,7 +348,7 @@ class NativeTestTextFileSink(unittest.TestCase):
self.assertEqual(f.read().splitlines(), lines)
-class TestPureTextFileSink(unittest.TestCase):
+class TestTextFileSink(unittest.TestCase):
def setUp(self):
self.lines = ['Line %d' % d for d in range(100)]
@@ -366,14 +367,37 @@ class TestPureTextFileSink(unittest.TestCase):
with open(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
+ def test_write_deflate_file(self):
+ sink = fileio.TextFileSink(self.path,
+ compression_type=fileio.CompressionTypes.DEFLATE)
+ self._write_lines(sink, self.lines)
+
+ with open(self.path, 'r') as f:
+ content = f.read()
+ self.assertEqual(
+ zlib.decompress(content, -zlib.MAX_WBITS).splitlines(), self.lines)
+
def test_write_gzip_file(self):
- sink = fileio.TextFileSink(
- self.path, compression_type=fileio.CompressionTypes.DEFLATE)
+ sink = fileio.TextFileSink(self.path,
+ compression_type=fileio.CompressionTypes.GZIP)
self._write_lines(sink, self.lines)
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
+ def test_write_zlib_file(self):
+ sink = fileio.TextFileSink(self.path,
+ compression_type=fileio.CompressionTypes.ZLIB)
+ self._write_lines(sink, self.lines)
+
+ with open(self.path, 'r') as f:
+ content = f.read()
+ # Below decompress option should work for both zlib/gzip header
+ # auto detection.
+ self.assertEqual(
+ zlib.decompress(content, zlib.MAX_WBITS | 32).splitlines(),
+ self.lines)
+
class MyFileSink(fileio.FileSink):
[2/2] incubator-beam git commit: Closes #604
Posted by ro...@apache.org.
Closes #604
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a580b31a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a580b31a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a580b31a
Branch: refs/heads/python-sdk
Commit: a580b31ac5480318e632c3d8e33ea0bcfe5177d3
Parents: 9bc04b7 e1b3ac3
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 7 14:22:22 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 7 14:22:22 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 168 +++++++++++++++++++++----
sdks/python/apache_beam/io/fileio_test.py | 30 ++++-
2 files changed, 173 insertions(+), 25 deletions(-)
----------------------------------------------------------------------