You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/28 17:47:17 UTC

[1/2] beam git commit: A few memory and IO optimizations in Avro and FileIO

Repository: beam
Updated Branches:
  refs/heads/python-sdk 2d1655107 -> 5107dfad8


A few memory and IO optimizations in Avro and FileIO

* Using a buffer when decompressing with Snappy in Avro in order to avoid
  unnecessary copies during slicing of large strings.

* Removing GC-related optimizations.

* Aligning _CompressedFile read_size with that of GCSIO to avoid
  inefficient reads of compressed files.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/672a7d7b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/672a7d7b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/672a7d7b

Branch: refs/heads/python-sdk
Commit: 672a7d7b0509f0c82d79516c92824911a80840f4
Parents: 2d16551
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Mon Dec 26 12:13:06 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 09:46:48 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py               | 10 ++++++----
 sdks/python/apache_beam/io/filebasedsource_test.py |  8 ++++----
 sdks/python/apache_beam/io/fileio.py               |  2 +-
 sdks/python/apache_beam/io/gcsio.py                |  6 +++---
 4 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 75a9f2a..5dab651 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -16,7 +16,7 @@
 #
 """Implements a source for reading Avro files."""
 
-import cStringIO as StringIO
+import cStringIO
 import os
 import zlib
 
@@ -198,8 +198,10 @@ class _AvroBlock(object):
         raise ValueError('Snappy does not seem to be installed.')
 
       # Compressed data includes a 4-byte CRC32 checksum which we verify.
-      result = snappy.decompress(data[:-4])
-      avroio.BinaryDecoder(StringIO.StringIO(data[-4:])).check_crc32(result)
+      # We take care to avoid extra copies of data while slicing large objects
+      # by use of a buffer.
+      result = snappy.decompress(buffer(data)[:-4])
+      avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
       return result
     else:
       raise ValueError('Unknown codec: %r', codec)
@@ -209,7 +211,7 @@ class _AvroBlock(object):
 
   def records(self):
     decoder = avroio.BinaryDecoder(
-        StringIO.StringIO(self._decompressed_block_bytes))
+        cStringIO.StringIO(self._decompressed_block_bytes))
     reader = avroio.DatumReader(
         writers_schema=self._schema, readers_schema=self._schema)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c93dc5a..f6fab4a 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -16,7 +16,7 @@
 #
 
 import bz2
-import cStringIO as StringIO
+import cStringIO
 import gzip
 import logging
 import math
@@ -451,7 +451,7 @@ class TestFileBasedSource(unittest.TestCase):
     chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
     compressed_chunks = []
     for c in chunks:
-      out = StringIO.StringIO()
+      out = cStringIO.StringIO()
       with gzip.GzipFile(fileobj=out, mode="w") as f:
         f.write('\n'.join(c))
       compressed_chunks.append(out.getvalue())
@@ -498,7 +498,7 @@ class TestFileBasedSource(unittest.TestCase):
     chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
     compressed_chunks = []
     for c in chunks:
-      out = StringIO.StringIO()
+      out = cStringIO.StringIO()
       with gzip.GzipFile(fileobj=out, mode="w") as f:
         f.write('\n'.join(c))
       compressed_chunks.append(out.getvalue())
@@ -518,7 +518,7 @@ class TestFileBasedSource(unittest.TestCase):
     chunks_to_write = []
     for i, c in enumerate(chunks):
       if i%2 == 0:
-        out = StringIO.StringIO()
+        out = cStringIO.StringIO()
         with gzip.GzipFile(fileobj=out, mode="w") as f:
           f.write('\n'.join(c))
         chunks_to_write.append(out.getvalue())

http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index d7ff35b..4ee6a3e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -649,7 +649,7 @@ class _CompressedFile(object):
   def __init__(self,
                fileobj,
                compression_type=CompressionTypes.GZIP,
-               read_size=16384):
+               read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
     if not fileobj:
       raise ValueError('fileobj must be opened file but was %s' % fileobj)
     self._validate_compression_type(compression_type)

http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index f150c4c..d1fac66 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -20,7 +20,7 @@ This library evolved from the Google App Engine GCS client available at
 https://github.com/GoogleCloudPlatform/appengine-gcs-client.
 """
 
-import cStringIO as StringIO
+import cStringIO
 import errno
 import fnmatch
 import logging
@@ -418,7 +418,7 @@ class GcsBufferedReader(object):
     get_request.generation = metadata.generation
 
     # Initialize read buffer state.
-    self.download_stream = StringIO.StringIO()
+    self.download_stream = cStringIO.StringIO()
     self.downloader = transfer.Download(
         self.download_stream, auto_transfer=False, chunksize=buffer_size)
     self.client.objects.Get(get_request, download=self.downloader)
@@ -558,7 +558,7 @@ class GcsBufferedReader(object):
     end = start + size - 1
     self.downloader.GetRange(start, end)
     value = self.download_stream.getvalue()
-    # Clear the StringIO object after we've read its contents.
+    # Clear the cStringIO object after we've read its contents.
     self.download_stream.truncate(0)
     assert len(value) == size
     return value


[2/2] beam git commit: Closes #1694

Posted by dh...@apache.org.
Closes #1694


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5107dfad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5107dfad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5107dfad

Branch: refs/heads/python-sdk
Commit: 5107dfad80ae236f34a66a171c5e4a8a9c6bc720
Parents: 2d16551 672a7d7
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 09:47:01 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 09:47:01 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py               | 10 ++++++----
 sdks/python/apache_beam/io/filebasedsource_test.py |  8 ++++----
 sdks/python/apache_beam/io/fileio.py               |  2 +-
 sdks/python/apache_beam/io/gcsio.py                |  6 +++---
 4 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------