You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/05 18:37:10 UTC

[1/2] beam git commit: Improve performance of fileio._CompressedFile

Repository: beam
Updated Branches:
  refs/heads/python-sdk 2df3eda4d -> 5b031139a


Improve performance of fileio._CompressedFile


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81e44b83
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81e44b83
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81e44b83

Branch: refs/heads/python-sdk
Commit: 81e44b833ce54e44e6506eb028afe8564c46cf18
Parents: 2df3eda
Author: Charles Chen <cc...@google.com>
Authored: Wed Jan 4 15:48:30 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jan 5 10:36:42 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/81e44b83/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 4ee6a3e..6ea7844 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -664,6 +664,7 @@ class _CompressedFile(object):
         self._decompressor = bz2.BZ2Decompressor()
       else:
         self._decompressor = zlib.decompressobj(self._gzip_mask)
+      self._read_eof = False
     else:
       self._decompressor = None
 
@@ -707,7 +708,7 @@ class _CompressedFile(object):
 
   def _fetch_to_internal_buffer(self, num_bytes):
     """Fetch up to num_bytes into the internal buffer."""
-    while len(self._data) < num_bytes:
+    while not self._read_eof and len(self._data) < num_bytes:
       buf = self._file.read(self._read_size)
       if buf:
         self._data += self._decompressor.decompress(buf)
@@ -728,10 +729,14 @@ class _CompressedFile(object):
             pass  # All is as expected!
         else:
           self._data += self._decompressor.flush()
+        # Record that we have hit the end of file, so we won't unnecessarily
+        # repeat the completeness verification step above.
+        self._read_eof = True
         return
 
   def _read_from_internal_buffer(self, num_bytes):
     """Read up to num_bytes from the internal buffer."""
+    # TODO: this can be optimized to avoid a string copy operation.
     result = self._data[:num_bytes]
     self._data = self._data[num_bytes:]
     return result


[2/2] beam git commit: Closes #1733

Posted by ro...@apache.org.
Closes #1733


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b031139
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b031139
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b031139

Branch: refs/heads/python-sdk
Commit: 5b031139a0f03cc9bbbf324edff3f9e5f8a1174e
Parents: 2df3eda 81e44b8
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jan 5 10:36:43 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jan 5 10:36:43 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------