Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/15 16:49:43 UTC

[1/2] incubator-beam git commit: Use batch GCS operations during FileSink write finalization

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 560fe79f8 -> 66f324b35


Use batch GCS operations during FileSink write finalization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/313191e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/313191e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/313191e1

Branch: refs/heads/python-sdk
Commit: 313191e129b884e4e14e9f503a757147d368217c
Parents: 560fe79
Author: Charles Chen <cc...@google.com>
Authored: Thu Nov 10 11:54:08 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 08:48:47 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      | 177 ++++++++++++++++---------
 sdks/python/apache_beam/io/fileio_test.py |   2 +-
 sdks/python/apache_beam/io/gcsio.py       |  78 +++++++++++
 sdks/python/apache_beam/io/gcsio_test.py  | 103 +++++++++++++-
 sdks/python/apache_beam/utils/retry.py    |   3 +
 5 files changed, 298 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 669bfc9..ef20a7c 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -31,6 +31,7 @@ import zlib
 import weakref
 
 from apache_beam import coders
+from apache_beam.io import gcsio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -451,8 +452,6 @@ class ChannelFactory(object):
                       'was %s' % type(compression_type))
 
     if path.startswith('gs://'):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
       raw_file = gcsio.GcsIO().open(
           path,
           mode,
@@ -470,40 +469,92 @@ class ChannelFactory(object):
     return isinstance(fileobj, _CompressedFile)
 
   @staticmethod
-  def rename(src, dst):
+  def rename(src, dest):
     if src.startswith('gs://'):
-      assert dst.startswith('gs://'), dst
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
-      gcsio.GcsIO().rename(src, dst)
+      if not dest.startswith('gs://'):
+        raise ValueError('Destination %r must be a GCS path.' % dest)
+      gcsio.GcsIO().rename(src, dest)
     else:
       try:
-        os.rename(src, dst)
+        os.rename(src, dest)
       except OSError as err:
         raise IOError(err)
 
   @staticmethod
-  def copytree(src, dst):
+  def rename_batch(src_dest_pairs):
+    # Partition the operations into local and GCS renames.
+    local_src_dest_pairs = []
+    gcs_src_dest_pairs = []
+    for src, dest in src_dest_pairs:
+      if src.startswith('gs://'):
+        if not dest.startswith('gs://'):
+          raise ValueError('Destination %r must be a GCS path.' % dest)
+        gcs_src_dest_pairs.append((src, dest))
+      else:
+        local_src_dest_pairs.append((src, dest))
+
+    # Execute local operations.
+    exceptions = []
+    for src, dest in local_src_dest_pairs:
+      try:
+        ChannelFactory.rename(src, dest)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions.append((src, dest, e))
+
+    # Execute GCS operations.
+    exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs)
+
+    return exceptions
+
+  @staticmethod
+  def _rename_gcs_batch(src_dest_pairs):
+    # Prepare batches.
+    gcs_batches = []
+    gcs_current_batch = []
+    for src, dest in src_dest_pairs:
+      gcs_current_batch.append((src, dest))
+      if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+        gcs_batches.append(gcs_current_batch)
+        gcs_current_batch = []
+    if gcs_current_batch:
+      gcs_batches.append(gcs_current_batch)
+
+    # Execute GCS renames if any and return exceptions.
+    exceptions = []
+    for batch in gcs_batches:
+      copy_statuses = gcsio.GcsIO().copy_batch(batch)
+      copy_succeeded = []
+      for src, dest, exception in copy_statuses:
+        if exception:
+          exceptions.append((src, dest, exception))
+        else:
+          copy_succeeded.append((src, dest))
+      delete_batch = [src for src, dest in copy_succeeded]
+      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions.append((src, dest, exception))
+    return exceptions
+
+  @staticmethod
+  def copytree(src, dest):
     if src.startswith('gs://'):
-      assert dst.startswith('gs://'), dst
+      if not dest.startswith('gs://'):
+        raise ValueError('Destination %r must be a GCS path.' % dest)
       assert src.endswith('/'), src
-      assert dst.endswith('/'), dst
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
-      gcsio.GcsIO().copytree(src, dst)
+      assert dest.endswith('/'), dest
+      gcsio.GcsIO().copytree(src, dest)
     else:
       try:
-        if os.path.exists(dst):
-          shutil.rmtree(dst)
-        shutil.copytree(src, dst)
+        if os.path.exists(dest):
+          shutil.rmtree(dest)
+        shutil.copytree(src, dest)
       except OSError as err:
         raise IOError(err)
 
   @staticmethod
   def exists(path):
     if path.startswith('gs://'):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
       return gcsio.GcsIO().exists(path)
     else:
       return os.path.exists(path)
@@ -511,8 +562,6 @@ class ChannelFactory(object):
   @staticmethod
   def rmdir(path):
     if path.startswith('gs://'):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
       gcs = gcsio.GcsIO()
       if not path.endswith('/'):
         path += '/'
@@ -528,8 +577,6 @@ class ChannelFactory(object):
   @staticmethod
   def rm(path):
     if path.startswith('gs://'):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
       gcsio.GcsIO().delete(path)
     else:
       try:
@@ -540,8 +587,6 @@ class ChannelFactory(object):
   @staticmethod
   def glob(path):
     if path.startswith('gs://'):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
       return gcsio.GcsIO().glob(path)
     else:
       return glob.glob(path)
@@ -554,8 +599,6 @@ class ChannelFactory(object):
       path: a string that gives the path of a single file.
     """
     if path.startswith('gs://'):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.io import gcsio
       return gcsio.GcsIO().size(path)
     else:
       return os.path.getsize(path)
@@ -803,7 +846,6 @@ class FileSink(iobase.Sink):
   def finalize_write(self, init_result, writer_results):
     writer_results = sorted(writer_results)
     num_shards = len(writer_results)
-    channel_factory = ChannelFactory()
     min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
     num_threads = max(1, min_threads)
 
@@ -815,55 +857,66 @@ class FileSink(iobase.Sink):
       ])
       rename_ops.append((shard, final_name))
 
+    batches = []
+    current_batch = []
+    for rename_op in rename_ops:
+      current_batch.append(rename_op)
+      if len(current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+        batches.append(current_batch)
+        current_batch = []
+    if current_batch:
+      batches.append(current_batch)
+
     logging.info(
-        'Starting finalize_write threads with num_shards: %d, num_threads: %d',
-        num_shards, num_threads)
+        'Starting finalize_write threads with num_shards: %d, '
+        'batches: %d, num_threads: %d',
+        num_shards, len(batches), num_threads)
     start_time = time.time()
 
     # Use a thread pool for renaming operations.
-    def _rename_file(rename_op):
-      """_rename_file executes single (old_name, new_name) rename operation."""
-      old_name, final_name = rename_op
-      try:
-        channel_factory.rename(old_name, final_name)
-      except IOError as e:
-        # May have already been copied.
-        try:
-          exists = channel_factory.exists(final_name)
-        except Exception as exists_e:  # pylint: disable=broad-except
-          logging.warning('Exception when checking if file %s exists: '
-                          '%s', final_name, exists_e)
-          # Returning original exception after logging the exception from
-          # exists() call.
-          return (None, e)
-        if not exists:
-          logging.warning(('IOError in _rename_file. old_name: %s, '
-                           'final_name: %s, err: %s'), old_name, final_name, e)
-          return (None, e)
-      except Exception as e:  # pylint: disable=broad-except
-        logging.warning(('Exception in _rename_file. old_name: %s, '
-                         'final_name: %s, err: %s'), old_name, final_name, e)
-        return (None, e)
-      return (final_name, None)
+    def _rename_batch(batch):
+      """_rename_batch executes batch rename operations."""
+      exceptions = []
+      exception_infos = ChannelFactory.rename_batch(batch)
+      for src, dest, exception in exception_infos:
+        if exception:
+          should_report = True
+          if isinstance(exception, IOError):
+            # May have already been copied.
+            try:
+              if ChannelFactory.exists(dest):
+                should_report = False
+            except Exception as exists_e:  # pylint: disable=broad-except
+              logging.warning('Exception when checking if file %s exists: '
+                              '%s', dest, exists_e)
+          if should_report:
+            logging.warning(('Exception in _rename_batch. src: %s, '
+                             'dest: %s, err: %s'), src, dest, exception)
+            exceptions.append(exception)
+      return exceptions
 
     # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
     # child thread. (http://bugs.python.org/issue10015)
     if not hasattr(threading.current_thread(), '_children'):
       threading.current_thread()._children = weakref.WeakKeyDictionary()
-    rename_results = ThreadPool(num_threads).map(_rename_file, rename_ops)
+    exception_batches = ThreadPool(num_threads).map(_rename_batch, batches)
 
-    for final_name, err in rename_results:
-      if err:
-        logging.warning('Error when processing rename_results: %s', err)
-        raise err
-      else:
-        yield final_name
+    all_exceptions = []
+    for exceptions in exception_batches:
+      if exceptions:
+        all_exceptions += exceptions
+    if all_exceptions:
+      raise Exception('Encountered exceptions in finalize_write: %s' %
+                      all_exceptions)
+
+    for shard, final_name in rename_ops:
+      yield final_name
 
     logging.info('Renamed %d shards in %.2f seconds.', num_shards,
                  time.time() - start_time)
 
     try:
-      channel_factory.rmdir(init_result)
+      ChannelFactory.rmdir(init_result)
     except IOError:
       # May have already been removed.
       pass
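
For reference, the batching done in ChannelFactory._rename_gcs_batch and in
FileSink.finalize_write above is a plain chunking of a list into groups of at
most gcsio.MAX_BATCH_OPERATION_SIZE items. A minimal standalone sketch of that
pattern (the names chunk_list, pairs and max_size are illustrative only, not
identifiers from this patch):

def chunk_list(pairs, max_size=100):
  """Yield successive lists of at most max_size items from pairs."""
  batch = []
  for item in pairs:
    batch.append(item)
    if len(batch) == max_size:
      yield batch
      batch = []
  if batch:
    yield batch

# Example: 250 rename operations become batches of size 100, 100 and 50.
ops = [('tmp_shard_%d' % i, 'final_shard_%d' % i) for i in range(250)]
assert [len(b) for b in chunk_list(ops)] == [100, 100, 50]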

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 7e6e60b..b55fa19 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -848,7 +848,7 @@ class TestFileSink(unittest.TestCase):
     res2 = writer2.close()
 
     os.remove(res2)
-    with self.assertRaises(IOError):
+    with self.assertRaises(Exception):
       list(sink.finalize_write(init_token, [res1, res2]))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9fcce5b..1b08994 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -31,6 +31,7 @@ import threading
 import traceback
 
 from apitools.base.py.exceptions import HttpError
+from apitools.base.py.batch import BatchApiRequest
 import apitools.base.py.transfer as transfer
 
 from apache_beam.internal import auth
@@ -50,6 +51,11 @@ DEFAULT_READ_BUFFER_SIZE = 1024 * 1024
 WRITE_CHUNK_SIZE = 8 * 1024 * 1024
 
 
+# Maximum number of operations permitted in GcsIO.copy_batch() and
+# GcsIO.delete_batch().
+MAX_BATCH_OPERATION_SIZE = 100
+
+
 def parse_gcs_path(gcs_path):
   """Return the bucket and object names of the given gs:// path."""
   match = re.match('^gs://([^/]+)/(.+)$', gcs_path)
@@ -167,6 +173,39 @@ class GcsIO(object):
         return
       raise
 
+  # We intentionally do not decorate this method with a retry, as retrying is
+  # handled in BatchApiRequest.Execute().
+  def delete_batch(self, paths):
+    """Deletes the objects at the given GCS paths.
+
+    Args:
+      paths: List of GCS file paths in the form gs://<bucket>/<name>,
+             not to exceed MAX_BATCH_OPERATION_SIZE in length.
+
+    Returns: List of tuples of (path, exception) in the same order as the paths
+             argument, where exception is None if the operation succeeded or
+             the relevant exception if the operation failed.
+    """
+    batch_request = BatchApiRequest(
+        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
+    for path in paths:
+      bucket, object_path = parse_gcs_path(path)
+      request = storage.StorageObjectsDeleteRequest(
+          bucket=bucket, object=object_path)
+      batch_request.Add(self.client.objects, 'Delete', request)
+    api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+    result_statuses = []
+    for i, api_call in enumerate(api_calls):
+      path = paths[i]
+      exception = None
+      if api_call.is_error:
+        exception = api_call.exception
+        # Return success when the file doesn't exist anymore for idempotency.
+        if isinstance(exception, HttpError) and exception.status_code == 404:
+          exception = None
+      result_statuses.append((path, exception))
+    return result_statuses
+
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def copy(self, src, dest):
@@ -193,6 +232,45 @@ class GcsIO(object):
         raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src)
       raise
 
+  # We intentionally do not decorate this method with a retry, as retrying is
+  # handled in BatchApiRequest.Execute().
+  def copy_batch(self, src_dest_pairs):
+    """Copies the given GCS object from src to dest.
+
+    Args:
+      src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> file
+                      paths to copy from src to dest, not to exceed
+                      MAX_BATCH_OPERATION_SIZE in length.
+
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    batch_request = BatchApiRequest(
+        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
+    for src, dest in src_dest_pairs:
+      src_bucket, src_path = parse_gcs_path(src)
+      dest_bucket, dest_path = parse_gcs_path(dest)
+      request = storage.StorageObjectsCopyRequest(
+          sourceBucket=src_bucket,
+          sourceObject=src_path,
+          destinationBucket=dest_bucket,
+          destinationObject=dest_path)
+      batch_request.Add(self.client.objects, 'Copy', request)
+    api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+    result_statuses = []
+    for i, api_call in enumerate(api_calls):
+      src, dest = src_dest_pairs[i]
+      exception = None
+      if api_call.is_error:
+        exception = api_call.exception
+        # Translate 404 to the appropriate not found exception.
+        if isinstance(exception, HttpError) and exception.status_code == 404:
+          exception = (
+              GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
+      result_statuses.append((src, dest, exception))
+    return result_statuses
+
   # We intentionally do not decorate this method with a retry, since the
   # underlying copy and delete operations are already idempotent operations
   # protected by retry decorators.
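
A hedged usage sketch for the new GcsIO.copy_batch() and GcsIO.delete_batch()
methods; the bucket and object names below are placeholders, and the sketch
assumes default GCS credentials and an existing bucket. Each call should stay
within MAX_BATCH_OPERATION_SIZE (100) operations, as finalize_write does above.

import logging

from apache_beam.io import gcsio

gcs = gcsio.GcsIO()
pairs = [('gs://my-bucket/tmp/shard-%d' % i, 'gs://my-bucket/out/shard-%d' % i)
         for i in range(3)]

# copy_batch returns one (src, dest, exception) tuple per input pair, in order.
for src, dest, exc in gcs.copy_batch(pairs):
  if exc is not None:
    logging.warning('Copy %s -> %s failed: %s', src, dest, exc)

# delete_batch returns one (path, exception) tuple per input path, in order;
# deleting the sources after a successful copy completes the batch "rename".
for path, exc in gcs.delete_batch([src for src, _ in pairs]):
  if exc is not None:
    logging.warning('Delete %s failed: %s', path, exc)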

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 2e9945a..95c8e58 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -16,6 +16,7 @@
 #
 """Tests for Google Cloud Storage client."""
 
+import errno
 import logging
 import multiprocessing
 import os
@@ -24,11 +25,11 @@ import threading
 import unittest
 
 import httplib2
+import mock
 from apitools.base.py.exceptions import HttpError
 from apache_beam.internal.clients import storage
 
 from apache_beam.io import gcsio
-from mock import patch
 
 
 class FakeGcsClient(object):
@@ -37,6 +38,8 @@ class FakeGcsClient(object):
 
   def __init__(self):
     self.objects = FakeGcsObjects()
+    # Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
+    self._http = object()
 
 
 class FakeFile(object):
@@ -165,6 +168,33 @@ class FakeGcsObjects(object):
     return result
 
 
+class FakeApiCall(object):
+
+  def __init__(self, exception):
+    self.exception = exception
+    self.is_error = exception is not None
+
+
+class FakeBatchApiRequest(object):
+
+  def __init__(self, **unused_kwargs):
+    self.operations = []
+
+  def Add(self, service, method, request):  # pylint: disable=invalid-name
+    self.operations.append((service, method, request))
+
+  def Execute(self, unused_http, **unused_kwargs):  # pylint: disable=invalid-name
+    api_calls = []
+    for service, method, request in self.operations:
+      exception = None
+      try:
+        getattr(service, method)(request)
+      except Exception as e:  # pylint: disable=broad-except
+        exception = e
+      api_calls.append(FakeApiCall(exception))
+    return api_calls
+
+
 class TestGCSPathParser(unittest.TestCase):
 
   def test_gcs_path(self):
@@ -201,7 +231,7 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(self.gcs.exists(file_name + 'xyz'))
     self.assertTrue(self.gcs.exists(file_name))
 
-  @patch.object(FakeGcsObjects, 'Get')
+  @mock.patch.object(FakeGcsObjects, 'Get')
   def test_exists_failure(self, mock_get):
     # Raising an error other than 404. Raising 404 is a valid failure for
     # exists() call.
@@ -251,6 +281,37 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(
         gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
+  @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+  def test_delete_batch(self, *unused_args):
+    gcsio.BatchApiRequest = FakeBatchApiRequest
+    file_name_pattern = 'gs://gcsio-test/delete_me_%d'
+    file_size = 1024
+    num_files = 10
+
+    # Test deletion of non-existent files.
+    result = self.gcs.delete_batch(
+        [file_name_pattern % i for i in range(num_files)])
+    self.assertTrue(result)
+    for i, (file_name, exception) in enumerate(result):
+      self.assertEqual(file_name, file_name_pattern % i)
+      self.assertEqual(exception, None)
+      self.assertFalse(self.gcs.exists(file_name_pattern % i))
+
+    # Insert some files.
+    for i in range(num_files):
+      self._insert_random_file(self.client, file_name_pattern % i, file_size)
+
+    # Check files inserted properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(file_name_pattern % i))
+
+    # Execute batch delete.
+    self.gcs.delete_batch([file_name_pattern % i for i in range(num_files)])
+
+    # Check files deleted properly.
+    for i in range(num_files):
+      self.assertFalse(self.gcs.exists(file_name_pattern % i))
+
   def test_copy(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
@@ -271,6 +332,44 @@ class TestGCSIO(unittest.TestCase):
     self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
                       'gs://gcsio-test/non-existent-destination')
 
+  @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+  def test_copy_batch(self, *unused_args):
+    gcsio.BatchApiRequest = FakeBatchApiRequest
+    from_name_pattern = 'gs://gcsio-test/copy_me_%d'
+    to_name_pattern = 'gs://gcsio-test/destination_%d'
+    file_size = 1024
+    num_files = 10
+
+    # Test copy of non-existent files.
+    result = self.gcs.copy_batch(
+        [(from_name_pattern % i, to_name_pattern % i)
+         for i in range(num_files)])
+    self.assertTrue(result)
+    for i, (src, dest, exception) in enumerate(result):
+      self.assertEqual(src, from_name_pattern % i)
+      self.assertEqual(dest, to_name_pattern % i)
+      self.assertTrue(isinstance(exception, IOError))
+      self.assertEqual(exception.errno, errno.ENOENT)
+      self.assertFalse(self.gcs.exists(from_name_pattern % i))
+      self.assertFalse(self.gcs.exists(to_name_pattern % i))
+
+    # Insert some files.
+    for i in range(num_files):
+      self._insert_random_file(self.client, from_name_pattern % i, file_size)
+
+    # Check files inserted properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(from_name_pattern % i))
+
+    # Execute batch copy.
+    self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
+                         for i in range(num_files)])
+
+    # Check files copied properly.
+    for i in range(num_files):
+      self.assertTrue(self.gcs.exists(from_name_pattern % i))
+      self.assertTrue(self.gcs.exists(to_name_pattern % i))
+
   def test_copytree(self):
     src_dir_name = 'gs://gcsio-test/source/'
     dest_dir_name = 'gs://gcsio-test/dest/'

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 67cbeb2..1f5af88 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -98,6 +98,9 @@ def retry_on_server_errors_and_timeout_filter(exception):
   return retry_on_server_errors_filter(exception)
 
 
+SERVER_ERROR_OR_TIMEOUT_CODES = [408, 500, 502, 503, 504, 598, 599]
+
+
 class Clock(object):
   """A simple clock implementing sleep()."""
 


[2/2] incubator-beam git commit: Closes #1337

Posted by ro...@apache.org.
Closes #1337


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66f324b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66f324b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66f324b3

Branch: refs/heads/python-sdk
Commit: 66f324b350c50720cc88357bc2d56e2ecd99adc8
Parents: 560fe79 313191e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 15 08:48:48 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 15 08:48:48 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      | 177 ++++++++++++++++---------
 sdks/python/apache_beam/io/fileio_test.py |   2 +-
 sdks/python/apache_beam/io/gcsio.py       |  78 +++++++++++
 sdks/python/apache_beam/io/gcsio_test.py  | 103 +++++++++++++-
 sdks/python/apache_beam/utils/retry.py    |   3 +
 5 files changed, 298 insertions(+), 65 deletions(-)
----------------------------------------------------------------------