Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/09/26 17:08:02 UTC
[jira] [Commented] (BEAM-10585) Add error context to the BigQuery FAILED_ROWS tagged output
[ https://issues.apache.org/jira/browse/BEAM-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202636#comment-17202636 ]
Beam JIRA Bot commented on BEAM-10585:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> Add error context to the BigQuery FAILED_ROWS tagged output
> -----------------------------------------------------------
>
> Key: BEAM-10585
> URL: https://issues.apache.org/jira/browse/BEAM-10585
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Neil Buckley
> Priority: P2
> Labels: stale-P2
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> When using WriteToBigQuery in the Python SDK, a tagged output of FAILED_ROWS is returned as a list of tuples, each tuple having two elements: a string with the table destination and a dict with the row's key-value pairs.
> This tuple does not, however, contain any error context explaining why the row failed to insert. I propose adding a third value to the tuple containing an instance of InsertErrorsValueListEntry, to easily provide context for the error in question.
> Below is a patch to implement this change (from [https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186]). I've made the patch locally and can create a PR if it helps (I just need access).
>
> {code:python}
> while True:
>   passed, errors = self.bigquery_wrapper.insert_rows(
>       project_id=table_reference.projectId,
>       dataset_id=table_reference.datasetId,
>       table_id=table_reference.tableId,
>       rows=rows,
>       insert_ids=insert_ids,
>       skip_invalid_rows=True)
>
>   # Keep the matching error entry alongside each failed row so the
>   # caller can see why the insert was rejected.
>   failed_rows = [(rows[entry.index], entry) for entry in errors]
>   should_retry = any(
>       bigquery_tools.RetryStrategy.should_retry(
>           self._retry_strategy, entry.errors[0].reason) for entry in errors)
>   if not passed:
>     message = (
>         'There were errors inserting to BigQuery. Will{} retry. '
>         'Errors were {}'.format(("" if should_retry else " not"), errors))
>     if should_retry:
>       _LOGGER.warning(message)
>     else:
>       _LOGGER.error(message)
>   rows = [rows[entry.index] for entry in errors]
>
>   if not should_retry:
>     break
>   else:
>     retry_backoff = next(self._backoff_calculator)
>     _LOGGER.info(
>         'Sleeping %s seconds before retrying insertion.', retry_backoff)
>     time.sleep(retry_backoff)
>
> self._total_buffered_rows -= len(self._rows_buffer[destination])
> del self._rows_buffer[destination]
>
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         # Emit (destination, row, error entry) instead of (destination, row).
>         GlobalWindows.windowed_value((destination, row[0], row[1])))
>     for row in failed_rows
> ]
> {code}
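For illustration, downstream handling of the proposed three-element FAILED_ROWS tuples could look like the sketch below. The namedtuple stand-ins and the describe_failed_row helper are assumptions for this example only, not part of the patch or the Beam API:

```python
from collections import namedtuple

# Illustrative stand-ins for the BigQuery insertAll response error types
# (not the real API classes).
ErrorProto = namedtuple('ErrorProto', ['reason', 'message'])
InsertErrorsValueListEntry = namedtuple(
    'InsertErrorsValueListEntry', ['index', 'errors'])


def describe_failed_row(failed):
  """Format one FAILED_ROWS element under the proposed
  (destination, row, error_entry) shape."""
  destination, row, entry = failed
  reason = entry.errors[0].reason if entry.errors else 'unknown'
  return 'insert into %s failed (%s): %r' % (destination, reason, row)


failed = (
    'my-project:my_dataset.my_table',
    {'id': 1, 'name': 'bad row'},
    InsertErrorsValueListEntry(
        index=0, errors=[ErrorProto('invalid', 'no such field')]))
print(describe_failed_row(failed))
```

With only the current two-element tuple, the `reason` above would be unavailable to the pipeline author without re-querying BigQuery.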
--
This message was sent by Atlassian Jira
(v8.3.4#803005)