You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2022/04/09 23:00:22 UTC
[beam] branch master updated: Re-raise exceptions swallowed in several Python I/O connectors
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 867a5859c4b Re-raise exceptions swallowed in several Python I/O connectors
new f5a9712ef94 Merge pull request #17329 from chamikaramj/re_raise_swallowed_exceptions
867a5859c4b is described below
commit 867a5859c4b2c9cd27bd398a12ce2f1d062f4f2b
Author: Chamikara Jayalath <ch...@gmail.com>
AuthorDate: Sat Apr 9 00:26:10 2022 -0700
Re-raise exceptions swallowed in several Python I/O connectors
---
.../io/gcp/datastore/v1new/datastoreio.py | 2 +
.../io/gcp/datastore/v1new/datastoreio_test.py | 18 +++++----
sdks/python/apache_beam/io/gcp/gcsio.py | 1 +
sdks/python/apache_beam/io/gcp/gcsio_test.py | 46 ++++++++++++++++++++--
4 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
index 4ac2803619d..dea51a0ce89 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
@@ -301,8 +301,10 @@ class ReadFromDatastore(PTransform):
except (ClientError, GoogleAPICallError) as e:
# e.code.value contains the numeric http status code.
service_call_metric.call(e.code.value)
+ raise
except HttpError as e:
service_call_metric.call(e)
+ raise
class _Mutate(PTransform):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
index 603bcd018c3..8a7977e475c 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
@@ -328,13 +328,17 @@ class DatastoreioTest(unittest.TestCase):
client_query.fetch.side_effect = [
exceptions.DeadlineExceeded("Deadline exceed")
]
- list(_query_fn.process(self._mock_query))
- self.verify_read_call_metric(
- self._PROJECT, self._NAMESPACE, "deadline_exceeded", 1)
- # Test success
- client_query.fetch.side_effect = [[]]
- list(_query_fn.process(self._mock_query))
- self.verify_read_call_metric(self._PROJECT, self._NAMESPACE, "ok", 1)
+ try:
+ list(_query_fn.process(self._mock_query))
+ except Exception:
+ self.verify_read_call_metric(
+ self._PROJECT, self._NAMESPACE, "deadline_exceeded", 1)
+ # Test success
+ client_query.fetch.side_effect = [[]]
+ list(_query_fn.process(self._mock_query))
+ self.verify_read_call_metric(self._PROJECT, self._NAMESPACE, "ok", 1)
+ else:
+ raise Exception('Expected _query_fn.process call to raise an error')
def verify_read_call_metric(self, project_id, namespace, status, count):
"""Check if a metric was recorded for the Datastore IO read API call."""
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index cc740c397e9..0caf4415247 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -642,6 +642,7 @@ class GcsDownloader(Downloader):
service_call_metric.call('ok')
except HttpError as e:
service_call_metric.call(e)
+ raise
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 1d2836a5614..2f867ffc528 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -103,10 +103,17 @@ class FakeGcsObjects(object):
# has to persist even past the deletion of the object.
self.last_generation = {}
self.list_page_tokens = {}
+ self._fail_when_getting_metadata = []
+ self._fail_when_reading = []
- def add_file(self, f):
+ def add_file(
+ self, f, fail_when_getting_metadata=False, fail_when_reading=False):
self.files[(f.bucket, f.object)] = f
self.last_generation[(f.bucket, f.object)] = f.generation
+ if fail_when_getting_metadata:
+ self._fail_when_getting_metadata.append(f)
+ if fail_when_reading:
+ self._fail_when_reading.append(f)
def get_file(self, bucket, obj):
return self.files.get((bucket, obj), None)
@@ -123,8 +130,12 @@ class FakeGcsObjects(object):
# Failing with an HTTP 404 if file does not exist.
raise HttpError({'status': 404}, None, None)
if download is None:
+ if f in self._fail_when_getting_metadata:
+ raise HttpError({'status': 429}, None, None)
return f.get_metadata()
else:
+ if f in self._fail_when_reading:
+ raise HttpError({'status': 429}, None, None)
stream = download.stream
def get_range_callback(start, end):
@@ -303,7 +314,15 @@ class SampleOptions(object):
'time', time=mock.MagicMock(side_effect=range(100)), sleep=mock.MagicMock())
class TestGCSIO(unittest.TestCase):
def _insert_random_file(
- self, client, path, size, generation=1, crc32c=None, last_updated=None):
+ self,
+ client,
+ path,
+ size,
+ generation=1,
+ crc32c=None,
+ last_updated=None,
+ fail_when_getting_metadata=False,
+ fail_when_reading=False):
bucket, name = gcsio.parse_gcs_path(path)
f = FakeFile(
bucket,
@@ -312,7 +331,7 @@ class TestGCSIO(unittest.TestCase):
generation,
crc32c=crc32c,
last_updated=last_updated)
- client.objects.add_file(f)
+ client.objects.add_file(f, fail_when_getting_metadata, fail_when_reading)
return f
def setUp(self):
@@ -841,6 +860,27 @@ class TestGCSIO(unittest.TestCase):
self.assertEqual(metric_value, 2)
+ def test_downloader_fail_non_existent_object(self):
+ file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+ with self.assertRaises(IOError):
+ self.gcs.open(file_name, 'r')
+
+ def test_downloader_fail_when_getting_metadata(self):
+ file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+ file_size = 5 * 1024 * 1024 + 100
+ self._insert_random_file(
+ self.client, file_name, file_size, fail_when_getting_metadata=True)
+ with self.assertRaises(HttpError):
+ self.gcs.open(file_name, 'r')
+
+ def test_downloader_fail_when_reading(self):
+ file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
+ file_size = 5 * 1024 * 1024 + 100
+ self._insert_random_file(
+ self.client, file_name, file_size, fail_when_reading=True)
+ with self.assertRaises(HttpError):
+ self.gcs.open(file_name, 'r')
+
def test_uploader_monitoring_info(self):
# Clear the process wide metric container.
MetricsEnvironment.process_wide_container().reset()