You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/29 01:17:56 UTC
[1/2] incubator-beam git commit: Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 351c3831d -> c155ef0eb
Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1f1fa06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1f1fa06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1f1fa06
Branch: refs/heads/python-sdk
Commit: a1f1fa06ee8683273182548e7eb2d6612040d2bf
Parents: 351c383
Author: Marian Dvorsky <ma...@google.com>
Authored: Thu Jul 28 13:02:15 2016 -0700
Committer: Marian Dvorsky <ma...@google.com>
Committed: Thu Jul 28 13:02:15 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcsio.py | 30 +++++++++++++++++++++---------
1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1f1fa06/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9377266..88fcfb8 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -49,6 +49,7 @@ except ImportError:
DEFAULT_READ_BUFFER_SIZE = 1024 * 1024
+WRITE_CHUNK_SIZE = 8 * 1024 * 1024
def parse_gcs_path(gcs_path):
@@ -546,6 +547,10 @@ class GcsBufferedWriter(object):
self.closed = False
self.position = 0
+ # A small buffer to avoid CPU-heavy per-write pipe calls.
+ self.write_buffer = bytearray()
+ self.write_buffer_size = 128 * 1024
+
# Set up communication with uploading thread.
parent_conn, child_conn = multiprocessing.Pipe()
self.child_conn = child_conn
@@ -557,7 +562,7 @@ class GcsBufferedWriter(object):
bucket=self.bucket,
name=self.name))
self.upload = transfer.Upload(GcsBufferedWriter.PipeStream(child_conn),
- mime_type)
+ mime_type, chunksize=WRITE_CHUNK_SIZE)
self.upload.strategy = transfer.RESUMABLE_UPLOAD
# Start uploading thread.
@@ -598,14 +603,10 @@ class GcsBufferedWriter(object):
self._check_open()
if not data:
return
- try:
- self.conn.send_bytes(data)
- self.position += len(data)
- except IOError:
- if self.upload_thread.last_error:
- raise self.upload_thread.last_error # pylint: disable=raising-bad-type
- else:
- raise
+ self.write_buffer.extend(data)
+ if len(self.write_buffer) > self.write_buffer_size:
+ self._flush_write_buffer()
+ self.position += len(data)
def tell(self):
"""Return the total number of bytes passed to write() so far."""
@@ -613,6 +614,7 @@ class GcsBufferedWriter(object):
def close(self):
"""Close the current GCS file."""
+ self._flush_write_buffer()
self.closed = True
self.conn.close()
self.upload_thread.join()
@@ -635,3 +637,13 @@ class GcsBufferedWriter(object):
def writable(self):
return True
+
+ def _flush_write_buffer(self):
+ try:
+ self.conn.send_bytes(buffer(self.write_buffer))
+ self.write_buffer = bytearray()
+ except IOError:
+ if self.upload_thread.last_error:
+ raise self.upload_thread.last_error # pylint: disable=raising-bad-type
+ else:
+ raise
[2/2] incubator-beam git commit: Closes #752
Posted by ro...@apache.org.
Closes #752
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c155ef0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c155ef0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c155ef0e
Branch: refs/heads/python-sdk
Commit: c155ef0ebc88ac34a3681b8bff6152e1857da847
Parents: 351c383 a1f1fa0
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jul 28 18:17:43 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 18:17:43 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcsio.py | 30 +++++++++++++++++++++---------
1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------