You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ch...@apache.org on 2019/05/23 23:38:34 UTC

[beam] branch master updated: [BEAM-6027] Fix slow downloads when reading from GCS (#8553)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cf2830  [BEAM-6027] Fix slow downloads when reading from GCS (#8553)
4cf2830 is described below

commit 4cf2830990b0cd27b28a461de42473d4c4bddc12
Author: Fábio Franco Uechi <fa...@gmail.com>
AuthorDate: Thu May 23 20:38:18 2019 -0300

    [BEAM-6027] Fix slow downloads when reading from GCS (#8553)
---
 sdks/python/apache_beam/io/filesystemio.py | 17 ++++++++++++++++-
 sdks/python/apache_beam/io/gcp/gcsio.py    |  2 +-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index dca341d..8d21e99 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -80,16 +80,21 @@ class Uploader(with_metaclass(abc.ABCMeta, object)):
 class DownloaderStream(io.RawIOBase):
   """Provides a stream interface for Downloader objects."""
 
-  def __init__(self, downloader, mode='rb'):
+  def __init__(self,
+               downloader,
+               read_buffer_size=io.DEFAULT_BUFFER_SIZE,
+               mode='rb'):
     """Initializes the stream.
 
     Args:
       downloader: (Downloader) Filesystem dependent implementation.
+      read_buffer_size: (int) Buffer size to use during read operations.
       mode: (string) Python mode attribute for this stream.
     """
     self._downloader = downloader
     self.mode = mode
     self._position = 0
+    self._reader_buffer_size = read_buffer_size
 
   def readinto(self, b):
     """Read up to len(b) bytes into b.
@@ -157,6 +162,16 @@ class DownloaderStream(io.RawIOBase):
   def readable(self):
     return True
 
+  def readall(self):
+    """Read until EOF, using multiple read() call."""
+    res = []
+    while True:
+      data = self.read(self._reader_buffer_size)
+      if not data:
+        break
+      res.append(data)
+    return b''.join(res)
+
 
 class UploaderStream(io.RawIOBase):
   """Provides a stream interface for Uploader objects."""
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 33a94fc..5586eb4 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -174,7 +174,7 @@ class GcsIO(object):
     if mode == 'r' or mode == 'rb':
       downloader = GcsDownloader(self.client, filename,
                                  buffer_size=read_buffer_size)
-      return io.BufferedReader(DownloaderStream(downloader, mode=mode),
+      return io.BufferedReader(DownloaderStream(downloader, read_buffer_size=read_buffer_size, mode=mode),
                                buffer_size=read_buffer_size)
     elif mode == 'w' or mode == 'wb':
       uploader = GcsUploader(self.client, filename, mime_type)