You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/09 16:52:23 UTC

[beam] branch master updated: Merge pull request #17544 from [BEAM-14415] Exception handling tests for BQIO streaming inserts in Python

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3689dc5fbeb Merge pull request #17544 from [BEAM-14415] Exception handling tests for BQIO streaming inserts in Python
3689dc5fbeb is described below

commit 3689dc5fbeb31b352a9c6d98b7936bd25a5d3a4b
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon May 9 12:52:15 2022 -0400

    Merge pull request #17544 from [BEAM-14415] Exception handling tests for BQIO streaming inserts in Python
    
    * Exception handling tests for BQIO streaming inserts in Python
    
    * Adding non-retriable tests
    
    * Added streaming retries
    
    * Added streaming retries
    
    * fix lint
    
    * fix formatter
---
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 263 ++++++++++++++++++++++++
 sdks/python/apache_beam/utils/retry.py          |  22 ++
 2 files changed, 285 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index a4fd5053df0..74722e4e538 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -34,6 +34,7 @@ import hamcrest as hc
 import mock
 import pytest
 import pytz
+import requests
 from parameterized import param
 from parameterized import parameterized
 
@@ -72,15 +73,19 @@ from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
+
 try:
   from apitools.base.py.exceptions import HttpError
   from google.cloud import bigquery as gcp_bigquery
+  from google.api_core import exceptions
 except ImportError:
   gcp_bigquery = None
   HttpError = None
+  exceptions = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
 _LOGGER = logging.getLogger(__name__)
@@ -833,6 +838,264 @@ class TestWriteToBigQuery(unittest.TestCase):
               test_client=client))
 
 
+class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
+
+  # Using https://cloud.google.com/bigquery/docs/error-messages and
+  # https://googleapis.dev/python/google-api-core/latest/_modules/google
+  #    /api_core/exceptions.html
+  # to determine error types and messages to try for retriables.
+  @parameterized.expand([
+      param(
+          exception_type=exceptions.Forbidden,
+          error_reason='rateLimitExceeded'),
+      param(
+          exception_type=exceptions.DeadlineExceeded,
+          error_reason='somereason'),
+      param(
+          exception_type=exceptions.ServiceUnavailable,
+          error_reason='backendError'),
+      param(
+          exception_type=exceptions.InternalServerError,
+          error_reason='internalError'),
+      param(
+          exception_type=exceptions.InternalServerError,
+          error_reason='backendError'),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_all_retries_if_structured_retriable(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type=None,
+      error_reason=None):
+    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
+    mock_send.side_effect = [
+        exception_type(
+            'some retriable exception', errors=[{
+                'reason': error_reason
+            }]),
+        exception_type(
+            'some retriable exception', errors=[{
+                'reason': error_reason
+            }]),
+        exception_type(
+            'some retriable exception', errors=[{
+                'reason': error_reason
+            }]),
+        exception_type(
+            'some retriable exception', errors=[{
+                'reason': error_reason
+            }]),
+    ]
+
+    with self.assertRaises(Exception) as exc:
+      with beam.Pipeline() as p:
+        _ = (
+            p
+            | beam.Create([{
+                'columnA': 'value1'
+            }])
+            | WriteToBigQuery(
+                table='project:dataset.table',
+                schema={
+                    'fields': [{
+                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                    }]
+                },
+                create_disposition='CREATE_NEVER',
+                method='STREAMING_INSERTS'))
+    self.assertEqual(4, mock_send.call_count)
+    self.assertIn('some retriable exception', exc.exception.args[0])
+
+  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
+  #   /api_core/exceptions.html
+  # to determine error types and messages to try for retriables.
+  @parameterized.expand([
+      param(
+          exception_type=requests.exceptions.ConnectionError,
+          error_message='some connection error'),
+      param(
+          exception_type=requests.exceptions.Timeout,
+          error_message='some timeout error'),
+      param(
+          exception_type=ConnectionError,
+          error_message='some py connection error'),
+      param(
+          exception_type=exceptions.BadGateway,
+          error_message='some badgateway error'),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_all_retries_if_unstructured_retriable(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type=None,
+      error_message=None):
+    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+    mock_send.side_effect = [
+        exception_type(error_message),
+        exception_type(error_message),
+        exception_type(error_message),
+        exception_type(error_message),
+    ]
+
+    with self.assertRaises(Exception) as exc:
+      with beam.Pipeline() as p:
+        _ = (
+            p
+            | beam.Create([{
+                'columnA': 'value1'
+            }])
+            | WriteToBigQuery(
+                table='project:dataset.table',
+                schema={
+                    'fields': [{
+                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                    }]
+                },
+                create_disposition='CREATE_NEVER',
+                method='STREAMING_INSERTS'))
+    self.assertEqual(4, mock_send.call_count)
+    self.assertIn(error_message, exc.exception.args[0])
+
+  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
+  #   /api_core/exceptions.html
+  # to determine error types and messages to try for retriables.
+  @parameterized.expand([
+      param(
+          exception_type=retry.PermanentException,
+          error_args=('nonretriable', )),
+      param(
+          exception_type=exceptions.BadRequest,
+          error_args=(
+              'forbidden morbidden', [{
+                  'reason': 'nonretriablereason'
+              }])),
+      param(
+          exception_type=exceptions.BadRequest,
+          error_args=('BAD REQUEST!', [{
+              'reason': 'nonretriablereason'
+          }])),
+      param(
+          exception_type=exceptions.MethodNotAllowed,
+          error_args=(
+              'method not allowed!', [{
+                  'reason': 'nonretriablereason'
+              }])),
+      param(
+          exception_type=exceptions.MethodNotAllowed,
+          error_args=('method not allowed!', 'args')),
+      param(exception_type=exceptions.Unknown, error_args=('unknown!', 'args')),
+      param(
+          exception_type=exceptions.Aborted, error_args=('abortet!', 'abort')),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_all_unretriable_errors(
+      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
+    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+    mock_send.side_effect = [
+        exception_type(*error_args),
+        exception_type(*error_args),
+        exception_type(*error_args),
+        exception_type(*error_args),
+    ]
+
+    with self.assertRaises(Exception):
+      with beam.Pipeline() as p:
+        _ = (
+            p
+            | beam.Create([{
+                'columnA': 'value1'
+            }])
+            | WriteToBigQuery(
+                table='project:dataset.table',
+                schema={
+                    'fields': [{
+                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                    }]
+                },
+                create_disposition='CREATE_NEVER',
+                method='STREAMING_INSERTS'))
+    self.assertEqual(1, mock_send.call_count)
+
+  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
+  #    /api_core/exceptions.html
+  # to determine error types and messages to try for retriables.
+  @parameterized.expand([
+      param(
+          exception_type=retry.PermanentException,
+          error_args=('nonretriable', )),
+      param(
+          exception_type=exceptions.BadRequest,
+          error_args=(
+              'forbidden morbidden', [{
+                  'reason': 'nonretriablereason'
+              }])),
+      param(
+          exception_type=exceptions.BadRequest,
+          error_args=('BAD REQUEST!', [{
+              'reason': 'nonretriablereason'
+          }])),
+      param(
+          exception_type=exceptions.MethodNotAllowed,
+          error_args=(
+              'method not allowed!', [{
+                  'reason': 'nonretriablereason'
+              }])),
+      param(
+          exception_type=exceptions.MethodNotAllowed,
+          error_args=('method not allowed!', 'args')),
+      param(exception_type=exceptions.Unknown, error_args=('unknown!', 'args')),
+      param(
+          exception_type=exceptions.Aborted, error_args=('abortet!', 'abort')),
+      param(
+          exception_type=requests.exceptions.ConnectionError,
+          error_args=('some connection error', )),
+      param(
+          exception_type=requests.exceptions.Timeout,
+          error_args=('some timeout error', )),
+      param(
+          exception_type=ConnectionError,
+          error_args=('some py connection error', )),
+      param(
+          exception_type=exceptions.BadGateway,
+          error_args=('some badgateway error', )),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_all_unretriable_errors_streaming(
+      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
+    # In this test, a STREAMING pipeline will retry ALL errors, and never throw
+    # an exception.
+    mock_send.side_effect = [
+        exception_type(*error_args),
+        exception_type(*error_args),
+        []  # Errors thrown twice, and then succeeded
+    ]
+
+    opt = StandardOptions()
+    opt.streaming = True
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      _ = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS'))
+    self.assertEqual(3, mock_send.call_count)
+
+
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertTransformTests(unittest.TestCase):
   def test_dofn_client_process_performs_batching(self):
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 4784de1e599..695a718278f 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -41,8 +41,10 @@ from apache_beam.io.filesystem import BeamIOError
 # TODO(sourabhbajaj): Remove the GCP specific error code to a submodule
 try:
   from apitools.base.py.exceptions import HttpError
+  from google.api_core.exceptions import GoogleAPICallError
 except ImportError as e:
   HttpError = None
+  GoogleAPICallError = None
 
 # Protect against environments where aws tools are not available.
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -123,6 +125,14 @@ def retry_on_server_errors_filter(exception):
   """Filter allowing retries on server errors and non-HttpErrors."""
   if (HttpError is not None) and isinstance(exception, HttpError):
     return exception.status_code >= 500
+  if GoogleAPICallError is not None and isinstance(exception,
+                                                   GoogleAPICallError):
+    if exception.code >= 500:  # 500 are internal server errors
+      return True
+    else:
+      # If we have a GoogleAPICallError with a code that doesn't
+      # indicate a server error, we do not need to retry.
+      return False
   if (S3ClientError is not None) and isinstance(exception, S3ClientError):
     return exception.code is None or exception.code >= 500
   return not isinstance(exception, PermanentException)
@@ -134,6 +144,10 @@ def retry_on_server_errors_and_notfound_filter(exception):
   if HttpError is not None and isinstance(exception, HttpError):
     if exception.status_code == 404:  # 404 Not Found
       return True
+  if GoogleAPICallError is not None and isinstance(exception,
+                                                   GoogleAPICallError):
+    if exception.code == 404:  # 404 Not found
+      return True
   return retry_on_server_errors_filter(exception)
 
 
@@ -141,6 +155,10 @@ def retry_on_server_errors_and_timeout_filter(exception):
   if HttpError is not None and isinstance(exception, HttpError):
     if exception.status_code == 408:  # 408 Request Timeout
       return True
+  if GoogleAPICallError is not None and isinstance(exception,
+                                                   GoogleAPICallError):
+    if exception.code == 408:  # 408 Request Timeout
+      return True
   if S3ClientError is not None and isinstance(exception, S3ClientError):
     if exception.code == 408:  # 408 Request Timeout
       return True
@@ -155,6 +173,10 @@ def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
   if HttpError is not None and isinstance(exception, HttpError):
     if exception.status_code == 403:
       return True
+  if GoogleAPICallError is not None and isinstance(exception,
+                                                   GoogleAPICallError):
+    if exception.code == 403:
+      return True
   if S3ClientError is not None and isinstance(exception, S3ClientError):
     if exception.code == 403:
       return True