You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/09 16:52:23 UTC
[beam] branch master updated: Merge pull request #17544 from [BEAM-14415] Exception handling tests for BQIO streaming inserts in Python
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3689dc5fbeb Merge pull request #17544 from [BEAM-14415] Exception handling tests for BQIO streaming inserts in Python
3689dc5fbeb is described below
commit 3689dc5fbeb31b352a9c6d98b7936bd25a5d3a4b
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon May 9 12:52:15 2022 -0400
Merge pull request #17544 from [BEAM-14415] Exception handling tests for BQIO streaming inserts in Python
* Exception handling tests for BQIO streaming inserts in Python
* Adding non-retriable tests
* Added streaming retries
* Added streaming retries
* fix lint
* fix formatter
---
sdks/python/apache_beam/io/gcp/bigquery_test.py | 263 ++++++++++++++++++++++++
sdks/python/apache_beam/utils/retry.py | 22 ++
2 files changed, 285 insertions(+)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index a4fd5053df0..74722e4e538 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -34,6 +34,7 @@ import hamcrest as hc
import mock
import pytest
import pytz
+import requests
from parameterized import param
from parameterized import parameterized
@@ -72,15 +73,19 @@ from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.utils import retry
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
+
try:
from apitools.base.py.exceptions import HttpError
from google.cloud import bigquery as gcp_bigquery
+ from google.api_core import exceptions
except ImportError:
gcp_bigquery = None
HttpError = None
+ exceptions = None
# pylint: enable=wrong-import-order, wrong-import-position
_LOGGER = logging.getLogger(__name__)
@@ -833,6 +838,264 @@ class TestWriteToBigQuery(unittest.TestCase):
test_client=client))
+class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
+
+ # Using https://cloud.google.com/bigquery/docs/error-messages and
+ # https://googleapis.dev/python/google-api-core/latest/_modules/google
+ # /api_core/exceptions.html
+ # to determine error types and messages to try for retriables.
+ @parameterized.expand([
+ param(
+ exception_type=exceptions.Forbidden,
+ error_reason='rateLimitExceeded'),
+ param(
+ exception_type=exceptions.DeadlineExceeded,
+ error_reason='somereason'),
+ param(
+ exception_type=exceptions.ServiceUnavailable,
+ error_reason='backendError'),
+ param(
+ exception_type=exceptions.InternalServerError,
+ error_reason='internalError'),
+ param(
+ exception_type=exceptions.InternalServerError,
+ error_reason='backendError'),
+ ])
+ @mock.patch('time.sleep')
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_insert_all_retries_if_structured_retriable(
+ self,
+ mock_send,
+ unused_mock_sleep,
+ exception_type=None,
+ error_reason=None):
+ # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
+ mock_send.side_effect = [
+ exception_type(
+ 'some retriable exception', errors=[{
+ 'reason': error_reason
+ }]),
+ exception_type(
+ 'some retriable exception', errors=[{
+ 'reason': error_reason
+ }]),
+ exception_type(
+ 'some retriable exception', errors=[{
+ 'reason': error_reason
+ }]),
+ exception_type(
+ 'some retriable exception', errors=[{
+ 'reason': error_reason
+ }]),
+ ]
+
+ with self.assertRaises(Exception) as exc:
+ with beam.Pipeline() as p:
+ _ = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS'))
+ self.assertEqual(4, mock_send.call_count)
+ self.assertIn('some retriable exception', exc.exception.args[0])
+
+ # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
+ # /api_core/exceptions.html
+ # to determine error types and messages to try for retriables.
+ @parameterized.expand([
+ param(
+ exception_type=requests.exceptions.ConnectionError,
+ error_message='some connection error'),
+ param(
+ exception_type=requests.exceptions.Timeout,
+ error_message='some timeout error'),
+ param(
+ exception_type=ConnectionError,
+ error_message='some py connection error'),
+ param(
+ exception_type=exceptions.BadGateway,
+ error_message='some badgateway error'),
+ ])
+ @mock.patch('time.sleep')
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_insert_all_retries_if_unstructured_retriable(
+ self,
+ mock_send,
+ unused_mock_sleep,
+ exception_type=None,
+ error_message=None):
+ # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+ mock_send.side_effect = [
+ exception_type(error_message),
+ exception_type(error_message),
+ exception_type(error_message),
+ exception_type(error_message),
+ ]
+
+ with self.assertRaises(Exception) as exc:
+ with beam.Pipeline() as p:
+ _ = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS'))
+ self.assertEqual(4, mock_send.call_count)
+ self.assertIn(error_message, exc.exception.args[0])
+
+ # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
+ # /api_core/exceptions.html
+ # to determine error types and messages to try for non-retriables.
+ @parameterized.expand([
+ param(
+ exception_type=retry.PermanentException,
+ error_args=('nonretriable', )),
+ param(
+ exception_type=exceptions.BadRequest,
+ error_args=(
+ 'forbidden morbidden', [{
+ 'reason': 'nonretriablereason'
+ }])),
+ param(
+ exception_type=exceptions.BadRequest,
+ error_args=('BAD REQUEST!', [{
+ 'reason': 'nonretriablereason'
+ }])),
+ param(
+ exception_type=exceptions.MethodNotAllowed,
+ error_args=(
+ 'method not allowed!', [{
+ 'reason': 'nonretriablereason'
+ }])),
+ param(
+ exception_type=exceptions.MethodNotAllowed,
+ error_args=('method not allowed!', 'args')),
+ param(exception_type=exceptions.Unknown, error_args=('unknown!', 'args')),
+ param(
+ exception_type=exceptions.Aborted, error_args=('abortet!', 'abort')),
+ ])
+ @mock.patch('time.sleep')
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_insert_all_unretriable_errors(
+ self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
+ # In this test, a BATCH pipeline will fail on NON-RETRIABLE errors without retrying.
+ mock_send.side_effect = [
+ exception_type(*error_args),
+ exception_type(*error_args),
+ exception_type(*error_args),
+ exception_type(*error_args),
+ ]
+
+ with self.assertRaises(Exception):
+ with beam.Pipeline() as p:
+ _ = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS'))
+ self.assertEqual(1, mock_send.call_count)
+
+ # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
+ # /api_core/exceptions.html
+ # to determine error types and messages to try for non-retriables.
+ @parameterized.expand([
+ param(
+ exception_type=retry.PermanentException,
+ error_args=('nonretriable', )),
+ param(
+ exception_type=exceptions.BadRequest,
+ error_args=(
+ 'forbidden morbidden', [{
+ 'reason': 'nonretriablereason'
+ }])),
+ param(
+ exception_type=exceptions.BadRequest,
+ error_args=('BAD REQUEST!', [{
+ 'reason': 'nonretriablereason'
+ }])),
+ param(
+ exception_type=exceptions.MethodNotAllowed,
+ error_args=(
+ 'method not allowed!', [{
+ 'reason': 'nonretriablereason'
+ }])),
+ param(
+ exception_type=exceptions.MethodNotAllowed,
+ error_args=('method not allowed!', 'args')),
+ param(exception_type=exceptions.Unknown, error_args=('unknown!', 'args')),
+ param(
+ exception_type=exceptions.Aborted, error_args=('abortet!', 'abort')),
+ param(
+ exception_type=requests.exceptions.ConnectionError,
+ error_args=('some connection error', )),
+ param(
+ exception_type=requests.exceptions.Timeout,
+ error_args=('some timeout error', )),
+ param(
+ exception_type=ConnectionError,
+ error_args=('some py connection error', )),
+ param(
+ exception_type=exceptions.BadGateway,
+ error_args=('some badgateway error', )),
+ ])
+ @mock.patch('time.sleep')
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_insert_all_unretriable_errors_streaming(
+ self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
+ # In this test, a STREAMING pipeline will retry ALL errors, and never throw
+ # an exception.
+ mock_send.side_effect = [
+ exception_type(*error_args),
+ exception_type(*error_args),
+ [] # Errors thrown twice, and then succeeded
+ ]
+
+ opt = StandardOptions()
+ opt.streaming = True
+ with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+ _ = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1'
+ }])
+ | WriteToBigQuery(
+ table='project:dataset.table',
+ schema={
+ 'fields': [{
+ 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+ }]
+ },
+ create_disposition='CREATE_NEVER',
+ method='STREAMING_INSERTS'))
+ self.assertEqual(3, mock_send.call_count)
+
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertTransformTests(unittest.TestCase):
def test_dofn_client_process_performs_batching(self):
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 4784de1e599..695a718278f 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -41,8 +41,10 @@ from apache_beam.io.filesystem import BeamIOError
# TODO(sourabhbajaj): Remove the GCP specific error code to a submodule
try:
from apitools.base.py.exceptions import HttpError
+ from google.api_core.exceptions import GoogleAPICallError
except ImportError as e:
HttpError = None
+ GoogleAPICallError = None
# Protect against environments where aws tools are not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -123,6 +125,14 @@ def retry_on_server_errors_filter(exception):
"""Filter allowing retries on server errors and non-HttpErrors."""
if (HttpError is not None) and isinstance(exception, HttpError):
return exception.status_code >= 500
+ if GoogleAPICallError is not None and isinstance(exception,
+ GoogleAPICallError):
+ if exception.code >= 500: # 500 are internal server errors
+ return True
+ else:
+ # If we have a GoogleAPICallError with a code that doesn't
+ # indicate a server error, we do not need to retry.
+ return False
if (S3ClientError is not None) and isinstance(exception, S3ClientError):
return exception.code is None or exception.code >= 500
return not isinstance(exception, PermanentException)
@@ -134,6 +144,10 @@ def retry_on_server_errors_and_notfound_filter(exception):
if HttpError is not None and isinstance(exception, HttpError):
if exception.status_code == 404: # 404 Not Found
return True
+ if GoogleAPICallError is not None and isinstance(exception,
+ GoogleAPICallError):
+ if exception.code == 404: # 404 Not found
+ return True
return retry_on_server_errors_filter(exception)
@@ -141,6 +155,10 @@ def retry_on_server_errors_and_timeout_filter(exception):
if HttpError is not None and isinstance(exception, HttpError):
if exception.status_code == 408: # 408 Request Timeout
return True
+ if GoogleAPICallError is not None and isinstance(exception,
+ GoogleAPICallError):
+ if exception.code == 408: # 408 Request Timeout
+ return True
if S3ClientError is not None and isinstance(exception, S3ClientError):
if exception.code == 408: # 408 Request Timeout
return True
@@ -155,6 +173,10 @@ def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
if HttpError is not None and isinstance(exception, HttpError):
if exception.status_code == 403:
return True
+ if GoogleAPICallError is not None and isinstance(exception,
+ GoogleAPICallError):
+ if exception.code == 403:
+ return True
if S3ClientError is not None and isinstance(exception, S3ClientError):
if exception.code == 403:
return True