You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/12 20:39:01 UTC

[beam] branch master updated: Merge pull request #17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6680261b629 Merge pull request #17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO
6680261b629 is described below

commit 6680261b629e6de5a8a731b4f26401b9fa649314
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Thu May 12 16:38:55 2022 -0400

    Merge pull request #17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO
    
    * [BEAM-14415] Exception handling tests and logging for partial failures in BQ IO
    
    * fix DLQ integration test
    
    * fix lint
    
    * fix postcommit
    
    * fix formatter
    
    * Fixing tests and adding test info
    
    * fix skipping tests
---
 sdks/python/apache_beam/io/gcp/bigquery.py         |  80 ++++++--
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 205 ++++++++++++++++++++-
 .../apache_beam/io/gcp/bigquery_tools_test.py      |   2 +-
 .../apache_beam/io/gcp/bigquery_write_it_test.py   |  83 +++++++++
 4 files changed, 346 insertions(+), 24 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 2c21dca6047..4d7df85f905 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -378,6 +378,15 @@ Template for BigQuery jobs created by BigQueryIO. This template is:
 NOTE: This job name template does not have backwards compatibility guarantees.
 """
 BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
+"""
+The maximum number of times that a bundle of rows that errors out should be
+sent for insertion into BigQuery.
+
+The default is 10,000 with exponential backoffs, so a bundle of rows may be
+tried for a very long time. You may reduce this property to reduce the number
+of retries.
+"""
+MAX_INSERT_RETRIES = 10000
 
 
 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1492,6 +1501,7 @@ class BigQueryWriteFn(DoFn):
   DEFAULT_MAX_BATCH_SIZE = 500
 
   FAILED_ROWS = 'FailedRows'
+  FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors'
   STREAMING_API_LOGGING_FREQUENCY_SEC = 300
 
   def __init__(
@@ -1507,7 +1517,8 @@ class BigQueryWriteFn(DoFn):
       additional_bq_parameters=None,
       ignore_insert_ids=False,
       with_batched_input=False,
-      ignore_unknown_columns=False):
+      ignore_unknown_columns=False,
+      max_retries=MAX_INSERT_RETRIES):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1555,6 +1566,9 @@ class BigQueryWriteFn(DoFn):
         the schema. The unknown values are ignored. Default is False,
         which treats unknown values as errors. See reference:
         https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
+      max_retries: The number of times that we will retry inserting a group of
+        rows into BigQuery. By default, we retry 10000 times with exponential
+        backoffs (effectively retry forever).
 
     """
     self.schema = schema
@@ -1592,6 +1606,7 @@ class BigQueryWriteFn(DoFn):
     self.streaming_api_logging_frequency_sec = (
         BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
     self.ignore_unknown_columns = ignore_unknown_columns
+    self._max_retries = max_retries
 
   def display_data(self):
     return {
@@ -1643,7 +1658,9 @@ class BigQueryWriteFn(DoFn):
 
     self._backoff_calculator = iter(
         retry.FuzzedExponentialIntervals(
-            initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
+            initial_delay_secs=0.2,
+            num_retries=self._max_retries,
+            max_delay_secs=1500))
 
   def _create_table_if_needed(self, table_reference, schema=None):
     str_table_reference = '%s:%s.%s' % (
@@ -1754,41 +1771,57 @@ class BigQueryWriteFn(DoFn):
           ignore_unknown_values=self.ignore_unknown_columns)
       self.batch_latency_metric.update((time.time() - start) * 1000)
 
-      failed_rows = [rows[entry['index']] for entry in errors]
+      failed_rows = [(rows[entry['index']], entry["errors"])
+                     for entry in errors]
+      retry_backoff = next(self._backoff_calculator, None)
+
+      # If retry_backoff is None, then we will not retry and must log.
       should_retry = any(
           RetryStrategy.should_retry(
               self._retry_strategy, entry['errors'][0]['reason'])
-          for entry in errors)
+          for entry in errors) and retry_backoff is not None
+
       if not passed:
         self.failed_rows_metric.update(len(failed_rows))
         message = (
             'There were errors inserting to BigQuery. Will{} retry. '
             'Errors were {}'.format(("" if should_retry else " not"), errors))
-        if should_retry:
-          _LOGGER.warning(message)
-        else:
-          _LOGGER.error(message)
 
-      rows = failed_rows
+        # The log level is:
+        # - ERROR when the strategy is RETRY_ALWAYS and we will not retry again.
+        # - WARNING in every other case, whether or not we retry.
+        log_level = (
+            logging.WARN if should_retry or
+            self._retry_strategy != RetryStrategy.RETRY_ALWAYS else
+            logging.ERROR)
+
+        _LOGGER.log(log_level, message)
 
       if not should_retry:
         break
       else:
-        retry_backoff = next(self._backoff_calculator)
         _LOGGER.info(
             'Sleeping %s seconds before retrying insertion.', retry_backoff)
         time.sleep(retry_backoff)
+        rows = [fr[0] for fr in failed_rows]
         self._throttled_secs.inc(retry_backoff)
 
     self._total_buffered_rows -= len(self._rows_buffer[destination])
     del self._rows_buffer[destination]
 
-    return [
+    return itertools.chain([
         pvalue.TaggedOutput(
-            BigQueryWriteFn.FAILED_ROWS,
-            GlobalWindows.windowed_value((destination, row)))
-        for row in failed_rows
-    ]
+            BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+            GlobalWindows.windowed_value((destination, row, err))) for row,
+        err in failed_rows
+    ],
+                           [
+                               pvalue.TaggedOutput(
+                                   BigQueryWriteFn.FAILED_ROWS,
+                                   GlobalWindows.windowed_value(
+                                       (destination, row))) for row,
+                               unused_err in failed_rows
+                           ])
 
 
 # The number of shards per destination when writing via streaming inserts.
@@ -1815,7 +1848,8 @@ class _StreamToBigQuery(PTransform):
       ignore_insert_ids,
       ignore_unknown_columns,
       with_auto_sharding,
-      test_client=None):
+      test_client=None,
+      max_retries=None):
     self.table_reference = table_reference
     self.table_side_inputs = table_side_inputs
     self.schema_side_inputs = schema_side_inputs
@@ -1831,6 +1865,7 @@ class _StreamToBigQuery(PTransform):
     self.ignore_insert_ids = ignore_insert_ids
     self.ignore_unknown_columns = ignore_unknown_columns
     self.with_auto_sharding = with_auto_sharding
+    self.max_retries = max_retries or MAX_INSERT_RETRIES
 
   class InsertIdPrefixFn(DoFn):
     def start_bundle(self):
@@ -1856,7 +1891,8 @@ class _StreamToBigQuery(PTransform):
         additional_bq_parameters=self.additional_bq_parameters,
         ignore_insert_ids=self.ignore_insert_ids,
         ignore_unknown_columns=self.ignore_unknown_columns,
-        with_batched_input=self.with_auto_sharding)
+        with_batched_input=self.with_auto_sharding,
+        max_retries=self.max_retries)
 
     def _add_random_shard(element):
       key = element[0]
@@ -1905,7 +1941,9 @@ class _StreamToBigQuery(PTransform):
         | 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
         | 'StreamInsertRows' >> ParDo(
             bigquery_write_fn, *self.schema_side_inputs).with_outputs(
-                BigQueryWriteFn.FAILED_ROWS, main='main'))
+                BigQueryWriteFn.FAILED_ROWS,
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                main='main'))
 
 
 # Flag to be passed to WriteToBigQuery to force schema autodetection
@@ -2194,7 +2232,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
           with_auto_sharding=self.with_auto_sharding,
           test_client=self.test_client)
 
-      return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
+      return {
+          BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
+          BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
+              BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
+      }
     else:
       if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
         if self.schema == SCHEMA_AUTODETECT:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 74722e4e538..ff2a95c7f8e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -42,6 +42,7 @@ import apache_beam as beam
 from apache_beam.internal import pickler
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.gcp import bigquery as beam_bq
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery import WriteToBigQuery
@@ -91,6 +92,14 @@ except ImportError:
 _LOGGER = logging.getLogger(__name__)
 
 
+def _load_or_default(filename):
+  try:
+    with open(filename) as f:
+      return json.load(f)
+  except:  # pylint: disable=bare-except
+    return {}
+
+
 @unittest.skipIf(
     HttpError is None or gcp_bigquery is None,
     'GCP dependencies are not installed')
@@ -838,6 +847,7 @@ class TestWriteToBigQuery(unittest.TestCase):
               test_client=client))
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
 
   # Using https://cloud.google.com/bigquery/docs/error-messages and
@@ -1233,7 +1243,8 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
-  def test_failure_has_same_insert_ids(self):
+  @mock.patch('time.sleep')
+  def test_failure_has_same_insert_ids(self, unused_mock_sleep):
     tempdir = '%s%s' % (self._new_tempdir(), os.sep)
     file_name_1 = os.path.join(tempdir, 'file1')
     file_name_2 = os.path.join(tempdir, 'file2')
@@ -1289,6 +1300,184 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
     with open(file_name_1) as f1, open(file_name_2) as f2:
       self.assertEqual(json.load(f1), json.load(f2))
 
+  @parameterized.expand([
+      param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
+      param(retry_strategy=RetryStrategy.RETRY_NEVER),
+      param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
+  ])
+  def test_failure_in_some_rows_does_not_duplicate(self, retry_strategy=None):
+    with mock.patch('time.sleep'):
+      # In this test we simulate a failure to write out two out of three rows.
+      # Row 0 and row 2 fail to be written on the first attempt, and then
+      # succeed on the next attempt (if there is one).
+      tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+      file_name_1 = os.path.join(tempdir, 'file1_partial')
+      file_name_2 = os.path.join(tempdir, 'file2_partial')
+
+      def store_callback(table, **kwargs):
+        insert_ids = [r for r in kwargs['row_ids']]
+        colA_values = [r['columnA'] for r in kwargs['json_rows']]
+
+        # The first time this function is called, all rows are included
+        # so we need to filter out 'failed' rows.
+        json_output_1 = {
+            'insertIds': [insert_ids[1]], 'colA_values': [colA_values[1]]
+        }
+        # The second time this function is called, only rows 0 and 2 are
+        # included, so nothing needs filtering. We just write them all out.
+        json_output_2 = {'insertIds': insert_ids, 'colA_values': colA_values}
+
+        # The first time we try to insert, we save those insertions in
+        # file_name_1; every later attempt is saved in file_name_2.
+        if not os.path.exists(file_name_1):
+          with open(file_name_1, 'w') as f:
+            json.dump(json_output_1, f)
+          return [
+              {
+                  'index': 0,
+                  'errors': [{
+                      'reason': 'i dont like this row'
+                  }, {
+                      'reason': 'its bad'
+                  }]
+              },
+              {
+                  'index': 2,
+                  'errors': [{
+                      'reason': 'i het this row'
+                  }, {
+                      'reason': 'its no gud'
+                  }]
+              },
+          ]
+        else:
+          with open(file_name_2, 'w') as f:
+            json.dump(json_output_2, f)
+            return []
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # The expected rows to be inserted according to the insert strategy
+      if retry_strategy == RetryStrategy.RETRY_NEVER:
+        result = ['value3']
+      else:  # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all rows
+        result = ['value1', 'value3', 'value5']
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
+            p
+            | beam.Create([{
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
+            }])
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
+                create_disposition='CREATE_NEVER',
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=retry_strategy,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(
+            failed_values,
+            equal_to(list({'value1', 'value3', 'value5'}.difference(result))))
+
+      data1 = _load_or_default(file_name_1)
+      data2 = _load_or_default(file_name_2)
+
+      self.assertListEqual(
+          sorted(data1.get('colA_values', []) + data2.get('colA_values', [])),
+          result)
+      self.assertEqual(len(data1['colA_values']), 1)
+
+  @parameterized.expand([
+      param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
+      param(retry_strategy=RetryStrategy.RETRY_NEVER),
+      param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
+  ])
+  def test_permanent_failure_in_some_rows_does_not_duplicate(
+      self, unused_sleep_mock=None, retry_strategy=None):
+    with mock.patch('time.sleep'):
+
+      def store_callback(table, **kwargs):
+        return [
+            {
+                'index': 0,
+                'errors': [{
+                    'reason': 'invalid'
+                }, {
+                    'reason': 'its bad'
+                }]
+            },
+        ]
+
+      client = mock.Mock()
+      client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+      # The expected rows to be inserted according to the insert strategy
+      if retry_strategy == RetryStrategy.RETRY_NEVER:
+        inserted_rows = ['value3', 'value5']
+      else:  # Row 0 fails on every attempt, so it is never inserted.
+        inserted_rows = ['value3', 'value5']
+
+      # Using the bundle based direct runner to avoid pickling problems
+      # with mocks.
+      with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+        bq_write_out = (
+            p
+            | beam.Create([{
+                'columnA': 'value1', 'columnB': 'value2'
+            }, {
+                'columnA': 'value3', 'columnB': 'value4'
+            }, {
+                'columnA': 'value5', 'columnB': 'value6'
+            }])
+            | _StreamToBigQuery(
+                table_reference='project:dataset.table',
+                table_side_inputs=[],
+                schema_side_inputs=[],
+                schema='anyschema',
+                batch_size=None,
+                triggering_frequency=None,
+                create_disposition='CREATE_NEVER',
+                write_disposition=None,
+                kms_key=None,
+                retry_strategy=retry_strategy,
+                additional_bq_parameters=[],
+                ignore_insert_ids=False,
+                ignore_unknown_columns=False,
+                with_auto_sharding=False,
+                test_client=client,
+                max_retries=10))
+
+        failed_values = (
+            bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+            | beam.Map(lambda x: x[1]['columnA']))
+
+        assert_that(
+            failed_values,
+            equal_to(
+                list({'value1', 'value3', 'value5'}.difference(inserted_rows))))
+
   @parameterized.expand([
       param(with_auto_sharding=False),
       param(with_auto_sharding=True),
@@ -1353,6 +1542,7 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
       self.assertEqual(out2['colA_values'], ['value5'])
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
 
@@ -1538,9 +1728,15 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
               method='STREAMING_INSERTS'))
 
       assert_that(
-          r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+          r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | beam.Map(lambda elm: (elm[0], elm[1])),
           equal_to([(full_output_table_1, bad_record)]))
 
+      assert_that(
+          r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+          equal_to([(full_output_table_1, bad_record)]),
+          label='FailedRowsMatch')
+
   def tearDown(self):
     request = bigquery.BigqueryDatasetsDeleteRequest(
         projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
@@ -1646,6 +1842,7 @@ class PubSubBigQueryIT(unittest.TestCase):
         WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
 
@@ -1676,12 +1873,12 @@ class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
     bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 100
     elements = [
         {
-            'name': u'Negative infinity',
+            'name': 'Negative infinity',
             'value': -float('inf'),
             'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc),
         },
         {
-            'name': u'Not a number',
+            'name': 'Not a number',
             'value': float('nan'),
             'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc),
         },
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e4ff6082cab..3ce8d0ff7de 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -223,7 +223,7 @@ class TestBigQueryWrapper(unittest.TestCase):
     self.assertTrue(client.datasets.Delete.called)
 
   @unittest.skipIf(
-      google and not hasattr(google.cloud, '_http'),
+      google and not hasattr(google.cloud, '_http'),  # pylint: disable=c-extension-no-member
       'Dependencies not installed')
   @mock.patch('time.sleep', return_value=None)
   @mock.patch('google.cloud._http.JSONConnection.http')
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index dd2283eb71d..e75b698c651 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -36,14 +36,18 @@ from parameterized import param
 from parameterized import parameterized
 
 import apache_beam as beam
+from apache_beam.io.gcp.bigquery import BigQueryWriteFn
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import FileFormat
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
+
 try:
   from apitools.base.py.exceptions import HttpError
 except ImportError:
@@ -373,6 +377,85 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               temp_file_format=FileFormat.JSON))
 
+  @pytest.mark.it_postcommit
+  def test_big_query_write_insert_errors_reporting(self):
+    """
+    Test that errors returned by beam.io.WriteToBigQuery
+    contain both the failed rows and the reasons for their failure.
+    """
+    table_name = 'python_write_table'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [{
+        'number': 1,
+        'str': 'some_string',
+    }, {
+        'number': 2
+    },
+                  {
+                      'number': 3,
+                      'str': 'some_string',
+                      'additional_field_str': 'some_string',
+                  }]
+
+    table_schema = {
+        "fields": [{
+            "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
+        }, {
+            "name": "str", "type": "STRING", 'mode': 'REQUIRED'
+        }]
+    }
+
+    bq_result_errors = [(
+        {
+            "number": 2
+        },
+        [{
+            "reason": "invalid",
+            "location": "",
+            "debugInfo": "",
+            "message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."
+        }],
+    ),
+                        ({
+                            "number": 3,
+                            "str": "some_string",
+                            "additional_field_str": "some_string"
+                        },
+                         [{
+                             "reason": "invalid",
+                             "location": "additional_field_str",
+                             "debugInfo": "",
+                             "message": "no such field: additional_field_str."
+                         }])]
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT number, str FROM %s" % table_id,
+            data=[(1, 'some_string')]),
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      errors = (
+          p | 'create' >> beam.Create(input_data)
+          | 'write' >> beam.io.WriteToBigQuery(
+              table_id,
+              schema=table_schema,
+              method='STREAMING_INSERTS',
+              insert_retry_strategy='RETRY_NEVER',
+              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+      assert_that(
+          errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+          | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
+          equal_to(bq_result_errors))
+
   @pytest.mark.it_postcommit
   @parameterized.expand([
       param(file_format=FileFormat.AVRO),