You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/28 17:47:17 UTC
[1/2] beam git commit: A few memory and IO optimizations in Avro and FileIO
Repository: beam
Updated Branches:
refs/heads/python-sdk 2d1655107 -> 5107dfad8
A few memory and IO optimizations in Avro and FileIO
* Using a buffer when decompressing with Snappy in Avro in order to avoid
unnecessary copies during slicing of large strings.
* Removing GC-related optimizations.
* Aligning _CompressedFile read_size with that of GCSIO to avoid
inefficient (undersized) reads of compressed files.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/672a7d7b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/672a7d7b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/672a7d7b
Branch: refs/heads/python-sdk
Commit: 672a7d7b0509f0c82d79516c92824911a80840f4
Parents: 2d16551
Author: Gus Katsiapis <ka...@katsiapis-linux.mtv.corp.google.com>
Authored: Mon Dec 26 12:13:06 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 09:46:48 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio.py | 10 ++++++----
sdks/python/apache_beam/io/filebasedsource_test.py | 8 ++++----
sdks/python/apache_beam/io/fileio.py | 2 +-
sdks/python/apache_beam/io/gcsio.py | 6 +++---
4 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 75a9f2a..5dab651 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -16,7 +16,7 @@
#
"""Implements a source for reading Avro files."""
-import cStringIO as StringIO
+import cStringIO
import os
import zlib
@@ -198,8 +198,10 @@ class _AvroBlock(object):
raise ValueError('Snappy does not seem to be installed.')
# Compressed data includes a 4-byte CRC32 checksum which we verify.
- result = snappy.decompress(data[:-4])
- avroio.BinaryDecoder(StringIO.StringIO(data[-4:])).check_crc32(result)
+ # We take care to avoid extra copies of data while slicing large objects
+ # by use of a buffer.
+ result = snappy.decompress(buffer(data)[:-4])
+ avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
return result
else:
raise ValueError('Unknown codec: %r', codec)
@@ -209,7 +211,7 @@ class _AvroBlock(object):
def records(self):
decoder = avroio.BinaryDecoder(
- StringIO.StringIO(self._decompressed_block_bytes))
+ cStringIO.StringIO(self._decompressed_block_bytes))
reader = avroio.DatumReader(
writers_schema=self._schema, readers_schema=self._schema)
http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c93dc5a..f6fab4a 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -16,7 +16,7 @@
#
import bz2
-import cStringIO as StringIO
+import cStringIO
import gzip
import logging
import math
@@ -451,7 +451,7 @@ class TestFileBasedSource(unittest.TestCase):
chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
compressed_chunks = []
for c in chunks:
- out = StringIO.StringIO()
+ out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
@@ -498,7 +498,7 @@ class TestFileBasedSource(unittest.TestCase):
chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
compressed_chunks = []
for c in chunks:
- out = StringIO.StringIO()
+ out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
@@ -518,7 +518,7 @@ class TestFileBasedSource(unittest.TestCase):
chunks_to_write = []
for i, c in enumerate(chunks):
if i%2 == 0:
- out = StringIO.StringIO()
+ out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write('\n'.join(c))
chunks_to_write.append(out.getvalue())
http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index d7ff35b..4ee6a3e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -649,7 +649,7 @@ class _CompressedFile(object):
def __init__(self,
fileobj,
compression_type=CompressionTypes.GZIP,
- read_size=16384):
+ read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
if not fileobj:
raise ValueError('fileobj must be opened file but was %s' % fileobj)
self._validate_compression_type(compression_type)
http://git-wip-us.apache.org/repos/asf/beam/blob/672a7d7b/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index f150c4c..d1fac66 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -20,7 +20,7 @@ This library evolved from the Google App Engine GCS client available at
https://github.com/GoogleCloudPlatform/appengine-gcs-client.
"""
-import cStringIO as StringIO
+import cStringIO
import errno
import fnmatch
import logging
@@ -418,7 +418,7 @@ class GcsBufferedReader(object):
get_request.generation = metadata.generation
# Initialize read buffer state.
- self.download_stream = StringIO.StringIO()
+ self.download_stream = cStringIO.StringIO()
self.downloader = transfer.Download(
self.download_stream, auto_transfer=False, chunksize=buffer_size)
self.client.objects.Get(get_request, download=self.downloader)
@@ -558,7 +558,7 @@ class GcsBufferedReader(object):
end = start + size - 1
self.downloader.GetRange(start, end)
value = self.download_stream.getvalue()
- # Clear the StringIO object after we've read its contents.
+ # Clear the cStringIO object after we've read its contents.
self.download_stream.truncate(0)
assert len(value) == size
return value
[2/2] beam git commit: Closes #1694
Posted by dh...@apache.org.
Closes #1694
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5107dfad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5107dfad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5107dfad
Branch: refs/heads/python-sdk
Commit: 5107dfad80ae236f34a66a171c5e4a8a9c6bc720
Parents: 2d16551 672a7d7
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 09:47:01 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 09:47:01 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio.py | 10 ++++++----
sdks/python/apache_beam/io/filebasedsource_test.py | 8 ++++----
sdks/python/apache_beam/io/fileio.py | 2 +-
sdks/python/apache_beam/io/gcsio.py | 6 +++---
4 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------