Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/18 21:43:49 UTC

[1/2] beam git commit: Moving from a string-based buffer to a cStringIO-based one in order to help with performance of compressed sources.

Repository: beam
Updated Branches:
  refs/heads/python-sdk 36a7d3491 -> 678da7a8b


Moving from a string-based buffer to a cStringIO-based one in order to
help with performance of compressed sources.
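
For context, the old buffer consumed bytes by slicing a plain string, which
copies the unread remainder on every read; the new buffer tracks a read
position inside a cStringIO object and avoids those per-read copies. A
minimal sketch of the difference (hypothetical names, Python 2):

import cStringIO

data = 'decompressed bytes held in the buffer'

# Old approach: every read slices the string, copying the unread tail.
result, data = data[:12], data[12:]  # O(len(data)) copy per read.

# New approach: write once into an in-memory file and advance a position.
read_buffer = cStringIO.StringIO()
read_buffer.write('decompressed bytes held in the buffer')
read_position = 0
read_buffer.seek(read_position)
result = read_buffer.read(12)  # No copy of the unread remainder.
read_position += len(result)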


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee09668b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee09668b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee09668b

Branch: refs/heads/python-sdk
Commit: ee09668becaed21bfb15bac267a2df5e166bd6b9
Parents: 36a7d34
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Fri Jan 13 18:28:12 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 13:43:20 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 124 ++++++++++++++++++------------
 sdks/python/apache_beam/io/gcsio.py  |   4 +-
 2 files changed, 76 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee09668b/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 6ea7844..ebc4fed 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -19,6 +19,7 @@
 from __future__ import absolute_import
 
 import bz2
+import cStringIO
 import glob
 import logging
 from multiprocessing.pool import ThreadPool
@@ -68,6 +69,7 @@ class _CompressionType(object):
 
 class CompressionTypes(object):
   """Enum-like class representing known compression types."""
+
   # Detect compression based on filename extension.
   #
   # The following extensions are currently recognized by auto-detection:
@@ -642,9 +644,9 @@ class ChannelFactory(object):
 class _CompressedFile(object):
   """Somewhat limited file wrapper for easier handling of compressed files."""
 
-  # The bit mask to use for the wbits parameters of the GZIP compressor and
+  # The bit mask to use for the wbits parameters of the zlib compressor and
   # decompressor objects.
-  _gzip_mask = zlib.MAX_WBITS | 16
+  _gzip_mask = zlib.MAX_WBITS | 16  # Mask when using GZIP headers.
 
   def __init__(self,
                fileobj,
@@ -652,49 +654,47 @@ class _CompressedFile(object):
                read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
     if not fileobj:
       raise ValueError('fileobj must be opened file but was %s' % fileobj)
-    self._validate_compression_type(compression_type)
+
+    if not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be a CompressionType object but '
+                      'was %s' % type(compression_type))
+    if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
+                           ):
+      raise ValueError(
+          'Cannot create object with unspecified or no compression')
 
     self._file = fileobj
-    self._data = ''
-    self._read_size = read_size
     self._compression_type = compression_type
 
-    if self._readable():
+    if self.readable():
+      self._read_size = read_size
+      self._read_buffer = cStringIO.StringIO()
+      self._read_position = 0
+      self._read_eof = False
+
       if self._compression_type == CompressionTypes.BZIP2:
         self._decompressor = bz2.BZ2Decompressor()
       else:
+        assert self._compression_type == CompressionTypes.GZIP
         self._decompressor = zlib.decompressobj(self._gzip_mask)
-      self._read_eof = False
     else:
       self._decompressor = None
 
-    if self._writeable():
+    if self.writeable():
       if self._compression_type == CompressionTypes.BZIP2:
         self._compressor = bz2.BZ2Compressor()
       else:
+        assert self._compression_type == CompressionTypes.GZIP
         self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                             zlib.DEFLATED, self._gzip_mask)
     else:
       self._compressor = None
 
-  def _validate_compression_type(self, compression_type):
-    if not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-    if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
-                           ):
-      raise ValueError(
-          'Cannot create object with unspecified or no compression')
-    if compression_type not in (CompressionTypes.BZIP2, CompressionTypes.GZIP):
-      raise ValueError(
-          'compression_type %s not supported for whole-file compression',
-          compression_type)
-
-  def _readable(self):
+  def readable(self):
     mode = self._file.mode
     return 'r' in mode or 'a' in mode
 
-  def _writeable(self):
+  def writeable(self):
     mode = self._file.mode
     return 'w' in mode or 'a' in mode
 
@@ -708,10 +708,27 @@ class _CompressedFile(object):
 
   def _fetch_to_internal_buffer(self, num_bytes):
     """Fetch up to num_bytes into the internal buffer."""
-    while not self._read_eof and len(self._data) < num_bytes:
+    if (not self._read_eof and self._read_position > 0 and
+        (self._read_buffer.tell() - self._read_position) < num_bytes):
+      # There aren't enough bytes in the buffer to satisfy the read, so we
+      # prepare for a possibly large read by compacting the internal buffer
+      # without dropping any previously held data.
+      self._read_buffer.seek(self._read_position)
+      data = self._read_buffer.read()
+      self._read_position = 0
+      self._read_buffer.seek(0)
+      self._read_buffer.truncate(0)
+      self._read_buffer.write(data)
+
+    while not self._read_eof and (self._read_buffer.tell() - self._read_position
+                                 ) < num_bytes:
+      # Continue reading from the underlying file object until enough bytes are
+      # available, or EOF is reached.
       buf = self._file.read(self._read_size)
       if buf:
-        self._data += self._decompressor.decompress(buf)
+        decompressed = self._decompressor.decompress(buf)
+        del buf  # Free up some possibly large and no-longer-needed memory.
+        self._read_buffer.write(decompressed)
       else:
         # EOF reached.
         # Verify completeness and no corruption and flush (if needed by
@@ -728,62 +745,67 @@ class _CompressedFile(object):
           except EOFError:
             pass  # All is as expected!
         else:
-          self._data += self._decompressor.flush()
+          self._read_buffer.write(self._decompressor.flush())
+
         # Record that we have hit the end of file, so we won't unnecessarily
         # repeat the completeness verification step above.
         self._read_eof = True
-        return
 
-  def _read_from_internal_buffer(self, num_bytes):
-    """Read up to num_bytes from the internal buffer."""
-    # TODO: this can be optimized to avoid a string copy operation.
-    result = self._data[:num_bytes]
-    self._data = self._data[num_bytes:]
+  def _read_from_internal_buffer(self, read_fn):
+    """Read from the internal buffer by using the supplied read_fn."""
+    self._read_buffer.seek(self._read_position)
+    result = read_fn()
+    self._read_position += len(result)
+    self._read_buffer.seek(0, os.SEEK_END)  # Allow future writes.
     return result
 
   def read(self, num_bytes):
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
+
     self._fetch_to_internal_buffer(num_bytes)
-    return self._read_from_internal_buffer(num_bytes)
+    return self._read_from_internal_buffer(
+        lambda: self._read_buffer.read(num_bytes))
 
   def readline(self):
     """Equivalent to standard file.readline(). Same return conventions apply."""
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
-    result = ''
+
+    io = cStringIO.StringIO()
     while True:
-      self._fetch_to_internal_buffer(self._read_size)
-      if not self._data:
-        break  # EOF reached.
-      index = self._data.find('\n')
-      if index == -1:
-        result += self._read_from_internal_buffer(len(self._data))
-      else:
-        result += self._read_from_internal_buffer(index + 1)
-        break  # Newline reached.
-    return result
+      # Ensure that the internal buffer holds at least half of _read_size.
+      # Fetching half (rather than a full) _read_size spreads the underlying
+      # reads out more evenly, instead of issuing two back-to-back reads at
+      # the start of each readline.
+      self._fetch_to_internal_buffer(self._read_size / 2)
+      line = self._read_from_internal_buffer(
+          lambda: self._read_buffer.readline())
+      io.write(line)
+      if line.endswith('\n') or not line:
+        break  # Newline or EOF reached.
+
+    return io.getvalue()
 
-  @property
   def closed(self):
     return not self._file or self._file.closed()
 
   def close(self):
-    if self._file is None:
-      return
+    if self.readable():
+      self._read_buffer.close()
 
-    if self._writeable():
+    if self.writeable():
       self._file.write(self._compressor.flush())
+
     self._file.close()
 
   def flush(self):
-    if self._writeable():
+    if self.writeable():
       self._file.write(self._compressor.flush())
     self._file.flush()
 
-  # TODO: Add support for seeking to a file position.
-  @property
   def seekable(self):
+    # TODO: Add support for seeking to a file position.
     return False
 
   def __enter__(self):

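To make the new read path easier to follow, here is a condensed, standalone
sketch of the buffer-compaction pattern used in _fetch_to_internal_buffer and
_read_from_internal_buffer above (hypothetical class name; decompression
elided):

import cStringIO
import os

class _BufferSketch(object):
  """Condensed analogue of _CompressedFile's read path (illustration only)."""

  def __init__(self, fileobj, read_size=16 * 1024):
    self._file = fileobj
    self._read_size = read_size
    self._read_buffer = cStringIO.StringIO()
    self._read_position = 0
    self._read_eof = False

  def _available(self):
    # Bytes written into the buffer but not yet handed out to callers.
    return self._read_buffer.tell() - self._read_position

  def _compact(self):
    # Move the unread tail to the front so consumed bytes can be reclaimed;
    # this runs at most once per large read, not on every read() call.
    self._read_buffer.seek(self._read_position)
    data = self._read_buffer.read()
    self._read_position = 0
    self._read_buffer.seek(0)
    self._read_buffer.truncate(0)
    self._read_buffer.write(data)

  def read(self, num_bytes):
    if (not self._read_eof and self._read_position > 0 and
        self._available() < num_bytes):
      self._compact()
    while not self._read_eof and self._available() < num_bytes:
      buf = self._file.read(self._read_size)
      if buf:
        self._read_buffer.write(buf)  # Real code writes decompressed bytes.
      else:
        self._read_eof = True
    self._read_buffer.seek(self._read_position)
    result = self._read_buffer.read(num_bytes)
    self._read_position += len(result)
    self._read_buffer.seek(0, os.SEEK_END)  # Reposition for future writes.
    return result
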
http://git-wip-us.apache.org/repos/asf/beam/blob/ee09668b/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index d1fac66..0d18cc0 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -385,6 +385,7 @@ class GcsIO(object):
     return file_sizes
 
 
+# TODO: Consider using cStringIO instead of buffers and data_lists when reading.
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""
 
@@ -398,7 +399,6 @@ class GcsBufferedReader(object):
     self.bucket, self.name = parse_gcs_path(path)
     self.mode = mode
     self.buffer_size = buffer_size
-    self.mode = mode
 
     # Get object state.
     get_request = (storage.StorageObjectsGetRequest(
@@ -627,6 +627,8 @@ class GcsBufferedReader(object):
     return False
 
 
+# TODO: Consider using cStringIO instead of buffers and data_lists when reading
+# and writing.
 class GcsBufferedWriter(object):
   """A class for writing Google Cloud Storage files."""
 

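The TODOs above only flag the idea; as a hedged illustration (assumed names,
not part of this change), replacing a chunk list with cStringIO would look
roughly like this:

import cStringIO

chunks = ['abc', 'def']

# Pattern the TODOs refer to: accumulate chunks in a list, join at the end.
data_list = []
for chunk in chunks:
  data_list.append(chunk)
joined = ''.join(data_list)

# Suggested alternative: write chunks into an in-memory file as they arrive.
buf = cStringIO.StringIO()
for chunk in chunks:
  buf.write(chunk)
assert buf.getvalue() == joined
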

[2/2] beam git commit: Closes #1777

Posted by ro...@apache.org.
Closes #1777


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/678da7a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/678da7a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/678da7a8

Branch: refs/heads/python-sdk
Commit: 678da7a8b42c956a52a5a83aee851cba7e2aae98
Parents: 36a7d34 ee09668
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jan 18 13:43:21 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jan 18 13:43:21 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 124 ++++++++++++++++++------------
 sdks/python/apache_beam/io/gcsio.py  |   4 +-
 2 files changed, 76 insertions(+), 52 deletions(-)
----------------------------------------------------------------------