You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 21:43:49 UTC
[1/2] beam git commit: Moving from a string-based buffer to a
cStringIO-based one in order to help with performance of compressed sources.
Repository: beam
Updated Branches:
refs/heads/python-sdk 36a7d3491 -> 678da7a8b
Moving from a string-based buffer to a cStringIO-based one in order to
help with performance of compressed sources.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee09668b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee09668b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee09668b
Branch: refs/heads/python-sdk
Commit: ee09668becaed21bfb15bac267a2df5e166bd6b9
Parents: 36a7d34
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Fri Jan 13 18:28:12 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 13:43:20 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 124 ++++++++++++++++++------------
sdks/python/apache_beam/io/gcsio.py | 4 +-
2 files changed, 76 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ee09668b/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 6ea7844..ebc4fed 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -19,6 +19,7 @@
from __future__ import absolute_import
import bz2
+import cStringIO
import glob
import logging
from multiprocessing.pool import ThreadPool
@@ -68,6 +69,7 @@ class _CompressionType(object):
class CompressionTypes(object):
"""Enum-like class representing known compression types."""
+
# Detect compression based on filename extension.
#
# The following extensions are currently recognized by auto-detection:
@@ -642,9 +644,9 @@ class ChannelFactory(object):
class _CompressedFile(object):
"""Somewhat limited file wrapper for easier handling of compressed files."""
- # The bit mask to use for the wbits parameters of the GZIP compressor and
+ # The bit mask to use for the wbits parameters of the zlib compressor and
# decompressor objects.
- _gzip_mask = zlib.MAX_WBITS | 16
+ _gzip_mask = zlib.MAX_WBITS | 16 # Mask when using GZIP headers.
def __init__(self,
fileobj,
@@ -652,49 +654,47 @@ class _CompressedFile(object):
read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
if not fileobj:
raise ValueError('fileobj must be opened file but was %s' % fileobj)
- self._validate_compression_type(compression_type)
+
+ if not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be CompressionType object but '
+ 'was %s' % type(compression_type))
+ if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
+ ):
+ raise ValueError(
+ 'Cannot create object with unspecified or no compression')
self._file = fileobj
- self._data = ''
- self._read_size = read_size
self._compression_type = compression_type
- if self._readable():
+ if self.readable():
+ self._read_size = read_size
+ self._read_buffer = cStringIO.StringIO()
+ self._read_position = 0
+ self._read_eof = False
+
if self._compression_type == CompressionTypes.BZIP2:
self._decompressor = bz2.BZ2Decompressor()
else:
+ assert self._compression_type == CompressionTypes.GZIP
self._decompressor = zlib.decompressobj(self._gzip_mask)
- self._read_eof = False
else:
self._decompressor = None
- if self._writeable():
+ if self.writeable():
if self._compression_type == CompressionTypes.BZIP2:
self._compressor = bz2.BZ2Compressor()
else:
+ assert self._compression_type == CompressionTypes.GZIP
self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, self._gzip_mask)
else:
self._compressor = None
- def _validate_compression_type(self, compression_type):
- if not CompressionTypes.is_valid_compression_type(compression_type):
- raise TypeError('compression_type must be CompressionType object but '
- 'was %s' % type(compression_type))
- if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
- ):
- raise ValueError(
- 'Cannot create object with unspecified or no compression')
- if compression_type not in (CompressionTypes.BZIP2, CompressionTypes.GZIP):
- raise ValueError(
- 'compression_type %s not supported for whole-file compression',
- compression_type)
-
- def _readable(self):
+ def readable(self):
mode = self._file.mode
return 'r' in mode or 'a' in mode
- def _writeable(self):
+ def writeable(self):
mode = self._file.mode
return 'w' in mode or 'a' in mode
@@ -708,10 +708,27 @@ class _CompressedFile(object):
def _fetch_to_internal_buffer(self, num_bytes):
"""Fetch up to num_bytes into the internal buffer."""
- while not self._read_eof and len(self._data) < num_bytes:
+ if (not self._read_eof and self._read_position > 0 and
+ (self._read_buffer.tell() - self._read_position) < num_bytes):
+ # There aren't enough number of bytes to accommodate a read, so we
+ # prepare for a possibly large read by clearing up all internal buffers
+ # but without dropping any previous held data.
+ self._read_buffer.seek(self._read_position)
+ data = self._read_buffer.read()
+ self._read_position = 0
+ self._read_buffer.seek(0)
+ self._read_buffer.truncate(0)
+ self._read_buffer.write(data)
+
+ while not self._read_eof and (self._read_buffer.tell() - self._read_position
+ ) < num_bytes:
+ # Continue reading from the underlying file object until enough bytes are
+ # available, or EOF is reached.
buf = self._file.read(self._read_size)
if buf:
- self._data += self._decompressor.decompress(buf)
+ decompressed = self._decompressor.decompress(buf)
+ del buf # Free up some possibly large and no-longer-needed memory.
+ self._read_buffer.write(decompressed)
else:
# EOF reached.
# Verify completeness and no corruption and flush (if needed by
@@ -728,62 +745,67 @@ class _CompressedFile(object):
except EOFError:
pass # All is as expected!
else:
- self._data += self._decompressor.flush()
+ self._read_buffer.write(self._decompressor.flush())
+
# Record that we have hit the end of file, so we won't unnecessarily
# repeat the completeness verification step above.
self._read_eof = True
- return
- def _read_from_internal_buffer(self, num_bytes):
- """Read up to num_bytes from the internal buffer."""
- # TODO: this can be optimized to avoid a string copy operation.
- result = self._data[:num_bytes]
- self._data = self._data[num_bytes:]
+ def _read_from_internal_buffer(self, read_fn):
+ """Read from the internal buffer by using the supplied read_fn."""
+ self._read_buffer.seek(self._read_position)
+ result = read_fn()
+ self._read_position += len(result)
+ self._read_buffer.seek(0, os.SEEK_END) # Allow future writes.
return result
def read(self, num_bytes):
if not self._decompressor:
raise ValueError('decompressor not initialized')
+
self._fetch_to_internal_buffer(num_bytes)
- return self._read_from_internal_buffer(num_bytes)
+ return self._read_from_internal_buffer(
+ lambda: self._read_buffer.read(num_bytes))
def readline(self):
"""Equivalent to standard file.readline(). Same return conventions apply."""
if not self._decompressor:
raise ValueError('decompressor not initialized')
- result = ''
+
+ io = cStringIO.StringIO()
while True:
- self._fetch_to_internal_buffer(self._read_size)
- if not self._data:
- break # EOF reached.
- index = self._data.find('\n')
- if index == -1:
- result += self._read_from_internal_buffer(len(self._data))
- else:
- result += self._read_from_internal_buffer(index + 1)
- break # Newline reached.
- return result
+ # Ensure that the internal buffer has at least half the read_size. Going
+ # with half the _read_size (as opposed to a full _read_size) to ensure
+ # that actual fetches are more evenly spread out, as opposed to having 2
+ # consecutive reads at the beginning of a read.
+ self._fetch_to_internal_buffer(self._read_size / 2)
+ line = self._read_from_internal_buffer(
+ lambda: self._read_buffer.readline())
+ io.write(line)
+ if line.endswith('\n') or not line:
+ break # Newline or EOF reached.
+
+ return io.getvalue()
- @property
def closed(self):
return not self._file or self._file.closed()
def close(self):
- if self._file is None:
- return
+ if self.readable():
+ self._read_buffer.close()
- if self._writeable():
+ if self.writeable():
self._file.write(self._compressor.flush())
+
self._file.close()
def flush(self):
- if self._writeable():
+ if self.writeable():
self._file.write(self._compressor.flush())
self._file.flush()
- # TODO: Add support for seeking to a file position.
- @property
def seekable(self):
+ # TODO: Add support for seeking to a file position.
return False
def __enter__(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/ee09668b/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index d1fac66..0d18cc0 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -385,6 +385,7 @@ class GcsIO(object):
return file_sizes
+# TODO: Consider using cStringIO instead of buffers and data_lists when reading.
class GcsBufferedReader(object):
"""A class for reading Google Cloud Storage files."""
@@ -398,7 +399,6 @@ class GcsBufferedReader(object):
self.bucket, self.name = parse_gcs_path(path)
self.mode = mode
self.buffer_size = buffer_size
- self.mode = mode
# Get object state.
get_request = (storage.StorageObjectsGetRequest(
@@ -627,6 +627,8 @@ class GcsBufferedReader(object):
return False
+# TODO: Consider using cStringIO instead of buffers and data_lists when reading
+# and writing.
class GcsBufferedWriter(object):
"""A class for writing Google Cloud Storage files."""
[2/2] beam git commit: Closes #1777
Posted by ro...@apache.org.
Closes #1777
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/678da7a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/678da7a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/678da7a8
Branch: refs/heads/python-sdk
Commit: 678da7a8b42c956a52a5a83aee851cba7e2aae98
Parents: 36a7d34 ee09668
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jan 18 13:43:21 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 13:43:21 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 124 ++++++++++++++++++------------
sdks/python/apache_beam/io/gcsio.py | 4 +-
2 files changed, 76 insertions(+), 52 deletions(-)
----------------------------------------------------------------------