Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/15 16:49:43 UTC
[1/2] incubator-beam git commit: Use batch GCS operations during FileSink write finalization
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 560fe79f8 -> 66f324b35
Use batch GCS operations during FileSink write finalization
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/313191e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/313191e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/313191e1
Branch: refs/heads/python-sdk
Commit: 313191e129b884e4e14e9f503a757147d368217c
Parents: 560fe79
Author: Charles Chen <cc...@google.com>
Authored: Thu Nov 10 11:54:08 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 08:48:47 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 177 ++++++++++++++++---------
sdks/python/apache_beam/io/fileio_test.py | 2 +-
sdks/python/apache_beam/io/gcsio.py | 78 +++++++++++
sdks/python/apache_beam/io/gcsio_test.py | 103 +++++++++++++-
sdks/python/apache_beam/utils/retry.py | 3 +
5 files changed, 298 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
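In outline, this change replaces one rename per shard during finalization with batched copy-then-delete calls against the GCS API. A minimal sketch of how the new methods below are meant to be driven (illustrative only; bucket and object names are placeholders):

    from apache_beam.io import gcsio

    gcs = gcsio.GcsIO()
    pairs = [('gs://bucket/beam-temp/shard-00000', 'gs://bucket/out-00000'),
             ('gs://bucket/beam-temp/shard-00001', 'gs://bucket/out-00001')]
    # One batched HTTP request copies every shard; statuses are per file.
    copy_statuses = gcs.copy_batch(pairs)
    copied = [src for src, _dest, exc in copy_statuses if exc is None]
    # A second batched request deletes the successfully copied sources.
    delete_statuses = gcs.delete_batch(copied)
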
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 669bfc9..ef20a7c 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -31,6 +31,7 @@ import zlib
import weakref
from apache_beam import coders
+from apache_beam.io import gcsio
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -451,8 +452,6 @@ class ChannelFactory(object):
'was %s' % type(compression_type))
if path.startswith('gs://'):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
raw_file = gcsio.GcsIO().open(
path,
mode,
@@ -470,40 +469,92 @@ class ChannelFactory(object):
return isinstance(fileobj, _CompressedFile)
@staticmethod
- def rename(src, dst):
+ def rename(src, dest):
if src.startswith('gs://'):
- assert dst.startswith('gs://'), dst
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
- gcsio.GcsIO().rename(src, dst)
+      if not dest.startswith('gs://'):
+        raise ValueError('Destination %r must be a GCS path.' % (dest,))
+ gcsio.GcsIO().rename(src, dest)
else:
try:
- os.rename(src, dst)
+ os.rename(src, dest)
except OSError as err:
raise IOError(err)
@staticmethod
- def copytree(src, dst):
+ def rename_batch(src_dest_pairs):
+    # Separate local operations from GCS operations.
+ local_src_dest_pairs = []
+ gcs_src_dest_pairs = []
+ for src, dest in src_dest_pairs:
+ if src.startswith('gs://'):
+        if not dest.startswith('gs://'):
+          raise ValueError('Destination %r must be a GCS path.' % (dest,))
+ gcs_src_dest_pairs.append((src, dest))
+ else:
+ local_src_dest_pairs.append((src, dest))
+
+ # Execute local operations.
+ exceptions = []
+ for src, dest in local_src_dest_pairs:
+ try:
+ ChannelFactory.rename(src, dest)
+ except Exception as e: # pylint: disable=broad-except
+ exceptions.append((src, dest, e))
+
+ # Execute GCS operations.
+ exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs)
+
+ return exceptions
+
+ @staticmethod
+  def _rename_gcs_batch(src_dest_pairs):
+    # Prepare batches.
+    gcs_batches = []
+    gcs_current_batch = []
+    for src, dest in src_dest_pairs:
+      gcs_current_batch.append((src, dest))
+      if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+        gcs_batches.append(gcs_current_batch)
+        gcs_current_batch = []
+    if gcs_current_batch:
+      gcs_batches.append(gcs_current_batch)
+
+ # Execute GCS renames if any and return exceptions.
+ exceptions = []
+ for batch in gcs_batches:
+ copy_statuses = gcsio.GcsIO().copy_batch(batch)
+ copy_succeeded = []
+ for src, dest, exception in copy_statuses:
+ if exception:
+ exceptions.append((src, dest, exception))
+ else:
+ copy_succeeded.append((src, dest))
+ delete_batch = [src for src, dest in copy_succeeded]
+ delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions.append((src, dest, exception))
+ return exceptions
+
+ @staticmethod
+ def copytree(src, dest):
if src.startswith('gs://'):
- assert dst.startswith('gs://'), dst
+      if not dest.startswith('gs://'):
+        raise ValueError('Destination %r must be a GCS path.' % (dest,))
assert src.endswith('/'), src
- assert dst.endswith('/'), dst
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
- gcsio.GcsIO().copytree(src, dst)
+ assert dest.endswith('/'), dest
+ gcsio.GcsIO().copytree(src, dest)
else:
try:
- if os.path.exists(dst):
- shutil.rmtree(dst)
- shutil.copytree(src, dst)
+ if os.path.exists(dest):
+ shutil.rmtree(dest)
+ shutil.copytree(src, dest)
except OSError as err:
raise IOError(err)
@staticmethod
def exists(path):
if path.startswith('gs://'):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
return gcsio.GcsIO().exists(path)
else:
return os.path.exists(path)
@@ -511,8 +562,6 @@ class ChannelFactory(object):
@staticmethod
def rmdir(path):
if path.startswith('gs://'):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
gcs = gcsio.GcsIO()
if not path.endswith('/'):
path += '/'
@@ -528,8 +577,6 @@ class ChannelFactory(object):
@staticmethod
def rm(path):
if path.startswith('gs://'):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
gcsio.GcsIO().delete(path)
else:
try:
@@ -540,8 +587,6 @@ class ChannelFactory(object):
@staticmethod
def glob(path):
if path.startswith('gs://'):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
return gcsio.GcsIO().glob(path)
else:
return glob.glob(path)
@@ -554,8 +599,6 @@ class ChannelFactory(object):
path: a string that gives the path of a single file.
"""
if path.startswith('gs://'):
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.io import gcsio
return gcsio.GcsIO().size(path)
else:
return os.path.getsize(path)
@@ -803,7 +846,6 @@ class FileSink(iobase.Sink):
def finalize_write(self, init_result, writer_results):
writer_results = sorted(writer_results)
num_shards = len(writer_results)
- channel_factory = ChannelFactory()
min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
num_threads = max(1, min_threads)
@@ -815,55 +857,66 @@ class FileSink(iobase.Sink):
])
rename_ops.append((shard, final_name))
+ batches = []
+ current_batch = []
+ for rename_op in rename_ops:
+ current_batch.append(rename_op)
+ if len(current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+ batches.append(current_batch)
+ current_batch = []
+ if current_batch:
+ batches.append(current_batch)
+
logging.info(
- 'Starting finalize_write threads with num_shards: %d, num_threads: %d',
- num_shards, num_threads)
+ 'Starting finalize_write threads with num_shards: %d, '
+ 'batches: %d, num_threads: %d',
+ num_shards, len(batches), num_threads)
start_time = time.time()
# Use a thread pool for renaming operations.
- def _rename_file(rename_op):
- """_rename_file executes single (old_name, new_name) rename operation."""
- old_name, final_name = rename_op
- try:
- channel_factory.rename(old_name, final_name)
- except IOError as e:
- # May have already been copied.
- try:
- exists = channel_factory.exists(final_name)
- except Exception as exists_e: # pylint: disable=broad-except
- logging.warning('Exception when checking if file %s exists: '
- '%s', final_name, exists_e)
- # Returning original exception after logging the exception from
- # exists() call.
- return (None, e)
- if not exists:
- logging.warning(('IOError in _rename_file. old_name: %s, '
- 'final_name: %s, err: %s'), old_name, final_name, e)
- return (None, e)
- except Exception as e: # pylint: disable=broad-except
- logging.warning(('Exception in _rename_file. old_name: %s, '
- 'final_name: %s, err: %s'), old_name, final_name, e)
- return (None, e)
- return (final_name, None)
+ def _rename_batch(batch):
+ """_rename_batch executes batch rename operations."""
+ exceptions = []
+ exception_infos = ChannelFactory.rename_batch(batch)
+ for src, dest, exception in exception_infos:
+ if exception:
+ should_report = True
+ if isinstance(exception, IOError):
+ # May have already been copied.
+ try:
+ if ChannelFactory.exists(dest):
+ should_report = False
+ except Exception as exists_e: # pylint: disable=broad-except
+ logging.warning('Exception when checking if file %s exists: '
+ '%s', dest, exists_e)
+ if should_report:
+ logging.warning(('Exception in _rename_batch. src: %s, '
+ 'dest: %s, err: %s'), src, dest, exception)
+ exceptions.append(exception)
+ return exceptions
# ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
# child thread. (http://bugs.python.org/issue10015)
if not hasattr(threading.current_thread(), '_children'):
threading.current_thread()._children = weakref.WeakKeyDictionary()
- rename_results = ThreadPool(num_threads).map(_rename_file, rename_ops)
+ exception_batches = ThreadPool(num_threads).map(_rename_batch, batches)
- for final_name, err in rename_results:
- if err:
- logging.warning('Error when processing rename_results: %s', err)
- raise err
- else:
- yield final_name
+ all_exceptions = []
+ for exceptions in exception_batches:
+ if exceptions:
+ all_exceptions += exceptions
+    if all_exceptions:
+      raise Exception('Encountered exceptions in finalize_write: %s'
+                      % all_exceptions)
+
+ for shard, final_name in rename_ops:
+ yield final_name
logging.info('Renamed %d shards in %.2f seconds.', num_shards,
time.time() - start_time)
try:
- channel_factory.rmdir(init_result)
+ ChannelFactory.rmdir(init_result)
except IOError:
# May have already been removed.
pass
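Both batching loops above (in ChannelFactory._rename_gcs_batch and FileSink.finalize_write) use the same fixed-size chunking idiom; written as a standalone helper for clarity (a sketch, not part of this commit):

    def chunks(items, size):
      """Split items into consecutive batches of at most `size` elements."""
      return [items[i:i + size] for i in range(0, len(items), size)]

    # e.g. batches = chunks(rename_ops, gcsio.MAX_BATCH_OPERATION_SIZE)
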
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 7e6e60b..b55fa19 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -848,7 +848,7 @@ class TestFileSink(unittest.TestCase):
res2 = writer2.close()
os.remove(res2)
- with self.assertRaises(IOError):
+ with self.assertRaises(Exception):
list(sink.finalize_write(init_token, [res1, res2]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9fcce5b..1b08994 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -31,6 +31,7 @@ import threading
import traceback
from apitools.base.py.exceptions import HttpError
+from apitools.base.py.batch import BatchApiRequest
import apitools.base.py.transfer as transfer
from apache_beam.internal import auth
@@ -50,6 +51,11 @@ DEFAULT_READ_BUFFER_SIZE = 1024 * 1024
WRITE_CHUNK_SIZE = 8 * 1024 * 1024
+# Maximum number of operations permitted in GcsIO.copy_batch() and
+# GcsIO.delete_batch().
+MAX_BATCH_OPERATION_SIZE = 100
+
+
def parse_gcs_path(gcs_path):
"""Return the bucket and object names of the given gs:// path."""
match = re.match('^gs://([^/]+)/(.+)$', gcs_path)
@@ -167,6 +173,39 @@ class GcsIO(object):
return
raise
+ # We intentionally do not decorate this method with a retry, as retrying is
+ # handled in BatchApiRequest.Execute().
+ def delete_batch(self, paths):
+ """Deletes the objects at the given GCS paths.
+
+ Args:
+      paths: List of GCS file paths in the form gs://<bucket>/<name>,
+ not to exceed MAX_BATCH_OPERATION_SIZE in length.
+
+ Returns: List of tuples of (path, exception) in the same order as the paths
+ argument, where exception is None if the operation succeeded or
+ the relevant exception if the operation failed.
+ """
+ batch_request = BatchApiRequest(
+ retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
+ for path in paths:
+ bucket, object_path = parse_gcs_path(path)
+ request = storage.StorageObjectsDeleteRequest(
+ bucket=bucket, object=object_path)
+ batch_request.Add(self.client.objects, 'Delete', request)
+ api_calls = batch_request.Execute(self.client._http) # pylint: disable=protected-access
+ result_statuses = []
+ for i, api_call in enumerate(api_calls):
+ path = paths[i]
+ exception = None
+ if api_call.is_error:
+ exception = api_call.exception
+        # For idempotency, treat a missing file (404) as already deleted.
+ if isinstance(exception, HttpError) and exception.status_code == 404:
+ exception = None
+ result_statuses.append((path, exception))
+ return result_statuses
+
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def copy(self, src, dest):
@@ -193,6 +232,45 @@ class GcsIO(object):
raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src)
raise
+ # We intentionally do not decorate this method with a retry, as retrying is
+ # handled in BatchApiRequest.Execute().
+ def copy_batch(self, src_dest_pairs):
+    """Copies the given GCS objects from src to dest."""
+
+ Args:
+      src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> file
+ paths to copy from src to dest, not to exceed
+ MAX_BATCH_OPERATION_SIZE in length.
+
+ Returns: List of tuples of (src, dest, exception) in the same order as the
+ src_dest_pairs argument, where exception is None if the operation
+ succeeded or the relevant exception if the operation failed.
+ """
+ batch_request = BatchApiRequest(
+ retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
+ for src, dest in src_dest_pairs:
+ src_bucket, src_path = parse_gcs_path(src)
+ dest_bucket, dest_path = parse_gcs_path(dest)
+ request = storage.StorageObjectsCopyRequest(
+ sourceBucket=src_bucket,
+ sourceObject=src_path,
+ destinationBucket=dest_bucket,
+ destinationObject=dest_path)
+ batch_request.Add(self.client.objects, 'Copy', request)
+ api_calls = batch_request.Execute(self.client._http) # pylint: disable=protected-access
+ result_statuses = []
+ for i, api_call in enumerate(api_calls):
+ src, dest = src_dest_pairs[i]
+ exception = None
+ if api_call.is_error:
+ exception = api_call.exception
+ # Translate 404 to the appropriate not found exception.
+ if isinstance(exception, HttpError) and exception.status_code == 404:
+ exception = (
+ GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
+ result_statuses.append((src, dest, exception))
+ return result_statuses
+
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
# protected by retry decorators.
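Because copy_batch() and delete_batch() report failures through per-operation status tuples rather than by raising, callers are expected to inspect the returned list. A minimal sketch, assuming hypothetical paths:

    statuses = gcsio.GcsIO().delete_batch(
        ['gs://my-bucket/a', 'gs://my-bucket/b'])
    failed = [(path, exc) for path, exc in statuses if exc is not None]
    if failed:
      raise IOError('delete_batch failed for: %s' % failed)
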
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 2e9945a..95c8e58 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -16,6 +16,7 @@
#
"""Tests for Google Cloud Storage client."""
+import errno
import logging
import multiprocessing
import os
@@ -24,11 +25,11 @@ import threading
import unittest
import httplib2
+import mock
from apitools.base.py.exceptions import HttpError
from apache_beam.internal.clients import storage
from apache_beam.io import gcsio
-from mock import patch
class FakeGcsClient(object):
@@ -37,6 +38,8 @@ class FakeGcsClient(object):
def __init__(self):
self.objects = FakeGcsObjects()
+    # Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
+ self._http = object()
class FakeFile(object):
@@ -165,6 +168,33 @@ class FakeGcsObjects(object):
return result
+class FakeApiCall(object):
+
+ def __init__(self, exception):
+ self.exception = exception
+ self.is_error = exception is not None
+
+
+class FakeBatchApiRequest(object):
+
+ def __init__(self, **unused_kwargs):
+ self.operations = []
+
+ def Add(self, service, method, request): # pylint: disable=invalid-name
+ self.operations.append((service, method, request))
+
+ def Execute(self, unused_http, **unused_kwargs): # pylint: disable=invalid-name
+ api_calls = []
+ for service, method, request in self.operations:
+ exception = None
+ try:
+ getattr(service, method)(request)
+ except Exception as e: # pylint: disable=broad-except
+ exception = e
+ api_calls.append(FakeApiCall(exception))
+ return api_calls
+
+
class TestGCSPathParser(unittest.TestCase):
def test_gcs_path(self):
@@ -201,7 +231,7 @@ class TestGCSIO(unittest.TestCase):
self.assertFalse(self.gcs.exists(file_name + 'xyz'))
self.assertTrue(self.gcs.exists(file_name))
- @patch.object(FakeGcsObjects, 'Get')
+ @mock.patch.object(FakeGcsObjects, 'Get')
def test_exists_failure(self, mock_get):
# Raising an error other than 404. Raising 404 is a valid failure for
# exists() call.
@@ -251,6 +281,37 @@ class TestGCSIO(unittest.TestCase):
self.assertFalse(
gcsio.parse_gcs_path(file_name) in self.client.objects.files)
+ @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+ def test_delete_batch(self, *unused_args):
+ gcsio.BatchApiRequest = FakeBatchApiRequest
+ file_name_pattern = 'gs://gcsio-test/delete_me_%d'
+ file_size = 1024
+ num_files = 10
+
+ # Test deletion of non-existent files.
+ result = self.gcs.delete_batch(
+ [file_name_pattern % i for i in range(num_files)])
+ self.assertTrue(result)
+ for i, (file_name, exception) in enumerate(result):
+ self.assertEqual(file_name, file_name_pattern % i)
+ self.assertEqual(exception, None)
+ self.assertFalse(self.gcs.exists(file_name_pattern % i))
+
+ # Insert some files.
+ for i in range(num_files):
+ self._insert_random_file(self.client, file_name_pattern % i, file_size)
+
+ # Check files inserted properly.
+ for i in range(num_files):
+ self.assertTrue(self.gcs.exists(file_name_pattern % i))
+
+ # Execute batch delete.
+ self.gcs.delete_batch([file_name_pattern % i for i in range(num_files)])
+
+ # Check files deleted properly.
+ for i in range(num_files):
+ self.assertFalse(self.gcs.exists(file_name_pattern % i))
+
def test_copy(self):
src_file_name = 'gs://gcsio-test/source'
dest_file_name = 'gs://gcsio-test/dest'
@@ -271,6 +332,44 @@ class TestGCSIO(unittest.TestCase):
self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
'gs://gcsio-test/non-existent-destination')
+ @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+ def test_copy_batch(self, *unused_args):
+ gcsio.BatchApiRequest = FakeBatchApiRequest
+ from_name_pattern = 'gs://gcsio-test/copy_me_%d'
+ to_name_pattern = 'gs://gcsio-test/destination_%d'
+ file_size = 1024
+ num_files = 10
+
+ # Test copy of non-existent files.
+ result = self.gcs.copy_batch(
+ [(from_name_pattern % i, to_name_pattern % i)
+ for i in range(num_files)])
+ self.assertTrue(result)
+ for i, (src, dest, exception) in enumerate(result):
+ self.assertEqual(src, from_name_pattern % i)
+ self.assertEqual(dest, to_name_pattern % i)
+ self.assertTrue(isinstance(exception, IOError))
+ self.assertEqual(exception.errno, errno.ENOENT)
+ self.assertFalse(self.gcs.exists(from_name_pattern % i))
+ self.assertFalse(self.gcs.exists(to_name_pattern % i))
+
+ # Insert some files.
+ for i in range(num_files):
+ self._insert_random_file(self.client, from_name_pattern % i, file_size)
+
+ # Check files inserted properly.
+ for i in range(num_files):
+ self.assertTrue(self.gcs.exists(from_name_pattern % i))
+
+ # Execute batch copy.
+ self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
+ for i in range(num_files)])
+
+ # Check files copied properly.
+ for i in range(num_files):
+ self.assertTrue(self.gcs.exists(from_name_pattern % i))
+ self.assertTrue(self.gcs.exists(to_name_pattern % i))
+
def test_copytree(self):
src_dir_name = 'gs://gcsio-test/source/'
dest_dir_name = 'gs://gcsio-test/dest/'
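A note on the test idiom above: each batch test patches apache_beam.io.gcsio.BatchApiRequest and then also rebinds the module attribute to FakeBatchApiRequest by hand. An equivalent single-step form, if preferred, is mock.patch's new= argument (sketch):

    @mock.patch('apache_beam.io.gcsio.BatchApiRequest',
                new=FakeBatchApiRequest)
    def test_delete_batch(self):
      ...
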
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 67cbeb2..1f5af88 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -98,6 +98,9 @@ def retry_on_server_errors_and_timeout_filter(exception):
return retry_on_server_errors_filter(exception)
+SERVER_ERROR_OR_TIMEOUT_CODES = [408, 500, 502, 503, 504, 598, 599]
+
+
class Clock(object):
"""A simple clock implementing sleep()."""
[2/2] incubator-beam git commit: Closes #1337
Posted by ro...@apache.org.
Closes #1337
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66f324b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66f324b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66f324b3
Branch: refs/heads/python-sdk
Commit: 66f324b350c50720cc88357bc2d56e2ecd99adc8
Parents: 560fe79 313191e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 15 08:48:48 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 08:48:48 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 177 ++++++++++++++++---------
sdks/python/apache_beam/io/fileio_test.py | 2 +-
sdks/python/apache_beam/io/gcsio.py | 78 +++++++++++
sdks/python/apache_beam/io/gcsio_test.py | 103 +++++++++++++-
sdks/python/apache_beam/utils/retry.py | 3 +
5 files changed, 298 insertions(+), 65 deletions(-)
----------------------------------------------------------------------