Posted to github@beam.apache.org by "ajdub508 (via GitHub)" <gi...@apache.org> on 2023/08/22 10:22:18 UTC

[GitHub] [beam] ajdub508 opened a new pull request, #28091: Return errors for insert_rows_json exceptions (#21080)

ajdub508 opened a new pull request, #28091:
URL: https://github.com/apache/beam/pull/28091

   Would like to get feedback on this approach. Marking the PR as draft because more tests need to be added and I haven't yet run the existing tests.
   
   The purpose of this change is for the exceptions caught and re-raised [here](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L732-L740) to output rows to the failed_rows tag rather than just causing a pipeline error. Outputting to the failed_rows tag is necessary to keep streaming pipelines from infinitely retrying messages that trigger these pipeline errors.
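   
   As a hedged illustration of what this unlocks downstream (not part of the PR; the table, schema, and print-based sink are placeholders), a pipeline can consume the failed rows as a dead-letter output instead of retrying forever:
   
   ```python
   import apache_beam as beam
   from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
   from apache_beam.io.gcp.bigquery_tools import RetryStrategy
   
   with beam.Pipeline() as p:
       result = (
           p
           | beam.Create([{'columnA': 'value1'}])
           | WriteToBigQuery(
               table='project:dataset.table',
               schema={
                   'fields': [{
                       'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
                   }]
               },
               method='STREAMING_INSERTS',
               insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
   
       # Each element on the FAILED_ROWS tag is a (destination, row) tuple;
       # route rejected rows to a side destination instead of retrying them.
       _ = (
           result[BigQueryWriteFn.FAILED_ROWS]
           | beam.Map(lambda kv: print('failed row:', kv[1])))
   ```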
   
   These exceptions cause infinite retries because [this self.bigquery_wrapper.insert_rows call](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery.py#L1635-L1643) ends up raising an exception rather than returning errors: the exception raised by [self.gcp_bq_client.insert_rows_json](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L720) prevents the errors list from being instantiated. The exception bubbles up (raised [here](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L732-L740)) to the [_flush_batch method](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery.py#L1645-L1653), where it is unhandled and produces a pipeline error.
   
   Getting those rows to the failed_rows tag may require doing some things that may or may not be acceptable (a rough sketch of the idea follows this list):
   
   - The exception has to be packaged up with the required fields
   - A reason has to be assigned to these errors. I went with 'invalid' to make sure that these types of errors are not retried.
   - All rows in a batch are output to the failed_rows tag. Looking back at this comment on [BEAM-12362](https://issues.apache.org/jira/browse/BEAM-12362?focusedCommentId=17347262&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17347262), I think the original intent of [raising these exceptions](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L732-L740) was to get the rows into the failed_rows tag, and it seems likely that most or all rows in a batch would be affected by the types of exceptions caught there.
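   
   A rough sketch of the idea above (illustrative only, not the actual patch; the helper name and the caught exception type are assumptions):
   
   ```python
   from google.api_core import exceptions as gexc
   
   def insert_rows_sketch(gcp_bq_client, table_ref, rows):
       # Convert a raised client exception into the same error-list shape
       # that insert_rows_json returns, so the caller can send rows to the
       # failed_rows tag instead of crashing the bundle.
       try:
           errors = gcp_bq_client.insert_rows_json(table_ref, json_rows=rows)
       except gexc.GoogleAPICallError as e:
           # Package the exception as one structured error per row, with an
           # 'invalid' reason so the rows are treated as non-transient and
           # are not retried under RETRY_ON_TRANSIENT_ERROR.
           errors = [{
               'index': i, 'errors': [{'reason': 'invalid', 'message': str(e)}]
           } for i in range(len(rows))]
       return not errors, errors
   ```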
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 merged pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 merged PR #28091:
URL: https://github.com/apache/beam/pull/28091




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1324715997


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),

Review Comment:
   I may be missing something here. I see `max_retries=len(insert_response) - 1`, which means 0 retries for this case, right? Why is there a second attempt?



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   Likewise, these exceptions are important to keep. There was a large effort last year to add wide exception testing coverage to make sure these exceptions are handled properly.



Review Comment:
   I'd say the same with the rest of the current exception tests that we have. You can maybe try integrating those exceptions with your tests



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
+  def test_insert_rows_json_exception_retry_never(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+      exception_type,
+      error_reason,
+      streaming=False):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
+        exception_type('some exception', response=mock_response)
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=False),
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=True),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_exception_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count,
+      streaming=False):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows, unused_sleep_mock=None):
+    # In this test, a pipeline will always retry all errors
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),

Review Comment:
   I see this is the same test case as the first one in this sequence. What is the significance of `streaming=True`?



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),

Review Comment:
   These are valuable test cases, let's not erase them



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,29 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Moved Permanently',
+      'Not Modified',
+      'Temporary Redirect',
+      'Resume Incomplete',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Method Not Allowed',
+      'Conflict',
+      'Length Required',
+      'Precondition Failed',
+      'Not Implemented',
+      'Bad Gateway',
+      'Gateway Timeout',
+  }

Review Comment:
   Also error reasons from BQ seem to appear in camel case format (e.g. `notFound` instead of `Not Found`).
   Is it different for GRPC errors? 
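   
   For reference, the two failure paths surface reasons in different spellings; a hedged illustration (shapes inferred from the tests above, not exact Beam code):
   
   ```python
   from google.api_core import exceptions
   
   # Error list returned by insert_rows_json: camelCase BigQuery reasons.
   returned_errors = [{'index': 0, 'errors': [{'reason': 'notFound'}]}]
   
   # Exception raised by the client: the reason comes from the HTTP
   # response's reason phrase, e.g. 'Forbidden' for a 403.
   exc = exceptions.Forbidden('insert failed')
   print(exc.code)  # 403
   ```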



Review Comment:
   Thanks for adding this comprehensive list! I'm a little unfamiliar with a few of these so will need another pair of eyes to check.
   
   From a cursory search though, it appears "bad gateway" may be a transient error? Same with "gateway timeout". I would defer to https://cloud.google.com/bigquery/docs/error-messages for a good list to take from.
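
   As a possible cross-check, assuming a recent google.api_core where `retry.if_transient_error` is the default transient predicate (this PR doesn't use it directly):

   ```python
   from google.api_core import exceptions, retry

   # The default predicate retries only TooManyRequests (429),
   # InternalServerError (500) and ServiceUnavailable (503), so it treats
   # BadGateway (502) and GatewayTimeout (504) as non-transient.
   print(retry.if_transient_error(exceptions.ServiceUnavailable('x')))  # True
   print(retry.if_transient_error(exceptions.BadGateway('x')))          # False
   print(retry.if_transient_error(exceptions.GatewayTimeout('x')))      # False
   ```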



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1693066590

   R: @liferoad - would you be able to provide some feedback or suggest a reviewer?
   




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327119144


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   Looking closer at this - I can add these back, but I still think they might be redundant. I think the tests in this change cover the possible code paths. This is my line of thinking, let me know where I'm off track:
   - All of the existing exception tests are mocking `google.cloud.bigquery.Client.insert_rows_json`
   - There are 2 scenarios we need to handle when calling that method with regard to errors/exceptions:
     - the method returns an error list response [here](https://github.com/apache/beam/blob/dbb657371ca6a23b8cd1216c10686a144439c91e/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L720)
     - the method raises a `GoogleAPICallError` exception [here](https://github.com/apache/beam/blob/dbb657371ca6a23b8cd1216c10686a144439c91e/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L732)
   
   In each of those scenarios, a different path can be taken based on whether the reason is in `_NON_TRANSIENT_ERRORS`, so the tests in this change exercise both transient/non-transient to cover both paths.
   
   The tests are re-organized in this change to make it clear which path is being tested: the `test_insert_rows_json_errors_*` tests cover the first scenario (an error list is returned) and the `test_insert_rows_json_exception_*` tests cover the second (an exception is raised); both scenarios are sketched below. Each scenario is also exercised with every retry strategy and with both transient and non-transient errors to cover those paths.
   
   I don't mean to be a pain on this and certainly may be missing something, so I will defer to project committers; I just wanted to see if I can understand what I'm missing.
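
   To make the two scenarios concrete, a condensed mock-based sketch (placeholder table and values, not the actual test code):

   ```python
   from unittest import mock

   from google.api_core import exceptions

   client = mock.Mock()

   # Scenario 1: insert_rows_json returns an error list (no exception).
   client.insert_rows_json.return_value = [
       {'index': 0, 'errors': [{'reason': 'invalid'}]},
   ]
   errors = client.insert_rows_json(
       'project:dataset.table', [{'columnA': 'value1'}])

   # Scenario 2: insert_rows_json raises a GoogleAPICallError subclass,
   # so no error list is produced and the caller must catch and package it.
   response = mock.Mock()
   response.reason = 'Too Many Requests'
   client.insert_rows_json.side_effect = exceptions.TooManyRequests(
       'quota exceeded', response=response)
   try:
       client.insert_rows_json('project:dataset.table', [{'columnA': 'value1'}])
   except exceptions.GoogleAPICallError as e:
       reason = e.response.reason  # 'Too Many Requests'
   ```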





[GitHub] [beam] liferoad commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1696565025

   > @liferoad @Abacn - Just looking for early feedback on the approach. Provided some background in [this comment](https://github.com/apache/beam/pull/28091#issue-1861152554).
   
   I see. Thanks. 
   
   R: @ahmedabu98 




[GitHub] [beam] liferoad commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1308057818


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS

Review Comment:
   If these errors are retry-able, the new changes will not be able to insert these rows. 





[GitHub] [beam] liferoad commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1312219629


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS

Review Comment:
   Thanks. In general, for any batch job, Dataflow will retry the failed work items 4 times; for streaming jobs, it will retry forever. So if one error is retry-able, we could just raise the exception and let Dataflow handle it.





[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1725251364

   @ahmedabu98 Squashed commits, added `CHANGES.md` entry, and added a couple tests ([here](https://github.com/apache/beam/pull/28091/files#diff-947eeae716a2b4f80d7d970b44c78b278ffb01adf7f85fd1ec53ce1739352440R1206) and [here](https://github.com/apache/beam/pull/28091/files#diff-947eeae716a2b4f80d7d970b44c78b278ffb01adf7f85fd1ec53ce1739352440R1251)) for the exceptions you mentioned. Ready to be merged.




[GitHub] [beam] ahmedabu98 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1721095440

   > I think a lot of them probably weren't included originally because they'll actually surface a GoogleAPICallError exception and wouldn't be returned in the error list response of the self.gcp_bq_client.insert_rows_json call
   
   Ahh I see, that clarifies it for me. Thank you!




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327061603


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),

Review Comment:
   Thanks, will add back.





[GitHub] [beam] github-actions[bot] commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1693068453

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1726021050

   Run Python 3.9 PostCommit




[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1712500551

   Restructured tests to cover the 2 types of issues that can arise from insert_rows_json calls:
   1. `errors` list is returned from the function call with error reasons described [here](https://cloud.google.com/bigquery/docs/error-messages)
   2. an exception is thrown and caught with error reasons as described in [api-core exceptions](https://googleapis.dev/python/google-api-core/latest/_modules/google/api_core/exceptions.html)
   
   Separate tests added for each retry strategy for both of those types of issues. Will cleanup any linter or test issues, incorporate any additional feedback, and get ready to move out of draft status next.




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327951245


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+      # since RetryStrategy is set to RETRY_ALWAYS (also the default)
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
+  def test_insert_rows_json_exception_retry_never(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+      exception_type,
+      error_reason,
+      streaming=False):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
+        exception_type('some exception', response=mock_response)
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=False),
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=True),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_exception_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count,
+      streaming=False):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows, unused_sleep_mock=None):
+    # In this test, a pipeline will always retry all errors
+    # since RetryStrategy is set to RETRY_ALWAYS (also the default)
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),

Review Comment:
   Fixed with [this commit](https://github.com/apache/beam/pull/28091/commits/7c114617d460fd4584e05002f53a41d42f3991f7)





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1311933308


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS

Review Comment:
   @liferoad - I'm giving that some thought, will report back





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1313806276


##########
sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py:
##########
@@ -454,6 +454,56 @@ def test_big_query_write_insert_errors_reporting(self):
           | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
           equal_to(bq_result_errors))
 
+
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_not_found_errors(self):
+    """
+    Test that NotFound errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows amd the reason for it failing.

Review Comment:
   Thanks for the feedback, @ahmedabu98. I think we could re-raise 429, 500, and 503 errors so the resulting pipeline errors are retried indefinitely, and route all the others to the failed rows tag (see the sketch below). This would be consistent with [errors defined as transient in API Core Retry](https://github.com/googleapis/python-api-core/blob/main/google/api_core/retry.py#L113-L135) and [AIP-194](https://google.aip.dev/194).
   
   Those retryable errors would all be caught in the first except clause. The [HttpError](https://github.com/google/apitools/blob/master/apitools/base/py/exceptions.py#L111-L117) type caught by the second clause doesn't cover anything that is typically considered transient, so those errors will go straight to failed rows.
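
   A rough sketch of that split, with an assumed helper name and error packaging (not the PR's exact code):

   ```python
   from google.api_core import exceptions

   RETRYABLE_CODES = {429, 500, 503}  # per api_core retry defaults / AIP-194

   def handle_insert_exception(e: exceptions.GoogleAPICallError, rows):
       # e.code is the numeric http status; assumes e.response is populated
       if e.code in RETRYABLE_CODES:
           raise e  # surface a pipeline error; the runner retries the bundle
       # everything else is treated as non-transient and packaged so the
       # rows reach the failed-rows output instead of failing the pipeline
       return [{'index': i,
                'errors': [{'reason': e.response.reason,
                            'message': e.message}]}
               for i, _ in enumerate(rows)]
   ```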





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1313022965


##########
sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py:
##########
@@ -454,6 +454,56 @@ def test_big_query_write_insert_errors_reporting(self):
           | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
           equal_to(bq_result_errors))
 
+
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_not_found_errors(self):
+    """
+    Test that NotFound errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows amd the reason for it failing.

Review Comment:
   nit
   ```suggestion
       contain both the failed rows and the reason for it failing.
   ```



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS

Review Comment:
   > So if one error is retry-able, we could just raise the exception and let Dataflow handle it
   
   IIUC this is the current behavior already. When this exception is raised, Dataflow sees it as a failed work item and retries it (batch 4 times, streaming infinitely). 



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these execptions as RetryStrategy._NON_TRANSIENT_ERRORS
+      error = {
+        'message': e.message,
+        'reason': 'invalid'

Review Comment:
   I'd prefer a more granular approach than setting all these exceptions to `invalid`. Are we sure these errors will always be non-transient? Perhaps we can have an if-tree that sets an appropriate reason based on the error code. Or maybe better to pass in the actual error reason and update [`_NON_TRANSIENT_ERRORS`](https://github.com/apache/beam/blob/0b4302e5f95f2dc9b6658c13d5d1aa798cfba668/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L1494) to include more reasons.
   
   For example, the code `400` indicates an invalid/invalidQuery error, so we can say that's non-transient. But something like a `500` or `503` indicates a temporary error, and BQ suggests retrying. More info in this [error messages documentation](https://cloud.google.com/bigquery/docs/error-messages).
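   
   As a rough illustration of that if-tree (a hypothetical helper; the reason strings come from the BQ error-messages doc):
   
   ```python
   def reason_for_status(code):
       # Map an HTTP status code to a BigQuery-style error reason so the
       # existing RetryStrategy logic can classify it.
       if code in (500, 503):
           return 'backendError'       # temporary; BQ suggests retrying
       if code == 429:
           return 'rateLimitExceeded'  # also retriable
       if code == 400:
           return 'invalid'            # non-transient
       return 'invalid'                # conservative default for unknown codes
   ```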





[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1696554241

   @liferoad @Abacn - Just looking for early feedback on the approach. Provided some background in [this comment](https://github.com/apache/beam/pull/28091#issue-1861152554).




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327951113


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   Added [this commit](https://github.com/apache/beam/pull/28091/commits/7c114617d460fd4584e05002f53a41d42f3991f7), which adds a variety of exceptions and stops setting the streaming option in the tests. I also added test cases to the `test_insert_rows_json_exception_retry_on_transient_error` test to cover all expected retriable exceptions and all listed non-transient exceptions.





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327060993


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),

Review Comment:
   > I may be missing something here. I see `max_retries=len(insert_response) - 1`, which means 0 retries for this case right? why is there a second attempt
   
   There are actually two responses there, with the second one being `None`, but the formatting is inconsistent and makes it difficult to read. I'll put the second response on its own line to make it easier to read.
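   
   For example, something like this reads more clearly:
   
   ```python
   param(
       insert_response=[
           exceptions.TooManyRequests if exceptions else None,
           None,  # no exception on the second attempt, i.e. a successful call
       ],
       error_reason='Too Many Requests',  # not in _NON_TRANSIENT_ERRORS
       failed_rows=[]),
   ```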



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),

Review Comment:
   Thanks, will add back.



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   Thanks, will add back.



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
+  def test_insert_rows_json_exception_retry_never(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+      exception_type,
+      error_reason,
+      streaming=False):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
+        exception_type('some exception', response=mock_response)
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=False),
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=True),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_exception_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count,
+      streaming=False):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows, unused_sleep_mock=None):
+    # In this test, a pipeline will always retry all errors
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),

Review Comment:
   I noticed that there were some separate tests with streaming set to true/false, since the behavior with pipeline errors differs between the two modes. I added these cases to demonstrate and check the behavior for both streaming settings. Let me know if you think that's unnecessary, and I can remove one or the other.





[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1719112795

   @ahmedabu98 The test failed, but the failures look unrelated.
   
   I still need to update CHANGES.md. Should I add it under Bugfixes, since issue #21080 is tagged as a bug, or is there a chance this should be considered a breaking change for anyone depending on the error handling logic?




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1313822761


##########
sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py:
##########
@@ -454,6 +454,56 @@ def test_big_query_write_insert_errors_reporting(self):
           | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
           equal_to(bq_result_errors))
 
+
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_not_found_errors(self):
+    """
+    Test that NotFound errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows amd the reason for it failing.

Review Comment:
   Thinking about this some more, along with the suggestion to add to `_NON_TRANSIENT_ERRORS`, we should be able to use the response reasons to let [RetryStrategy](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery.py#L1650-L1653) do its thing.
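   
   Roughly the idea, sketched with the reasons currently in `_NON_TRANSIENT_ERRORS` (the helper mirrors the existing RetryStrategy logic rather than quoting it):
   
   ```python
   _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
   
   def should_retry(strategy, error_reason):
       if strategy == 'RETRY_ALWAYS':
           return True
       if strategy == 'RETRY_NEVER':
           return False
       # RETRY_ON_TRANSIENT_ERROR: retry only reasons not known to be permanent
       return error_reason not in _NON_TRANSIENT_ERRORS
   ```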





[GitHub] [beam] johnjcasey commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1329158203


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,19 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',

Review Comment:
   Not Found could be transient, in that the missing resource could show up later. I don't think we should treat it as transient, though.
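   
   With 'Not Found' kept in the set, a quick sanity check of the intended behavior (assuming `RetryStrategy.should_retry` keeps its current `(strategy, reason)` signature):
   
   ```python
   from apache_beam.io.gcp.bigquery_tools import RetryStrategy
   
   # A missing-table error is routed to failed_rows immediately under
   # RETRY_ON_TRANSIENT_ERROR rather than being retried.
   assert not RetryStrategy.should_retry(
       RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'Not Found')
   ```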





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327144446


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),

Review Comment:
   You're certainly right that fundamentally we should be focusing on those two code paths. I agree with not bringing those tests back, but maybe the exceptions themselves should be spread out across your tests. For example, we can replace one of the many `exceptions.TooManyRequests` in your test cases with `exceptions.Timeout`, and in another test case replace it with `exceptions.BadGateway`, etc. So the tests you wrote can stay where they are; just the parameters can change to include a wider variety of exceptions.
   
   Although these exceptions ultimately take the same code path in the end, the variety of exceptions tested guards against future unwanted changes to `_NON_TRANSIENT_ERRORS` (e.g. someone making `Timeout` a non-transient error). A rough sketch of what that could look like is below.
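   As a rough illustration (hypothetical parameters; `DeadlineExceeded` and `BadGateway` stand in for whatever variety we settle on, and the row values mirror the existing cases):
   
   ```python
   from google.api_core import exceptions
   from parameterized import param
   
   # Hypothetical extra cases: same shape as the existing params, but
   # spreading different exception types across tests so a future change
   # to _NON_TRANSIENT_ERRORS would be caught here.
   extra_cases = [
       param(
           insert_response=[exceptions.DeadlineExceeded, None],
           error_reason='Deadline Exceeded',  # assumed transient
           failed_rows=[]),
       param(
           insert_response=[exceptions.BadGateway, exceptions.BadGateway],
           error_reason='Bad Gateway',
           failed_rows=['value1', 'value3', 'value5']),
   ]
   ```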





[GitHub] [beam] ahmedabu98 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1717796956

   Run Python 3.8 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327193991


Review Comment:
   Thanks, that's a great point and a good idea; I'll switch up some of the exceptions in those tests.





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327196119


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,29 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Moved Permanently',
+      'Not Modified',
+      'Temporary Redirect',
+      'Resume Incomplete',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Method Not Allowed',
+      'Conflict',
+      'Length Required',
+      'Precondition Failed',
+      'Not Implemented',
+      'Bad Gateway',
+      'Gateway Timeout',
+  }

Review Comment:
   Gotcha, I'll review that and trim the list down a bit; it seems like retrying more of the 5XX errors would make sense. A minimal sketch of a trimmed set is below.
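   A minimal sketch of what a trimmed set could look like, assuming we keep the 4XX-style reasons non-transient and leave most 5XX reasons retriable (exact membership still to be settled):
   
   ```python
   # Sketch only: non-transient reasons limited mostly to client-side
   # (4XX) failures; 5XX reasons are left out so the runner keeps
   # retrying server-side hiccups.
   _NON_TRANSIENT_ERRORS = {
       'invalid',
       'invalidQuery',
       'notImplemented',
       'Bad Request',
       'Unauthorized',
       'Forbidden',
       'Not Found',
       'Method Not Allowed',
       'Conflict',
       'Precondition Failed',
   }
   ```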





[GitHub] [beam] liferoad commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1696539328

   R: @Abacn 
   
   Sorry, this was lost in my notifications.
   Is this PR ready to be reviewed, since it is still in draft?




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1312997867


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS
+      error = {
+        'message': e.message,
+        'reason': 'invalid'

Review Comment:
   I'd prefer a more granular approach than setting all these exceptions to `invalid`. Are we sure these errors will always be non-transient? Perhaps we can have an if-tree that sets an appropriate reason based on the error code. Or maybe it would be better to pass in the actual error reason and update [`_NON_TRANSIENT_ERRORS`](https://github.com/apache/beam/blob/0b4302e5f95f2dc9b6658c13d5d1aa798cfba668/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L1494) to include more reasons.
   
   For example, the code `400` indicates an invalid request, so we can say that's non-transient. But something like a `500` or `503` indicates a temporary error, and BQ suggests retrying. More info is in this [error messages documentation](https://cloud.google.com/bigquery/docs/error-messages). A sketch of the if-tree idea follows.
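   A sketch of the if-tree idea (hypothetical helper name; the 500/503 handling follows the error messages documentation linked above):
   
   ```python
   def reason_for_status(status_code):
       # 500 and 503 are temporary per the BQ error docs, so map them to
       # retriable reasons; other 4XX codes are treated as non-transient.
       if status_code == 500:
           return 'internalError'
       if status_code == 503:
           return 'backendError'
       if 400 <= status_code < 500:
           return 'invalid'
       return 'unknown'
   ```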





[GitHub] [beam] ahmedabu98 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1726323243

   Tests are passing; I'll merge now.
   
   Thanks for sticking this one out, @ajdub508!




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327135779


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,29 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Moved Permanently',
+      'Not Modified',
+      'Temporary Redirect',
+      'Resume Incomplete',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Method Not Allowed',
+      'Conflict',
+      'Length Required',
+      'Precondition Failed',
+      'Not Implemented',
+      'Bad Gateway',
+      'Gateway Timeout',
+  }

Review Comment:
   Yeah, let's be cautious about which errors we label as "non-transient", because that could break many current pipelines that rely on Beam to retry these errors successfully. Previously these GRPC errors would be raised, and the runner could retry the whole bundle.
   
   I would lean towards only listing the errors we are sure are non-transient.





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327051890


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,29 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Moved Permanently',
+      'Not Modified',
+      'Temporary Redirect',
+      'Resume Incomplete',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Method Not Allowed',
+      'Conflict',
+      'Length Required',
+      'Precondition Failed',
+      'Not Implemented',
+      'Bad Gateway',
+      'Gateway Timeout',
+  }

Review Comment:
   > Thanks for adding this comprehensive list! I'm a little unfamiliar with a few of these so will need another pair of eyes to check.
   > 
   > From a cursory search though, it appears "bad gateway" may be a transient error? Same with "gateway timeout". I would defer to https://cloud.google.com/bigquery/docs/error-messages for a good list to take from.
   
   I see what you mean about some of the ones I added possibly being transient; we certainly could take some of the 5XX errors out of that list to broaden what gets retried.
   
   I went narrow initially, based on the narrow set of api_core exceptions considered transient in api_core's retry logic [here](https://github.com/googleapis/python-api-core/blob/main/google/api_core/retry.py#L125-L128): only 429 Too Many Requests, 500 Internal Server Error, and 503 Service Unavailable are considered transient. Other than those three, all remaining 3XX, 4XX, and 5XX errors are not considered transient.
   
   Let me know if you think we should remove some 5XX exceptions from the list, or if we should stick with just retrying 429, 500, and 503.
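   For reference, api_core exposes that narrow set through a predicate, so the behavior is easy to check directly (a quick sketch):
   
   ```python
   from google.api_core import exceptions, retry
   
   # if_transient_error treats 429 Too Many Requests, 500 Internal Server
   # Error, and 503 Service Unavailable as transient (recent versions may
   # also include connection errors).
   print(retry.if_transient_error(exceptions.TooManyRequests('slow down')))  # True
   print(retry.if_transient_error(exceptions.InternalServerError('oops')))  # True
   print(retry.if_transient_error(exceptions.NotFound('missing table')))  # False
   ```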





[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1723651431

   Thanks for the approval, @ahmedabu98. Should I add a CHANGES.md entry and squash commits now?
   
   




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1313822761


##########
sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py:
##########
@@ -454,6 +454,56 @@ def test_big_query_write_insert_errors_reporting(self):
           | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
           equal_to(bq_result_errors))
 
+
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_not_found_errors(self):
+    """
+    Test that NotFound errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows and the reason for the failure.

Review Comment:
   Thinking about this some more, along with the suggestion to add to `_NON_TRANSIENT_ERRORS`: we should be able to use the response reasons to let [RetryStrategy](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery.py#L1650-L1653) do its thing, and we may not need to raise and cause a pipeline error.
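   Roughly, the reason pulled off the response can feed the existing retry decision, along these lines (a simplified sketch, not the actual RetryStrategy code):
   
   ```python
   _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
   
   def should_retry(strategy, error_reason):
       # RETRY_ALWAYS and RETRY_NEVER short-circuit; only
       # RETRY_ON_TRANSIENT_ERROR consults the reason string.
       if strategy == 'RETRY_ALWAYS':
           return True
       if strategy == 'RETRY_NEVER':
           return False
       return error_reason not in _NON_TRANSIENT_ERRORS
   ```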





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327052981


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,29 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Moved Permanently',
+      'Not Modified',
+      'Temporary Redirect',
+      'Resume Incomplete',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Method Not Allowed',
+      'Conflict',
+      'Length Required',
+      'Precondition Failed',
+      'Not Implemented',
+      'Bad Gateway',
+      'Gateway Timeout',
+  }

Review Comment:
   > Also error reasons from BQ seem to appear in camel case format (e.g. `notFound` instead of `Not Found`). Is it different for GRPC errors?
   
   Posted this in a different comment, but I'll include it here to make this thread easy to follow:
   
   There are two types of error reasons in that list:
   1. Reasons returned in the error list response of the `self.gcp_bq_client.insert_rows_json` call. Those are in camel case and are the ones listed in https://cloud.google.com/bigquery/docs/error-messages.
   2. Reasons returned by `GoogleAPICallError` exceptions. Those follow the HTTP standard reason format; a 404, for example, will return a reason of `Not Found`, which is verified by [this integration test](https://github.com/apache/beam/pull/28091/files#diff-b6e14063dc720a5be1a68c935803f399ab5301bc3748c7f46f156b0c80f5d691R484). A small sketch of both shapes is below.
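   A small sketch of the two shapes (values illustrative; the mock mirrors how the unit tests build exception responses):
   
   ```python
   from unittest import mock
   
   from google.api_core import exceptions
   
   # 1) Reasons embedded in the insert_rows_json error list (camel case):
   errors = [{'index': 0, 'errors': [{'reason': 'notFound'}]}]
   assert errors[0]['errors'][0]['reason'] == 'notFound'
   
   # 2) Reasons from a raised GoogleAPICallError follow the HTTP standard
   #    reason phrase; a 404 surfaces as 'Not Found'.
   response = mock.Mock()
   response.reason = 'Not Found'
   exc = exceptions.NotFound('table missing', response=response)
   assert exc.response.reason == 'Not Found'
   ```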





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327192937


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
+  def test_insert_rows_json_exception_retry_never(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+      exception_type,
+      error_reason,
+      streaming=False):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
+        exception_type('some exception', response=mock_response)
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=False),
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=True),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_exception_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count,
+      streaming=False):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows, unused_sleep_mock=None):
+    # In this test, a pipeline will always retry all errors
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),

Review Comment:
   The old [test_insert_all_unretriable_errors_streaming](https://github.com/apache/beam/pull/28091/files#diff-947eeae716a2b4f80d7d970b44c78b278ffb01adf7f85fd1ec53ce1739352440L1166) test used to set [opt.streaming = True](https://github.com/apache/beam/pull/28091/files#diff-947eeae716a2b4f80d7d970b44c78b278ffb01adf7f85fd1ec53ce1739352440L1177).
   
   I will take those out; I agree that they aren't needed.





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327950847


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,29 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Moved Permanently',
+      'Not Modified',
+      'Temporary Redirect',
+      'Resume Incomplete',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',
+      'Method Not Allowed',
+      'Conflict',
+      'Length Required',
+      'Precondition Failed',
+      'Not Implemented',
+      'Bad Gateway',
+      'Gateway Timeout',
+  }

Review Comment:
   Added [this commit](https://github.com/apache/beam/pull/28091/commits/ec629474e8539b9ca577eefccf8623a503df343e), which removed a few exceptions based on our conversation and a review of the [BigQuery error table](https://cloud.google.com/bigquery/docs/error-messages#errortable):
   
   - Errors that don't seem to be relevant:
     - 3XX errors: Moved Permanently, Not Modified, Temporary Redirect, Resume Incomplete
     - Some 4XX errors: Method Not Allowed, Length Required, Precondition Failed
   - Errors that should be retried:
     - 409 Conflict
     - 502 Bad Gateway
     - 504 Gateway Timeout
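   
   For reference, here is a minimal sketch of the membership check these reasons feed into (simplified; the helper name, its signature, and the abbreviated set are assumptions here, not the connector's exact code):
   
   ```python
   class RetryStrategy(object):
     RETRY_ALWAYS = 'RETRY_ALWAYS'
     RETRY_NEVER = 'RETRY_NEVER'
     RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
   
     # Abbreviated: the trimmed set after the commit above.
     _NON_TRANSIENT_ERRORS = {
         'invalid', 'invalidQuery', 'notImplemented',
         'Bad Request', 'Unauthorized', 'Forbidden', 'Not Found',
     }
   
     @staticmethod
     def should_retry(strategy, error_reason):
       # RETRY_ALWAYS and RETRY_NEVER ignore the reason entirely; only
       # RETRY_ON_TRANSIENT_ERROR consults the non-transient set.
       if strategy == RetryStrategy.RETRY_ALWAYS:
         return True
       if strategy == RetryStrategy.RETRY_NEVER:
         return False
       return error_reason not in RetryStrategy._NON_TRANSIENT_ERRORS
   ```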





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327120534


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, a pipeline will always retry all caught exception types
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise exception if insert_response element is an exception
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return empty list if not insert_response element, indicating
+        # successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
+  def test_insert_rows_json_exception_retry_never(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+      exception_type,
+      error_reason,
+      streaming=False):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
+        exception_type('some exception', response=mock_response)
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=False),
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=True),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_exception_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count,
+      streaming=False):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows, unused_sleep_mock=None):
+    # In this test, a pipeline will always retry all errors
+    # since RetryStrategy is not set and defaults to RETRY_ALWAYS
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery in order to be able to pass max_retries
+            # in order to limit run time of test with RETRY_ALWAYS
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
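+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows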
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),

Review Comment:
   Do you remember which tests?
   I think it should be safe to remove them; the same code path is taken either way. I also don't see the connector checking `streaming` except for validation purposes.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327115677


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),

Review Comment:
   Ahh gotchu, thanks for clarifying. Beam will want its own formatting in the end (otherwise it will complain with a failing test)  so no worries





[GitHub] [beam] chamikaramj commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1696430911

   @liferoad could you check or suggest a reviewer?




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1313807363


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS
+      error = {
+        'message': e.message,
+        'reason': 'invalid'

Review Comment:
   I'll look at adding to `_NON_TRANSIENT_ERRORS`; that's a good idea.





[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1308634631


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS

Review Comment:
   Thanks @liferoad, I'll take a closer look at that. Let me know if I'm off on any of this, but my thinking had been that, as the code currently stands, the `ClientError`, `GoogleAPICallError`, and `HttpError` exceptions never get a chance to be retried anyway. So this doesn't take away a chance to be retried; it just makes sure the rows can be captured in failed_rows, providing a way to disposition the message and ack a pubsub message.
   
   The reason I had thought that the ClientError, GoogleAPICallError, and HttpError exceptions never get a chance to be retried is:
   - the exceptions get re-raised in `_insert_all_rows` [here](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L732-L740)
   - the exceptions are not caught by `insert_rows` [here](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L1267-L1271)
   - and the `_flush_batch` method [here](https://github.com/apache/beam/blob/26b77723445cf383365098b296b9db77409af94c/sdks/python/apache_beam/io/gcp/bigquery.py#L1635-L1647) isn't catching them either
   
   The end result for `ClientError`, `GoogleAPICallError`, and `HttpError` exceptions is that they produce a pipeline error rather than a usable `errors` list, and RetryStrategy never gets a chance to be evaluated.
   
   My thought was that it would be better to at least route those rows to the failed rows tag where users can choose what to do with them and avoid issues I've seen with pubsub messages that are never acked.
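   
   Roughly, the change amounts to something like this sketch (simplified, with assumed names; the real method also records metrics and handles more cases): instead of re-raising, the caught exception is repackaged into the same errors-list shape that `insert_rows_json` returns, so the retry strategy can evaluate it.
   
   ```python
   from google.api_core.exceptions import GoogleAPICallError
   
   def _insert_all_rows_sketch(client, table, rows):
     try:
       # Empty list on success, per-row error dicts on partial failure.
       return client.insert_rows_json(table, rows)
     except GoogleAPICallError as e:
       # One entry per row: every row in the batch shares the failure.
       reason = e.response.reason if e.response is not None else 'invalid'
       return [{
           'index': i,
           'errors': [{'message': e.message, 'reason': reason}],
       } for i in range(len(rows))]
   ```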
   





[GitHub] [beam] liferoad commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1309003843


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -732,12 +732,23 @@ def _insert_all_rows(
     except (ClientError, GoogleAPICallError) as e:
       # e.code contains the numeric http status code.
       service_call_metric.call(e.code)
-      # Re-reise the exception so that we re-try appropriately.
-      raise
-    except HttpError as e:
+      # Package exception up with required fields
+      # Set reason to 'invalid' to consider these exceptions as RetryStrategy._NON_TRANSIENT_ERRORS

Review Comment:
   Shall we fix the code path to retry these errors (at least the ones that are retryable) instead of returning them as the error PColl?





[GitHub] [beam] codecov[bot] commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1712502862

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/28091?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#28091](https://app.codecov.io/gh/apache/beam/pull/28091?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b5b863d) into [master](https://app.codecov.io/gh/apache/beam/commit/cbe2e0f5ad5e45a7f6d380b1b1ac36d5dede8f54?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (cbe2e0f) will **decrease** coverage by `0.31%`.
   > Report is 546 commits behind head on master.
   > The diff coverage is `60.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #28091      +/-   ##
   ==========================================
   - Coverage   71.10%   70.79%   -0.31%     
   ==========================================
     Files         859      869      +10     
     Lines      104545   105934    +1389     
   ==========================================
   + Hits        74332    74997     +665     
   - Misses      28655    29379     +724     
     Partials     1558     1558              
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/beam/pull/28091/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [python](https://app.codecov.io/gh/apache/beam/pull/28091/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `79.64% <60.00%> (-0.65%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files Changed](https://app.codecov.io/gh/apache/beam/pull/28091?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://app.codecov.io/gh/apache/beam/pull/28091?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `74.89% <60.00%> (+0.06%)` | :arrow_up: |
   
   ... and [76 files with indirect coverage changes](https://app.codecov.io/gh/apache/beam/pull/28091/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [beam] ahmedabu98 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1724194676

   I think this could be added as a bug fix on `CHANGES.md`




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1329174097


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1491,7 +1493,19 @@ class RetryStrategy(object):
   RETRY_NEVER = 'RETRY_NEVER'
   RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
 
-  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+  # Values below may be found in reasons provided either in an
+  # error returned by a client method or by an http response as
+  # defined in google.api_core.exceptions
+  _NON_TRANSIENT_ERRORS = {
+      'invalid',
+      'invalidQuery',
+      'notImplemented',
+      'Bad Request',
+      'Unauthorized',
+      'Forbidden',
+      'Not Found',

Review Comment:
   Right, I think we catch that earlier when validating the create disposition. In the Java SDK we also tell the user to make sure the table is created before processing the writes. And if the table gets dropped halfway through, I think it makes sense to treat it as non-transient.





[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1717395699

   @liferoad @ahmedabu98 Moving this out of draft status, ready for review.
   
   I did not change the handling of HttpErrors because they are different from GoogleAPICallError exceptions. Exceptions from `apitools.base.py.exceptions` do not provide a reason value. I'm thinking we probably don't want to maintain a mapping from HttpError codes to usable reasons, and the apitools library is deprecated, too.
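   
   For illustration, the difference looks roughly like this (a sketch; `e.response` can be `None` depending on how the exception was constructed):
   
   ```python
   from google.api_core import exceptions as gexc
   
   def reason_from_exception(e):
     # api_core exceptions carry the HTTP response, whose reason phrase
     # ('Not Found', 'Forbidden', ...) can be matched against
     # _NON_TRANSIENT_ERRORS. apitools' HttpError exposes no comparable
     # reason string, only the status code and response content.
     if isinstance(e, gexc.GoogleAPICallError) and e.response is not None:
       return e.response.reason
     return None
   ```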
   




[GitHub] [beam] ajdub508 commented on pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on PR #28091:
URL: https://github.com/apache/beam/pull/28091#issuecomment-1720973189

   > Thanks for the comments documenting the test cases! was really helpful when following along. I have a few clarifying questions there and a couple requests:
   > 
   > 1. Can we keep the types of exceptions we currently have? We wouldn't want to lose all that coverage.
   > 2. I see that the reasons added to `_NON_TRANSIENT_ERRORS` may be geared towards GRPC errors? Can we add a few reasons that BQ would return to us? for example `notFound` (this particular one is mentioned in [WriteToBigQuery ignores insert_retry_strategy on HttpErrors #21080](https://github.com/apache/beam/issues/21080)) or `accessDenied`
   
   I kept the original 3, `'invalid', 'invalidQuery', 'notImplemented'`, and have those at the top of the list. 
   
   There are two types of error reasons in that list:
   1. Reasons returned in the error-list response of the `self.gcp_bq_client.insert_rows_json` call. Those are in camel case and are the ones documented at https://cloud.google.com/bigquery/docs/error-messages.
   2. Reasons returned by `GoogleAPICallError` exceptions. Those follow the standard HTTP reason format. A 404, for example, will return a reason of `Not Found`, which is verified with [this integration test](https://github.com/apache/beam/pull/28091/files#diff-b6e14063dc720a5be1a68c935803f399ab5301bc3748c7f46f156b0c80f5d691R484).
   
   I can definitely add some errors from the BigQuery docs to the list; it wouldn't hurt anything. Let me know if you think we should go that route.
   
   I think a lot of them probably weren't included originally because they'll actually surface as a `GoogleAPICallError` exception and wouldn't be returned in the error-list response of the `self.gcp_bq_client.insert_rows_json` call. When that happens, as with the 404 example, we won't see the BigQuery camel-case `notFound`; we'll end up reading the `Not Found` reason from the `GoogleAPICallError` exception that we're catching instead. That's why [this integration test](https://github.com/apache/beam/pull/28091/files#diff-b6e14063dc720a5be1a68c935803f399ab5301bc3748c7f46f156b0c80f5d691R484) works the way it does, ending up with a `Not Found` reason.
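   
   To make the two shapes concrete, a small self-contained sketch (the set is abbreviated):
   
   ```python
   _NON_TRANSIENT_ERRORS = {
       'invalid', 'invalidQuery', 'notImplemented',  # BigQuery camel-case reasons
       'Bad Request', 'Forbidden', 'Not Found',      # HTTP reason phrases
   }
   
   # 1. Reason as returned in insert_rows_json's error-list response:
   row_error = {'index': 0, 'errors': [{'reason': 'invalid'}]}
   assert row_error['errors'][0]['reason'] in _NON_TRANSIENT_ERRORS
   
   # 2. Reason as read from a GoogleAPICallError's HTTP response (a 404):
   http_reason = 'Not Found'
   assert http_reason in _NON_TRANSIENT_ERRORS
   ```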




[GitHub] [beam] ajdub508 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ajdub508 (via GitHub)" <gi...@apache.org>.
ajdub508 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327055222


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),

Review Comment:
   Thanks, I think I misunderstood the purpose of these tests and thought they were redundant; I'll add them back.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #28091: Return errors for insert_rows_json exceptions (#21080)

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #28091:
URL: https://github.com/apache/beam/pull/28091#discussion_r1327120534


##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -931,255 +929,583 @@ def test_copy_load_job_exception(self, exception_type, error_message):
     'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
-  # Using https://cloud.google.com/bigquery/docs/error-messages and
-  # https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from  https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error retried and succeeds on second attempt, 0 rows sent to
+      # failed rows
       param(
-          exception_type=exceptions.Forbidden if exceptions else None,
-          error_reason='rateLimitExceeded'),
-      param(
-          exception_type=exceptions.DeadlineExceeded if exceptions else None,
-          error_reason='somereason'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None, None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=[]),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed rows after hitting max_retries
       param(
-          exception_type=exceptions.ServiceUnavailable if exceptions else None,
-          error_reason='backendError'),
-      param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='internalError'),
+          insert_response=[
+            exceptions.TooManyRequests if exceptions else None,
+            exceptions.TooManyRequests if exceptions else None],
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to
+      # failed_rows after hitting max_retries
       param(
-          exception_type=exceptions.InternalServerError if exceptions else None,
-          error_reason='backendError'),
+          insert_response=[
+            exceptions.Forbidden if exceptions else None,
+            exceptions.Forbidden if exceptions else None],
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_rows=['value1', 'value3', 'value5']),
   ])
-  @mock.patch('time.sleep')
-  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_structured_retriable(
-      self,
-      mock_send,
-      unused_mock_sleep,
-      exception_type=None,
-      error_reason=None):
-    # In this test, a BATCH pipeline will retry the known RETRIABLE errors.
-    mock_send.side_effect = [
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-        exception_type(
-            'some retriable exception', errors=[{
-                'reason': error_reason
-            }]),
-    ]
+  def test_insert_rows_json_exception_retry_always(
+      self, insert_response, error_reason, failed_rows):
+    # In this test, the pipeline will always retry all caught exception types
+    # because RetryStrategy is explicitly set to RETRY_ALWAYS (the default)
+    with mock.patch('time.sleep'):
+      call_counter = 0
+      mock_response = mock.Mock()
+      mock_response.reason = error_reason
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        # raise if the insert_response element is an exception type
+        if insert_response[call_counter]:
+          exception_type = insert_response[call_counter]
+          call_counter += 1
+          raise exception_type('some exception', response=mock_response)
+        # return an empty list if the insert_response element is None,
+        # indicating a successful call to insert_rows_json
+        else:
+          call_counter += 1
+          return []
+
+      client = mock.Mock()
+      client.insert_rows_json.side_effect = store_callback
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery so that max_retries can be passed,
+            # limiting the run time of a RETRY_ALWAYS test
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn('some retriable exception', exc.exception.args[0])
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_retries_if_unstructured_retriable(
+  def test_insert_rows_json_exception_retry_never(
       self,
       mock_send,
       unused_mock_sleep,
-      exception_type=None,
-      error_message=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+      exception_type,
+      error_reason,
+      streaming=False):
+    # In this test, a pipeline will never retry caught exception types
+    # since RetryStrategy is set to RETRY_NEVER
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
-        exception_type(error_message),
+        exception_type('some exception', response=mock_response)
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
 
-    with self.assertRaises(Exception) as exc:
-      with beam.Pipeline() as p:
-        _ = (
-            p
-            | beam.Create([{
-                'columnA': 'value1'
-            }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
-                create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(4, mock_send.call_count)
-    self.assertIn(error_message, exc.exception.args[0])
+      assert_that(failed_values, equal_to(['value1', 'value2']))
+
+    self.assertEqual(1, mock_send.call_count)
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #   /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of exceptions from https://googleapis.dev
+  #     /python/google-api-core/latest/_modules/google/api_core/exceptions.html.
+  # Choosing some exceptions that produce reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=False),
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
-      param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=False),
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS
+          failed_values=[],
+          expected_call_count=2,
+          streaming=True),
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS
+          failed_values=['value1', 'value2'],
+          expected_call_count=1,
+          streaming=True),
   ])
   @mock.patch('time.sleep')
   @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
-  def test_insert_all_unretriable_errors(
-      self, mock_send, unused_mock_sleep, exception_type=None, error_args=None):
-    # In this test, a BATCH pipeline will retry the unknown RETRIABLE errors.
+  def test_insert_rows_json_exception_retry_on_transient_error(
+      self,
+      mock_send,
+      unused_mock_sleep,
+      exception_type,
+      error_reason,
+      failed_values,
+      expected_call_count,
+      streaming=False):
+    # In this test, a pipeline will only retry caught exception types
+    # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is
+    # set to RETRY_ON_TRANSIENT_ERROR
+    mock_response = mock.Mock()
+    mock_response.reason = error_reason
     mock_send.side_effect = [
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
-        exception_type(*error_args),
+        exception_type('some exception', response=mock_response),
+        # Return no exception and no errors on 2nd call, if there is a 2nd call
+        []
     ]
+    opt = StandardOptions()
+    opt.streaming = streaming
 
-    with self.assertRaises(Exception):
-      with beam.Pipeline() as p:
-        _ = (
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))
+      failed_values_out = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values_out, equal_to(failed_values))
+    self.assertEqual(expected_call_count, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run
+      # row 1 sent to failed_rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1']),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt
+      # transient error succeeds on second attempt, 0 rows sent to failed rows
+      param(
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[]),
+  ])
+  def test_insert_rows_json_errors_retry_always(
+      self, insert_response, failed_rows):
+    # In this test, the pipeline will always retry all errors
+    # because RetryStrategy is explicitly set to RETRY_ALWAYS (the default)
+    with mock.patch('time.sleep'):
+      call_counter = 0
+
+      def store_callback(table, **kwargs):
+        nonlocal call_counter
+        response = insert_response[call_counter]
+        call_counter += 1
+        return response
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
             p
             | beam.Create([{
-                'columnA': 'value1'
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
             }])
-            | WriteToBigQuery(
-                table='project:dataset.table',
-                schema={
-                    'fields': [{
-                        'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
-                    }]
-                },
+            # Using _StreamToBigQuery so that max_retries can be passed,
+            # limiting the run time of a RETRY_ALWAYS test
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
                 create_disposition='CREATE_NEVER',
-                method='STREAMING_INSERTS'))
-    self.assertEqual(1, mock_send.call_count)
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=RetryStrategy.RETRY_ALWAYS,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=len(insert_response) - 1,
+                num_streaming_keys=500))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(failed_values, equal_to(failed_rows))
 
-  # Using https://googleapis.dev/python/google-api-core/latest/_modules/google
-  #    /api_core/exceptions.html
-  # to determine error types and messages to try for retriables.
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
   @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=retry.PermanentException,
-          error_args=('nonretriable', )),
-      param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=(
-              'forbidden morbidden', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.BadRequest if exceptions else None,
-          error_args=('BAD REQUEST!', [{
-              'reason': 'nonretriablereason'
-          }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=False),
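+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows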
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=(
-              'method not allowed!', [{
-                  'reason': 'nonretriablereason'
-              }])),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          streaming=True),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.MethodNotAllowed if exceptions else None,
-          error_args=('method not allowed!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+          ],
+          streaming=True),
+  ])
+  @mock.patch('time.sleep')
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_insert_rows_json_errors_retry_never(
+      self, mock_send, unused_mock_sleep, insert_response, streaming):
+    # In this test, a pipeline will never retry errors since RetryStrategy is
+    # set to RETRY_NEVER
+    mock_send.side_effect = insert_response
+    opt = StandardOptions()
+    opt.streaming = streaming
+    with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
+      bq_write_out = (
+          p
+          | beam.Create([{
+              'columnA': 'value1'
+          }, {
+              'columnA': 'value2'
+          }])
+          | WriteToBigQuery(
+              table='project:dataset.table',
+              schema={
+                  'fields': [{
+                      'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE'
+                  }]
+              },
+              create_disposition='CREATE_NEVER',
+              method='STREAMING_INSERTS',
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER))
+      failed_values = (
+          bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda x: x[1]['columnA']))
+
+      assert_that(failed_values, equal_to(['value1']))
+
+    self.assertEqual(1, mock_send.call_count)
+
+  # Running tests with a variety of error reasons from
+  # https://cloud.google.com/bigquery/docs/error-messages
+  # This covers the scenario when
+  # the google.cloud.bigquery.Client.insert_rows_json call returns an error list
+  # rather than raising an exception.
+  # Choosing some error reasons that are included in
+  # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not
+  @parameterized.expand([
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=exceptions.Unknown if exceptions else None,
-          error_args=('unknown!', 'args')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt
+      # transient error succeeds on 2nd attempt, 0 rows sent to failed rows
       param(
-          exception_type=exceptions.Aborted if exceptions else None,
-          error_args=('abortet!', 'abort')),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [],
+          ],
+          failed_rows=[],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1
+      # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt
+      # all rows with errors are retried when any row has a retriable error
+      # row 1 sent to failed_rows after final attempt
       param(
-          exception_type=requests.exceptions.ConnectionError,
-          error_args=('some connection error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }, {
+                  'index': 1, 'errors': [{
+                      'reason': 'internalError'
+                  }]
+              }],
+              [
+                  {
+                      'index': 0, 'errors': [{
+                          'reason': 'invalid'
+                      }]
+                  },
+              ],
+          ],
+          failed_rows=['value1'],
+          streaming=False),
+      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),

Review Comment:
   Do you remember which tests?
   I think it should be safe to remove them; the same code path is taken either way, and I don't see the connector checking `streaming` except for validation.
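
   If the `streaming` duplication is dropped but coverage of both modes is
   still wanted, here is a minimal sketch (editorial, not part of the PR)
   using the standard library's `subTest` instead of a separate parameterized
   axis; the class and test names are hypothetical:

   ```python
   import unittest

   from apache_beam.options.pipeline_options import StandardOptions

   class StreamingModeTest(unittest.TestCase):
     def test_same_behavior_in_both_modes(self):
       # Per the comment above, the connector takes the same code path in
       # batch and streaming mode, so one loop can cover both settings.
       for streaming in (False, True):
         with self.subTest(streaming=streaming):
           opt = StandardOptions()
           opt.streaming = streaming
           # ...build the pipeline with options=opt and assert on the
           # failed-rows output exactly as in the tests above...
   ```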


