Posted to issues@beam.apache.org by "Sreela Das (Jira)" <ji...@apache.org> on 2022/04/26 15:43:00 UTC

[jira] [Commented] (BEAM-10585) Add error context to the BigQuery FAILED_ROWS tagged output

    [ https://issues.apache.org/jira/browse/BEAM-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528256#comment-17528256 ] 

Sreela Das commented on BEAM-10585:
-----------------------------------

This is an important feature; we really need it in order to use the Python SDK.

> Add error context to the BigQuery FAILED_ROWS tagged output
> -----------------------------------------------------------
>
>                 Key: BEAM-10585
>                 URL: https://issues.apache.org/jira/browse/BEAM-10585
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-py-gcp
>            Reporter: Neil Buckley
>            Priority: P3
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> When using WriteToBigQuery in the Python SDK, a tagged output of FAILED_ROWS is returned as a list of tuples, each with two elements: a string with the table destination and a dict with the row's key-value pairs.
> This tuple does not, however, contain any error context explaining why the row failed. I propose adding a third value to the tuple, an instance of InsertErrorsValueListEntry, to provide context for the error in question.
> Below is a patch to implement this change (based on [https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186]). I've made the patch locally and can create a PR if it helps (I just need access).
>  
> {code:python}
> while True:
>   passed, errors = self.bigquery_wrapper.insert_rows(
>       project_id=table_reference.projectId,
>       dataset_id=table_reference.datasetId,
>       table_id=table_reference.tableId,
>       rows=rows,
>       insert_ids=insert_ids,
>       skip_invalid_rows=True)
>   failed_rows = [(rows[entry.index], entry) for entry in errors]
>   should_retry = any(
>       bigquery_tools.RetryStrategy.should_retry(
>           self._retry_strategy, entry.errors[0].reason) for entry in errors)
>   if not passed:
>     message = (
>         'There were errors inserting to BigQuery. Will{} retry. '
>         'Errors were {}'.format(("" if should_retry else " not"), errors))
>     if should_retry:
>       _LOGGER.warning(message)
>     else:
>       _LOGGER.error(message)
>   rows = [rows[entry.index] for entry in errors]
>   if not should_retry:
>     break
>   else:
>     retry_backoff = next(self._backoff_calculator)
>     _LOGGER.info(
>         'Sleeping %s seconds before retrying insertion.', retry_backoff)
>     time.sleep(retry_backoff)
> self._total_buffered_rows -= len(self._rows_buffer[destination])
> del self._rows_buffer[destination]
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         GlobalWindows.windowed_value((destination, row[0], row[1])))
>     for row in failed_rows
> ]
> {code}
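
For illustration, here is a minimal, Beam-free sketch of how the proposed three-element tuple might be built and consumed downstream. The classes ErrorDetail and InsertErrorEntry are hypothetical stand-ins for InsertErrorsValueListEntry; they only mirror the `index` and `errors[0].reason` attributes that the patch above relies on. The helper names, destination string, and sample rows are likewise assumptions, not part of the Beam API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class ErrorDetail:
    """Hypothetical stand-in for a single error inside an insert-error entry."""
    reason: str
    message: str = ""


@dataclass
class InsertErrorEntry:
    """Hypothetical stand-in for InsertErrorsValueListEntry (index + errors)."""
    index: int
    errors: List[ErrorDetail] = field(default_factory=list)


def build_failed_rows(
    destination: str,
    rows: List[Dict[str, Any]],
    errors: List[InsertErrorEntry],
) -> List[Tuple[str, Dict[str, Any], InsertErrorEntry]]:
    """Pair each failed row with its error entry, as the patch proposes."""
    return [(destination, rows[entry.index], entry) for entry in errors]


def describe_failure(
    element: Tuple[str, Dict[str, Any], InsertErrorEntry],
) -> Dict[str, Any]:
    """Flatten a (destination, row, entry) tuple into a loggable dict."""
    destination, row, entry = element
    return {
        "destination": destination,
        "row": row,
        "reasons": [detail.reason for detail in entry.errors],
    }


# Sample data (assumed): the second row is rejected by the insert call.
rows = [{"id": 1}, {"id": "oops"}, {"id": 3}]
errors = [InsertErrorEntry(index=1, errors=[ErrorDetail("invalid", "bad int")])]

failed = build_failed_rows("proj:ds.table", rows, errors)
report = [describe_failure(element) for element in failed]
```

With the third element available, a downstream step could route rows by failure reason (for example, sending "invalid" rows to a dead-letter table) instead of handling every failed row identically.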


