Posted to commits@beam.apache.org by al...@apache.org on 2022/04/09 23:00:22 UTC

[beam] branch master updated: Re-raise exceptions swallowed in several Python I/O connectors

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 867a5859c4b Re-raise exceptions swallowed in several Python I/O connectors
     new f5a9712ef94 Merge pull request #17329 from chamikaramj/re_raise_swallowed_exceptions
867a5859c4b is described below

commit 867a5859c4b2c9cd27bd398a12ce2f1d062f4f2b
Author: Chamikara Jayalath <ch...@gmail.com>
AuthorDate: Sat Apr 9 00:26:10 2022 -0700

    Re-raise exceptions swallowed in several Python I/O connectors
---
 .../io/gcp/datastore/v1new/datastoreio.py          |  2 +
 .../io/gcp/datastore/v1new/datastoreio_test.py     | 18 +++++----
 sdks/python/apache_beam/io/gcp/gcsio.py            |  1 +
 sdks/python/apache_beam/io/gcp/gcsio_test.py       | 46 ++++++++++++++++++++--
 4 files changed, 57 insertions(+), 10 deletions(-)
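
The fix follows a common pattern: the connector records a service call metric
for the failed request and then re-raises the original exception with a bare
`raise`, so the error (and its traceback) still propagates to callers and any
retry logic. A minimal sketch of the idea, illustrative only and not the exact
Beam code (`service_call_metric` and `fetch` stand in for the connector's real
objects):

    def read_with_metrics(service_call_metric, fetch):
      try:
        result = fetch()
        # Record a successful service call.
        service_call_metric.call('ok')
        return result
      except Exception as e:
        # Record the failure for monitoring, then re-raise the same
        # exception so callers (and retry logic) still see it.
        service_call_metric.call(e)
        raise

Previously the except blocks recorded the metric and fell through, which
silently swallowed the error; the commit below adds the missing `raise`
statements and tests that the exceptions now propagate.
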

diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
index 4ac2803619d..dea51a0ce89 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
@@ -301,8 +301,10 @@ class ReadFromDatastore(PTransform):
       except (ClientError, GoogleAPICallError) as e:
         # e.code.value contains the numeric http status code.
         service_call_metric.call(e.code.value)
+        raise
       except HttpError as e:
         service_call_metric.call(e)
+        raise
 
 
 class _Mutate(PTransform):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
index 603bcd018c3..8a7977e475c 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
@@ -328,13 +328,17 @@ class DatastoreioTest(unittest.TestCase):
       client_query.fetch.side_effect = [
           exceptions.DeadlineExceeded("Deadline exceed")
       ]
-      list(_query_fn.process(self._mock_query))
-      self.verify_read_call_metric(
-          self._PROJECT, self._NAMESPACE, "deadline_exceeded", 1)
-      # Test success
-      client_query.fetch.side_effect = [[]]
-      list(_query_fn.process(self._mock_query))
-      self.verify_read_call_metric(self._PROJECT, self._NAMESPACE, "ok", 1)
+      try:
+        list(_query_fn.process(self._mock_query))
+      except Exception:
+        self.verify_read_call_metric(
+            self._PROJECT, self._NAMESPACE, "deadline_exceeded", 1)
+        # Test success
+        client_query.fetch.side_effect = [[]]
+        list(_query_fn.process(self._mock_query))
+        self.verify_read_call_metric(self._PROJECT, self._NAMESPACE, "ok", 1)
+      else:
+        raise Exception('Expected _query_fn.process call to raise an error')
 
   def verify_read_call_metric(self, project_id, namespace, status, count):
     """Check if a metric was recorded for the Datastore IO read API call."""
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index cc740c397e9..0caf4415247 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -642,6 +642,7 @@ class GcsDownloader(Downloader):
       service_call_metric.call('ok')
     except HttpError as e:
       service_call_metric.call(e)
+      raise
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 1d2836a5614..2f867ffc528 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -103,10 +103,17 @@ class FakeGcsObjects(object):
     # has to persist even past the deletion of the object.
     self.last_generation = {}
     self.list_page_tokens = {}
+    self._fail_when_getting_metadata = []
+    self._fail_when_reading = []
 
-  def add_file(self, f):
+  def add_file(
+      self, f, fail_when_getting_metadata=False, fail_when_reading=False):
     self.files[(f.bucket, f.object)] = f
     self.last_generation[(f.bucket, f.object)] = f.generation
+    if fail_when_getting_metadata:
+      self._fail_when_getting_metadata.append(f)
+    if fail_when_reading:
+      self._fail_when_reading.append(f)
 
   def get_file(self, bucket, obj):
     return self.files.get((bucket, obj), None)
@@ -123,8 +130,12 @@ class FakeGcsObjects(object):
       # Failing with an HTTP 404 if file does not exist.
       raise HttpError({'status': 404}, None, None)
     if download is None:
+      if f in self._fail_when_getting_metadata:
+        raise HttpError({'status': 429}, None, None)
       return f.get_metadata()
     else:
+      if f in self._fail_when_reading:
+        raise HttpError({'status': 429}, None, None)
       stream = download.stream
 
       def get_range_callback(start, end):
@@ -303,7 +314,15 @@ class SampleOptions(object):
     'time', time=mock.MagicMock(side_effect=range(100)), sleep=mock.MagicMock())
 class TestGCSIO(unittest.TestCase):
   def _insert_random_file(
-      self, client, path, size, generation=1, crc32c=None, last_updated=None):
+      self,
+      client,
+      path,
+      size,
+      generation=1,
+      crc32c=None,
+      last_updated=None,
+      fail_when_getting_metadata=False,
+      fail_when_reading=False):
     bucket, name = gcsio.parse_gcs_path(path)
     f = FakeFile(
         bucket,
@@ -312,7 +331,7 @@ class TestGCSIO(unittest.TestCase):
         generation,
         crc32c=crc32c,
         last_updated=last_updated)
-    client.objects.add_file(f)
+    client.objects.add_file(f, fail_when_getting_metadata, fail_when_reading)
     return f
 
   def setUp(self):
@@ -841,6 +860,27 @@ class TestGCSIO(unittest.TestCase):
 
     self.assertEqual(metric_value, 2)
 
+  def test_downloader_fail_non_existent_object(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    with self.assertRaises(IOError):
+      self.gcs.open(file_name, 'r')
+
+  def test_downloader_fail_when_getting_metadata(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    self._insert_random_file(
+        self.client, file_name, file_size, fail_when_getting_metadata=True)
+    with self.assertRaises(HttpError):
+      self.gcs.open(file_name, 'r')
+
+  def test_downloader_fail_when_reading(self):
+    file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+    file_size = 5 * 1024 * 1024 + 100
+    self._insert_random_file(
+        self.client, file_name, file_size, fail_when_reading=True)
+    with self.assertRaises(HttpError):
+      self.gcs.open(file_name, 'r')
+
   def test_uploader_monitoring_info(self):
     # Clear the process wide metric container.
     MetricsEnvironment.process_wide_container().reset()
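
The datastoreio_test change above uses a try/except/else structure so the test
fails loudly if process() no longer raises. A condensed, self-contained sketch
of that pattern with hypothetical names (not the Beam test itself):

    import unittest

    class ReRaiseTest(unittest.TestCase):
      def test_error_is_re_raised(self):
        def failing_call():
          raise RuntimeError('backend error')

        try:
          failing_call()
        except RuntimeError:
          # Expected path: the error propagated; metric checks would go here.
          pass
        else:
          # If no exception was raised, the re-raise behavior has regressed.
          self.fail('Expected failing_call() to raise an error')

    if __name__ == '__main__':
      unittest.main()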