Posted to issues@beam.apache.org by "Sreela Das (Jira)" <ji...@apache.org> on 2022/04/26 15:43:00 UTC
[jira] [Commented] (BEAM-10585) Add error context to the BigQuery FAILED_ROWS tagged output
[ https://issues.apache.org/jira/browse/BEAM-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528256#comment-17528256 ]
Sreela Das commented on BEAM-10585:
-----------------------------------
This is an important feature; we really need it in order to be able to use the Python SDK.
> Add error context to the BigQuery FAILED_ROWS tagged output
> -----------------------------------------------------------
>
> Key: BEAM-10585
> URL: https://issues.apache.org/jira/browse/BEAM-10585
> Project: Beam
> Issue Type: New Feature
> Components: io-py-gcp
> Reporter: Neil Buckley
> Priority: P3
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> When using WriteToBigQuery in the Python SDK, the FAILED_ROWS tagged output is returned as a list of tuples, each tuple having two elements: a string with the table destination and a dict with the row's key-value pairs.
> This tuple does not, however, contain any error context explaining why the row failed. I propose adding a third value to the tuple which contains an instance of InsertErrorsValueListEntry to easily provide context for the error in question.
> Below is a patch to implement this change (from https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186). I've made the patch locally and can create a PR if it helps (I just need access).
>
> {code:java}
> while True:
>   passed, errors = self.bigquery_wrapper.insert_rows(
>       project_id=table_reference.projectId,
>       dataset_id=table_reference.datasetId,
>       table_id=table_reference.tableId,
>       rows=rows,
>       insert_ids=insert_ids,
>       skip_invalid_rows=True)
>   failed_rows = [(rows[entry.index], entry) for entry in errors]
>   should_retry = any(
>       bigquery_tools.RetryStrategy.should_retry(
>           self._retry_strategy, entry.errors[0].reason) for entry in errors)
>   if not passed:
>     message = (
>         'There were errors inserting to BigQuery. Will{} retry. '
>         'Errors were {}'.format(("" if should_retry else " not"), errors))
>     if should_retry:
>       _LOGGER.warning(message)
>     else:
>       _LOGGER.error(message)
>   rows = [rows[entry.index] for entry in errors]
>   if not should_retry:
>     break
>   else:
>     retry_backoff = next(self._backoff_calculator)
>     _LOGGER.info(
>         'Sleeping %s seconds before retrying insertion.', retry_backoff)
>     time.sleep(retry_backoff)
> self._total_buffered_rows -= len(self._rows_buffer[destination])
> del self._rows_buffer[destination]
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         GlobalWindows.windowed_value((destination, row[0], row[1])))
>     for row in failed_rows
> ]
> {code}
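For readers who want to see the proposed tuple shape in isolation, here is a minimal sketch in plain Python that does not depend on Beam. It pairs each failed row with its error entry by index, as the patch does, and emits (destination, row, error entry) tuples. The classes ErrorDetail and InsertErrorEntry are hypothetical stand-ins for the BigQuery client types (the issue names InsertErrorsValueListEntry), and pair_failed_rows and describe_failure are illustrative helper names that do not exist in the SDK.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class ErrorDetail:
    """Hypothetical stand-in for a single BigQuery error detail."""
    reason: str
    message: str = ""


@dataclass
class InsertErrorEntry:
    """Hypothetical stand-in for InsertErrorsValueListEntry."""
    index: int  # position of the failed row within the submitted batch
    errors: List[ErrorDetail] = field(default_factory=list)


FailedRow = Tuple[str, Dict[str, Any], InsertErrorEntry]


def pair_failed_rows(
    destination: str,
    rows: List[Dict[str, Any]],
    errors: List[InsertErrorEntry],
) -> List[FailedRow]:
    """Build the three-element tuples the issue proposes."""
    return [(destination, rows[entry.index], entry) for entry in errors]


def describe_failure(failed: FailedRow) -> str:
    """Format one failed row together with the reasons it was rejected."""
    destination, row, entry = failed
    reasons = ", ".join(detail.reason for detail in entry.errors)
    return "{}: {} failed ({})".format(destination, row, reasons)


# Example batch in which the second row is rejected.
rows = [{"id": 1}, {"id": "oops"}, {"id": 3}]
errors = [InsertErrorEntry(index=1, errors=[ErrorDetail("invalid", "bad int")])]
failed = pair_failed_rows("proj:ds.table", rows, errors)
summary = describe_failure(failed[0])
```

With the third element present, a downstream step can log or route a failed row by its error reason instead of only knowing that the insert failed.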
--
This message was sent by Atlassian Jira
(v8.20.7#820007)