You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/10 20:49:03 UTC

[2/3] incubator-beam git commit: Fixes GcsIO.exists() to properly handle files that do not exist.

Fixes GcsIO.exists() to properly handle files that do not exist.

Currently, this invocation fails for nonexistent files instead of returning False.

Updates FileSink.finalize_write() so that any transient errors thrown by the channel_factory.exists() call are caught and logged.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d8a76a53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d8a76a53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d8a76a53

Branch: refs/heads/python-sdk
Commit: d8a76a53cf60209d13f929ddaec87cd95f6a040a
Parents: d72ffb0
Author: Chamikara Jayalath <ch...@google.com>
Authored: Wed Aug 3 18:25:41 2016 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Aug 8 16:35:25 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py     | 13 ++++++++++---
 sdks/python/apache_beam/io/gcsio.py      |  9 +++++++--
 sdks/python/apache_beam/io/gcsio_test.py | 10 +++++++++-
 3 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8a76a53/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index b1e091b..ecacb9f 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -530,15 +530,22 @@ class FileSink(iobase.Sink):
         channel_factory.rename(old_name, final_name)
       except IOError as e:
         # May have already been copied.
-        exists = channel_factory.exists(final_name)
+        try:
+          exists = channel_factory.exists(final_name)
+        except Exception as exists_e:  # pylint: disable=broad-except
+          logging.warning('Exception when invoking channel_factory.exists(): '
+                          '%s', exists_e)
+          # Returning original exception after logging the exception from
+          # exists() call.
+          return (None, e)
         if not exists:
           logging.warning(('IOError in _rename_file. old_name: %s, '
                            'final_name: %s, err: %s'), old_name, final_name, e)
-          return(None, e)
+          return (None, e)
       except Exception as e:  # pylint: disable=broad-except
         logging.warning(('Exception in _rename_file. old_name: %s, '
                          'final_name: %s, err: %s'), old_name, final_name, e)
-        return(None, e)
+        return (None, e)
       return (final_name, None)
 
     # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8a76a53/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 88fcfb8..7bb532c 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -234,8 +234,13 @@ class GcsIO(object):
                                                  object=object_path)
       self.client.objects.Get(request)  # metadata
       return True
-    except IOError:
-      return False
+    except HttpError as http_error:
+      if http_error.status_code == 404:
+        # HTTP 404 indicates that the file did not exist
+        return False
+      else:
+        # We re-raise all other exceptions
+        raise
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8a76a53/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 7b15ef3..b909fdd 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -79,7 +79,8 @@ class FakeGcsObjects(object):
   def Get(self, get_request, download=None):  # pylint: disable=invalid-name
     f = self.get_file(get_request.bucket, get_request.object)
     if f is None:
-      raise ValueError('Specified object does not exist.')
+      # Failing with a HTTP 404 if file does not exist.
+      raise HttpError({'status':404}, None, None)
     if download is None:
       return f.get_metadata()
     else:
@@ -189,6 +190,13 @@ class TestGCSIO(unittest.TestCase):
     self.client = FakeGcsClient()
     self.gcs = gcsio.GcsIO(self.client)
 
+  def test_exists(self):
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+    self._insert_random_file(self.client, file_name, file_size)
+    self.assertFalse(self.gcs.exists(file_name + 'xyz'))
+    self.assertTrue(self.gcs.exists(file_name))
+
   def test_size(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234