You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/29 01:17:56 UTC

[1/2] incubator-beam git commit: Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 351c3831d -> c155ef0eb


Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1f1fa06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1f1fa06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1f1fa06

Branch: refs/heads/python-sdk
Commit: a1f1fa06ee8683273182548e7eb2d6612040d2bf
Parents: 351c383
Author: Marian Dvorsky <ma...@google.com>
Authored: Thu Jul 28 13:02:15 2016 -0700
Committer: Marian Dvorsky <ma...@google.com>
Committed: Thu Jul 28 13:02:15 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcsio.py | 30 +++++++++++++++++++++---------
 1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1f1fa06/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9377266..88fcfb8 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -49,6 +49,7 @@ except ImportError:
 
 
 DEFAULT_READ_BUFFER_SIZE = 1024 * 1024
+WRITE_CHUNK_SIZE = 8 * 1024 * 1024
 
 
 def parse_gcs_path(gcs_path):
@@ -546,6 +547,10 @@ class GcsBufferedWriter(object):
     self.closed = False
     self.position = 0
 
+    # A small buffer to avoid CPU-heavy per-write pipe calls.
+    self.write_buffer = bytearray()
+    self.write_buffer_size = 128 * 1024
+
     # Set up communication with uploading thread.
     parent_conn, child_conn = multiprocessing.Pipe()
     self.child_conn = child_conn
@@ -557,7 +562,7 @@ class GcsBufferedWriter(object):
             bucket=self.bucket,
             name=self.name))
     self.upload = transfer.Upload(GcsBufferedWriter.PipeStream(child_conn),
-                                  mime_type)
+                                  mime_type, chunksize=WRITE_CHUNK_SIZE)
     self.upload.strategy = transfer.RESUMABLE_UPLOAD
 
     # Start uploading thread.
@@ -598,14 +603,10 @@ class GcsBufferedWriter(object):
     self._check_open()
     if not data:
       return
-    try:
-      self.conn.send_bytes(data)
-      self.position += len(data)
-    except IOError:
-      if self.upload_thread.last_error:
-        raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
-      else:
-        raise
+    self.write_buffer.extend(data)
+    if len(self.write_buffer) > self.write_buffer_size:
+      self._flush_write_buffer()
+    self.position += len(data)
 
   def tell(self):
     """Return the total number of bytes passed to write() so far."""
@@ -613,6 +614,7 @@ class GcsBufferedWriter(object):
 
   def close(self):
     """Close the current GCS file."""
+    self._flush_write_buffer()
     self.closed = True
     self.conn.close()
     self.upload_thread.join()
@@ -635,3 +637,13 @@ class GcsBufferedWriter(object):
 
   def writable(self):
     return True
+
+  def _flush_write_buffer(self):
+    try:
+      self.conn.send_bytes(buffer(self.write_buffer))
+      self.write_buffer = bytearray()
+    except IOError:
+      if self.upload_thread.last_error:
+        raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
+      else:
+        raise


[2/2] incubator-beam git commit: Closes #752

Posted by ro...@apache.org.
Closes #752


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c155ef0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c155ef0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c155ef0e

Branch: refs/heads/python-sdk
Commit: c155ef0ebc88ac34a3681b8bff6152e1857da847
Parents: 351c383 a1f1fa0
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jul 28 18:17:43 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 18:17:43 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcsio.py | 30 +++++++++++++++++++++---------
 1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------