You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/05/23 23:38:34 UTC
[beam] branch master updated: [BEAM-6027] Fix slow downloads when reading from GCS (#8553)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4cf2830 [BEAM-6027] Fix slow downloads when reading from GCS (#8553)
4cf2830 is described below
commit 4cf2830990b0cd27b28a461de42473d4c4bddc12
Author: Fábio Franco Uechi <fa...@gmail.com>
AuthorDate: Thu May 23 20:38:18 2019 -0300
[BEAM-6027] Fix slow downloads when reading from GCS (#8553)
---
sdks/python/apache_beam/io/filesystemio.py | 17 ++++++++++++++++-
sdks/python/apache_beam/io/gcp/gcsio.py | 2 +-
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index dca341d..8d21e99 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -80,16 +80,21 @@ class Uploader(with_metaclass(abc.ABCMeta, object)):
class DownloaderStream(io.RawIOBase):
"""Provides a stream interface for Downloader objects."""
- def __init__(self, downloader, mode='rb'):
+ def __init__(self,
+ downloader,
+ read_buffer_size=io.DEFAULT_BUFFER_SIZE,
+ mode='rb'):
"""Initializes the stream.
Args:
downloader: (Downloader) Filesystem dependent implementation.
+ read_buffer_size: (int) Buffer size to use during read operations.
mode: (string) Python mode attribute for this stream.
"""
self._downloader = downloader
self.mode = mode
self._position = 0
+ self._reader_buffer_size = read_buffer_size
def readinto(self, b):
"""Read up to len(b) bytes into b.
@@ -157,6 +162,16 @@ class DownloaderStream(io.RawIOBase):
def readable(self):
return True
+ def readall(self):
+ """Read until EOF, using multiple read() call."""
+ res = []
+ while True:
+ data = self.read(self._reader_buffer_size)
+ if not data:
+ break
+ res.append(data)
+ return b''.join(res)
+
class UploaderStream(io.RawIOBase):
"""Provides a stream interface for Uploader objects."""
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 33a94fc..5586eb4 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -174,7 +174,7 @@ class GcsIO(object):
if mode == 'r' or mode == 'rb':
downloader = GcsDownloader(self.client, filename,
buffer_size=read_buffer_size)
- return io.BufferedReader(DownloaderStream(downloader, mode=mode),
+ return io.BufferedReader(DownloaderStream(downloader, read_buffer_size=read_buffer_size, mode=mode),
buffer_size=read_buffer_size)
elif mode == 'w' or mode == 'wb':
uploader = GcsUploader(self.client, filename, mime_type)