Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/12 20:39:01 UTC
[beam] branch master updated: Merge pull request #17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6680261b629 Merge pull request #17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO
6680261b629 is described below
commit 6680261b629e6de5a8a731b4f26401b9fa649314
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Thu May 12 16:38:55 2022 -0400
Merge pull request #17584 from [BEAM-14415] Exception handling tests and logging for partial failure BQIO
* [BEAM-14415] Exception handling tests and logging for partial failures in BQ IO
* fix DLQ integration test
* fix lint
* fix postcommit
* fix formatter
* Fixing tests and adding test info
* fix skipping tests
---
sdks/python/apache_beam/io/gcp/bigquery.py | 80 ++++++--
sdks/python/apache_beam/io/gcp/bigquery_test.py | 205 ++++++++++++++++++++-
.../apache_beam/io/gcp/bigquery_tools_test.py | 2 +-
.../apache_beam/io/gcp/bigquery_write_it_test.py | 83 +++++++++
4 files changed, 346 insertions(+), 24 deletions(-)
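For orientation, a minimal sketch (not part of this commit; the table name, schema, and input values are placeholders) of how a pipeline might consume the new FAILED_ROWS_WITH_ERRORS output that this change adds to WriteToBigQuery. Per the change below, each element on that output is a (destination, row, errors) tuple:

import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn


def log_insert_error(element):
  # Each element of FAILED_ROWS_WITH_ERRORS is (destination, row, errors).
  destination, row, errors = element
  logging.error('Row %s failed to insert into %s: %s', row, destination, errors)


with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'number': 1, 'str': 'a'}, {'number': 2}])
      | beam.io.WriteToBigQuery(
          'my-project:my_dataset.my_table',  # placeholder table
          schema='number:INTEGER,str:STRING',
          method='STREAMING_INSERTS',
          insert_retry_strategy='RETRY_NEVER'))

  _ = (
      result[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
      | 'LogInsertErrors' >> beam.Map(log_insert_error))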
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 2c21dca6047..4d7df85f905 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -378,6 +378,15 @@ Template for BigQuery jobs created by BigQueryIO. This template is:
NOTE: This job name template does not have backwards compatibility guarantees.
"""
BQ_JOB_NAME_TEMPLATE = "beam_bq_job_{job_type}_{job_id}_{step_id}{random}"
+"""
+The maximum number of times that a bundle of rows that errors out should be
+sent for insertion into BigQuery.
+
+The default is 10,000 with exponential backoffs, so a bundle of rows may be
+tried for a very long time. You may reduce this property to reduce the number
+of retries.
+"""
+MAX_INSERT_RETRIES = 10000
@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1492,6 +1501,7 @@ class BigQueryWriteFn(DoFn):
DEFAULT_MAX_BATCH_SIZE = 500
FAILED_ROWS = 'FailedRows'
+ FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors'
STREAMING_API_LOGGING_FREQUENCY_SEC = 300
def __init__(
@@ -1507,7 +1517,8 @@ class BigQueryWriteFn(DoFn):
additional_bq_parameters=None,
ignore_insert_ids=False,
with_batched_input=False,
- ignore_unknown_columns=False):
+ ignore_unknown_columns=False,
+ max_retries=MAX_INSERT_RETRIES):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -1555,6 +1566,9 @@ class BigQueryWriteFn(DoFn):
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. See reference:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
+ max_retries: The number of times that we will retry inserting a group of
+ rows into BigQuery. By default, we retry 10000 times with exponential
+ backoffs (effectively retry forever).
"""
self.schema = schema
@@ -1592,6 +1606,7 @@ class BigQueryWriteFn(DoFn):
self.streaming_api_logging_frequency_sec = (
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
self.ignore_unknown_columns = ignore_unknown_columns
+ self._max_retries = max_retries
def display_data(self):
return {
@@ -1643,7 +1658,9 @@ class BigQueryWriteFn(DoFn):
self._backoff_calculator = iter(
retry.FuzzedExponentialIntervals(
- initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
+ initial_delay_secs=0.2,
+ num_retries=self._max_retries,
+ max_delay_secs=1500))
def _create_table_if_needed(self, table_reference, schema=None):
str_table_reference = '%s:%s.%s' % (
@@ -1754,41 +1771,57 @@ class BigQueryWriteFn(DoFn):
ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)
- failed_rows = [rows[entry['index']] for entry in errors]
+ failed_rows = [(rows[entry['index']], entry["errors"])
+ for entry in errors]
+ retry_backoff = next(self._backoff_calculator, None)
+
+ # If retry_backoff is None, then we will not retry and must log.
should_retry = any(
RetryStrategy.should_retry(
self._retry_strategy, entry['errors'][0]['reason'])
- for entry in errors)
+ for entry in errors) and retry_backoff is not None
+
if not passed:
self.failed_rows_metric.update(len(failed_rows))
message = (
'There were errors inserting to BigQuery. Will{} retry. '
'Errors were {}'.format(("" if should_retry else " not"), errors))
- if should_retry:
- _LOGGER.warning(message)
- else:
- _LOGGER.error(message)
- rows = failed_rows
+ # The log level is:
+ # - WARNING when we are continuing to retry, and have a deadline.
+ # - ERROR when we will no longer retry, or MAY retry forever.
+ log_level = (
+ logging.WARN if should_retry or
+ self._retry_strategy != RetryStrategy.RETRY_ALWAYS else
+ logging.ERROR)
+
+ _LOGGER.log(log_level, message)
if not should_retry:
break
else:
- retry_backoff = next(self._backoff_calculator)
_LOGGER.info(
'Sleeping %s seconds before retrying insertion.', retry_backoff)
time.sleep(retry_backoff)
+ rows = [fr[0] for fr in failed_rows]
self._throttled_secs.inc(retry_backoff)
self._total_buffered_rows -= len(self._rows_buffer[destination])
del self._rows_buffer[destination]
- return [
+ return itertools.chain([
pvalue.TaggedOutput(
- BigQueryWriteFn.FAILED_ROWS,
- GlobalWindows.windowed_value((destination, row)))
- for row in failed_rows
- ]
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+ GlobalWindows.windowed_value((destination, row, err))) for row,
+ err in failed_rows
+ ],
+ [
+ pvalue.TaggedOutput(
+ BigQueryWriteFn.FAILED_ROWS,
+ GlobalWindows.windowed_value(
+ (destination, row))) for row,
+ unused_err in failed_rows
+ ])
# The number of shards per destination when writing via streaming inserts.
@@ -1815,7 +1848,8 @@ class _StreamToBigQuery(PTransform):
ignore_insert_ids,
ignore_unknown_columns,
with_auto_sharding,
- test_client=None):
+ test_client=None,
+ max_retries=None):
self.table_reference = table_reference
self.table_side_inputs = table_side_inputs
self.schema_side_inputs = schema_side_inputs
@@ -1831,6 +1865,7 @@ class _StreamToBigQuery(PTransform):
self.ignore_insert_ids = ignore_insert_ids
self.ignore_unknown_columns = ignore_unknown_columns
self.with_auto_sharding = with_auto_sharding
+ self.max_retries = max_retries or MAX_INSERT_RETRIES
class InsertIdPrefixFn(DoFn):
def start_bundle(self):
@@ -1856,7 +1891,8 @@ class _StreamToBigQuery(PTransform):
additional_bq_parameters=self.additional_bq_parameters,
ignore_insert_ids=self.ignore_insert_ids,
ignore_unknown_columns=self.ignore_unknown_columns,
- with_batched_input=self.with_auto_sharding)
+ with_batched_input=self.with_auto_sharding,
+ max_retries=self.max_retries)
def _add_random_shard(element):
key = element[0]
@@ -1905,7 +1941,9 @@ class _StreamToBigQuery(PTransform):
| 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
| 'StreamInsertRows' >> ParDo(
bigquery_write_fn, *self.schema_side_inputs).with_outputs(
- BigQueryWriteFn.FAILED_ROWS, main='main'))
+ BigQueryWriteFn.FAILED_ROWS,
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+ main='main'))
# Flag to be passed to WriteToBigQuery to force schema autodetection
@@ -2194,7 +2232,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
with_auto_sharding=self.with_auto_sharding,
test_client=self.test_client)
- return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
+ return {
+ BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
+ }
else:
if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
if self.schema == SCHEMA_AUTODETECT:
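As a small standalone aside (illustrative values only, not code from this commit): the new max_retries parameter works because FuzzedExponentialIntervals yields at most num_retries backoff values, so once the iterator is exhausted next(..., None) returns None and the write loop stops retrying, routing the rows to the failed-rows outputs instead:

import time

from apache_beam.utils import retry

max_retries = 3  # illustrative; the default MAX_INSERT_RETRIES is 10000
backoff_calculator = iter(
    retry.FuzzedExponentialIntervals(
        initial_delay_secs=0.2,
        num_retries=max_retries,
        max_delay_secs=1500))

attempt = 0
while True:
  attempt += 1
  insert_succeeded = False  # pretend every insert attempt fails
  if insert_succeeded:
    break
  retry_backoff = next(backoff_calculator, None)
  if retry_backoff is None:
    # Out of retries: the DoFn logs at ERROR and emits the rows on the
    # FAILED_ROWS / FAILED_ROWS_WITH_ERRORS outputs instead of retrying.
    break
  time.sleep(retry_backoff)

print('gave up after %d attempts' % attempt)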
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 74722e4e538..ff2a95c7f8e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -42,6 +42,7 @@ import apache_beam as beam
from apache_beam.internal import pickler
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.gcp import bigquery as beam_bq
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery import WriteToBigQuery
@@ -91,6 +92,14 @@ except ImportError:
_LOGGER = logging.getLogger(__name__)
+def _load_or_default(filename):
+ try:
+ with open(filename) as f:
+ return json.load(f)
+ except: # pylint: disable=bare-except
+ return {}
+
+
@unittest.skipIf(
HttpError is None or gcp_bigquery is None,
'GCP dependencies are not installed')
@@ -838,6 +847,7 @@ class TestWriteToBigQuery(unittest.TestCase):
test_client=client))
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertsErrorHandling(unittest.TestCase):
# Using https://cloud.google.com/bigquery/docs/error-messages and
@@ -1233,7 +1243,8 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
- def test_failure_has_same_insert_ids(self):
+ @mock.patch('time.sleep')
+ def test_failure_has_same_insert_ids(self, unused_mock_sleep):
tempdir = '%s%s' % (self._new_tempdir(), os.sep)
file_name_1 = os.path.join(tempdir, 'file1')
file_name_2 = os.path.join(tempdir, 'file2')
@@ -1289,6 +1300,184 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
with open(file_name_1) as f1, open(file_name_2) as f2:
self.assertEqual(json.load(f1), json.load(f2))
+ @parameterized.expand([
+ param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
+ param(retry_strategy=RetryStrategy.RETRY_NEVER),
+ param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
+ ])
+ def test_failure_in_some_rows_does_not_duplicate(self, retry_strategy=None):
+ with mock.patch('time.sleep'):
+ # In this test we simulate a failure to write out two out of three rows.
+ # Row 0 and row 2 fail to be written on the first attempt, and then
+ # succeed on the next attempt (if there is one).
+ tempdir = '%s%s' % (self._new_tempdir(), os.sep)
+ file_name_1 = os.path.join(tempdir, 'file1_partial')
+ file_name_2 = os.path.join(tempdir, 'file2_partial')
+
+ def store_callback(table, **kwargs):
+ insert_ids = [r for r in kwargs['row_ids']]
+ colA_values = [r['columnA'] for r in kwargs['json_rows']]
+
+ # The first time this function is called, all rows are included
+ # so we need to filter out 'failed' rows.
+ json_output_1 = {
+ 'insertIds': [insert_ids[1]], 'colA_values': [colA_values[1]]
+ }
+ # The second time this function is called, only rows 0 and 2 are included,
+ # so we don't need to filter any of them. We just write them all out.
+ json_output_2 = {'insertIds': insert_ids, 'colA_values': colA_values}
+
+ # The first time we try to insert, we save those insertions in
+ # file insert_calls1.
+ if not os.path.exists(file_name_1):
+ with open(file_name_1, 'w') as f:
+ json.dump(json_output_1, f)
+ return [
+ {
+ 'index': 0,
+ 'errors': [{
+ 'reason': 'i dont like this row'
+ }, {
+ 'reason': 'its bad'
+ }]
+ },
+ {
+ 'index': 2,
+ 'errors': [{
+ 'reason': 'i het this row'
+ }, {
+ 'reason': 'its no gud'
+ }]
+ },
+ ]
+ else:
+ with open(file_name_2, 'w') as f:
+ json.dump(json_output_2, f)
+ return []
+
+ client = mock.Mock()
+ client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+ # The expected rows to be inserted according to the insert strategy
+ if retry_strategy == RetryStrategy.RETRY_NEVER:
+ result = ['value3']
+ else: # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all rows
+ result = ['value1', 'value3', 'value5']
+
+ # Using the bundle based direct runner to avoid pickling problems
+ # with mocks.
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+ bq_write_out = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1', 'columnB': 'value2'
+ }, {
+ 'columnA': 'value3', 'columnB': 'value4'
+ }, {
+ 'columnA': 'value5', 'columnB': 'value6'
+ }])
+ | _StreamToBigQuery(
+ table_reference='project:dataset.table',
+ table_side_inputs=[],
+ schema_side_inputs=[],
+ schema='anyschema',
+ batch_size=None,
+ triggering_frequency=None,
+ create_disposition='CREATE_NEVER',
+ write_disposition=None,
+ kms_key=None,
+ retry_strategy=retry_strategy,
+ additional_bq_parameters=[],
+ ignore_insert_ids=False,
+ ignore_unknown_columns=False,
+ with_auto_sharding=False,
+ test_client=client))
+
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(
+ failed_values,
+ equal_to(list({'value1', 'value3', 'value5'}.difference(result))))
+
+ data1 = _load_or_default(file_name_1)
+ data2 = _load_or_default(file_name_2)
+
+ self.assertListEqual(
+ sorted(data1.get('colA_values', []) + data2.get('colA_values', [])),
+ result)
+ self.assertEqual(len(data1['colA_values']), 1)
+
+ @parameterized.expand([
+ param(retry_strategy=RetryStrategy.RETRY_ALWAYS),
+ param(retry_strategy=RetryStrategy.RETRY_NEVER),
+ param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR),
+ ])
+ def test_permanent_failure_in_some_rows_does_not_duplicate(
+ self, unused_sleep_mock=None, retry_strategy=None):
+ with mock.patch('time.sleep'):
+
+ def store_callback(table, **kwargs):
+ return [
+ {
+ 'index': 0,
+ 'errors': [{
+ 'reason': 'invalid'
+ }, {
+ 'reason': 'its bad'
+ }]
+ },
+ ]
+
+ client = mock.Mock()
+ client.insert_rows_json = mock.Mock(side_effect=store_callback)
+
+ # The expected rows to be inserted according to the insert strategy
+ if retry_strategy == RetryStrategy.RETRY_NEVER:
+ inserted_rows = ['value3', 'value5']
+ else: # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERRORS should insert all rows
+ inserted_rows = ['value3', 'value5']
+
+ # Using the bundle based direct runner to avoid pickling problems
+ # with mocks.
+ with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
+ bq_write_out = (
+ p
+ | beam.Create([{
+ 'columnA': 'value1', 'columnB': 'value2'
+ }, {
+ 'columnA': 'value3', 'columnB': 'value4'
+ }, {
+ 'columnA': 'value5', 'columnB': 'value6'
+ }])
+ | _StreamToBigQuery(
+ table_reference='project:dataset.table',
+ table_side_inputs=[],
+ schema_side_inputs=[],
+ schema='anyschema',
+ batch_size=None,
+ triggering_frequency=None,
+ create_disposition='CREATE_NEVER',
+ write_disposition=None,
+ kms_key=None,
+ retry_strategy=retry_strategy,
+ additional_bq_parameters=[],
+ ignore_insert_ids=False,
+ ignore_unknown_columns=False,
+ with_auto_sharding=False,
+ test_client=client,
+ max_retries=10))
+
+ failed_values = (
+ bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
+ | beam.Map(lambda x: x[1]['columnA']))
+
+ assert_that(
+ failed_values,
+ equal_to(
+ list({'value1', 'value3', 'value5'}.difference(inserted_rows))))
+
@parameterized.expand([
param(with_auto_sharding=False),
param(with_auto_sharding=True),
@@ -1353,6 +1542,7 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
self.assertEqual(out2['colA_values'], ['value5'])
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
@@ -1538,9 +1728,15 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
method='STREAMING_INSERTS'))
assert_that(
- r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+ r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+ | beam.Map(lambda elm: (elm[0], elm[1])),
equal_to([(full_output_table_1, bad_record)]))
+ assert_that(
+ r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+ equal_to([(full_output_table_1, bad_record)]),
+ label='FailedRowsMatch')
+
def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
@@ -1646,6 +1842,7 @@ class PubSubBigQueryIT(unittest.TestCase):
WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
@@ -1676,12 +1873,12 @@ class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 100
elements = [
{
- 'name': u'Negative infinity',
+ 'name': 'Negative infinity',
'value': -float('inf'),
'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc),
},
{
- 'name': u'Not a number',
+ 'name': 'Not a number',
'value': float('nan'),
'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc),
},
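A short aside (illustrative, not part of the commit): the should_retry check exercised by these tests comes from RetryStrategy in bigquery_tools, which treats reasons such as 'invalid' as non-transient while unrecognized reasons count as transient. That is why the permanent-failure test above expects its row to land on FAILED_ROWS under RETRY_NEVER and RETRY_ON_TRANSIENT_ERROR, and under RETRY_ALWAYS only because the test caps max_retries at 10:

from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# 'invalid' is a non-transient reason: only RETRY_ALWAYS keeps retrying it.
print(RetryStrategy.should_retry(RetryStrategy.RETRY_ALWAYS, 'invalid'))              # True
print(RetryStrategy.should_retry(RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'invalid'))  # False
print(RetryStrategy.should_retry(RetryStrategy.RETRY_NEVER, 'invalid'))               # False

# Reasons the strategy does not recognize are treated as transient.
print(RetryStrategy.should_retry(
    RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'i dont like this row'))  # True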
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e4ff6082cab..3ce8d0ff7de 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -223,7 +223,7 @@ class TestBigQueryWrapper(unittest.TestCase):
self.assertTrue(client.datasets.Delete.called)
@unittest.skipIf(
- google and not hasattr(google.cloud, '_http'),
+ google and not hasattr(google.cloud, '_http'), # pylint: disable=c-extension-no-member
'Dependencies not installed')
@mock.patch('time.sleep', return_value=None)
@mock.patch('google.cloud._http.JSONConnection.http')
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index dd2283eb71d..e75b698c651 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -36,14 +36,18 @@ from parameterized import param
from parameterized import parameterized
import apache_beam as beam
+from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.bigquery_tools import FileFormat
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
+
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
@@ -373,6 +377,85 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
temp_file_format=FileFormat.JSON))
+ @pytest.mark.it_postcommit
+ def test_big_query_write_insert_errors_reporting(self):
+ """
+ Test that errors returned by beam.io.WriteToBigQuery
+ contain both the failed rows and the reasons they failed.
+ """
+ table_name = 'python_write_table'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [{
+ 'number': 1,
+ 'str': 'some_string',
+ }, {
+ 'number': 2
+ },
+ {
+ 'number': 3,
+ 'str': 'some_string',
+ 'additional_field_str': 'some_string',
+ }]
+
+ table_schema = {
+ "fields": [{
+ "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
+ }, {
+ "name": "str", "type": "STRING", 'mode': 'REQUIRED'
+ }]
+ }
+
+ bq_result_errors = [(
+ {
+ "number": 2
+ },
+ [{
+ "reason": "invalid",
+ "location": "",
+ "debugInfo": "",
+ "message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."
+ }],
+ ),
+ ({
+ "number": 3,
+ "str": "some_string",
+ "additional_field_str": "some_string"
+ },
+ [{
+ "reason": "invalid",
+ "location": "additional_field_str",
+ "debugInfo": "",
+ "message": "no such field: additional_field_str."
+ }])]
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT number, str FROM %s" % table_id,
+ data=[(1, 'some_string')]),
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+ with beam.Pipeline(argv=args) as p:
+ # pylint: disable=expression-not-assigned
+ errors = (
+ p | 'create' >> beam.Create(input_data)
+ | 'write' >> beam.io.WriteToBigQuery(
+ table_id,
+ schema=table_schema,
+ method='STREAMING_INSERTS',
+ insert_retry_strategy='RETRY_NEVER',
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+ assert_that(
+ errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
+ | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
+ equal_to(bq_result_errors))
+
@pytest.mark.it_postcommit
@parameterized.expand([
param(file_format=FileFormat.AVRO),