Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/09/11 17:08:03 UTC

[jira] [Assigned] (BEAM-10640) Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts

     [ https://issues.apache.org/jira/browse/BEAM-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot reassigned BEAM-10640:
------------------------------------

    Assignee:     (was: Aizhamal Nurmamat kyzy)

> Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-10640
>                 URL: https://issues.apache.org/jira/browse/BEAM-10640
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-community, io-py-gcp
>    Affects Versions: 2.23.0
>         Environment: LocalRunner, Beam v2.23
>            Reporter: Renato Martins Leite
>            Priority: P1
>              Labels: stale-assigned
>
> In BigQueryWriteFn(DoFn)._flush_batch(self, destination):
>   
>  Return an additional pvalue.TaggedOutput carrying the detailed error for each insert into BigQuery that fails.
>   
>  Today the failed-rows output contains only the row (payload) that failed, like this:
> {code:python}
> # Current return statement
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         GlobalWindows.windowed_value((destination, row)))
>     for row in failed_rows
> ]
> {code}
>  
>  For error analysis it is essential to know what caused each failure.
>  The same function already captures the errors returned by BigQuery, so they only need to be passed along in the pvalue.TaggedOutput:
>   
> {code:python}
> # Call that captures the errors
> passed, errors = self.bigquery_wrapper.insert_rows(
>     project_id=table_reference.projectId,
>     dataset_id=table_reference.datasetId,
>     table_id=table_reference.tableId,
>     rows=rows,
>     insert_ids=insert_ids,
>     skip_invalid_rows=True)
> {code}
> The new return would look like this:
>   
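> {code:python}
> # Proposed return statement.
> # `error` stands for the BigQuery error belonging to `row`; this snippet
> # does not define it, so each row must first be paired with its entry in `errors`.
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         GlobalWindows.windowed_value((destination, row, error)))
>     for row in failed_rows
> ]
> {code}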
>  Thank you!
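
The proposed return statement needs each failed row paired with its own error. A minimal sketch of that pairing in plain Python, without Beam, might look like the following. It assumes each entry in `errors` exposes an `index` into `rows` and a list of `errors` with `reason` and `message` fields, as in BigQuery's insertAll response. The names `InsertError`, `ErrorProto`, and `pair_failed_rows` are hypothetical stand-ins for illustration, not part of Beam.

```python
from collections import namedtuple

# Hypothetical stand-ins for the error objects returned by insert_rows.
# Assumption: each entry carries the index of the failed row and a list
# of errors with `reason` and `message` fields.
ErrorProto = namedtuple("ErrorProto", ["reason", "message"])
InsertError = namedtuple("InsertError", ["index", "errors"])


def pair_failed_rows(destination, rows, insert_errors):
    """Return one (destination, row, error_details) tuple per failed row."""
    failed = []
    for entry in insert_errors:
        # Convert the error objects to plain dicts so they are easy to
        # log or serialize downstream.
        details = [
            {"reason": err.reason, "message": err.message}
            for err in entry.errors
        ]
        failed.append((destination, rows[entry.index], details))
    return failed


rows = [{"id": 1}, {"id": "not-a-number"}, {"id": 3}]
insert_errors = [
    InsertError(
        index=1,
        errors=[ErrorProto("invalid", "Cannot convert value to integer")],
    ),
]

failed = pair_failed_rows("project:dataset.table", rows, insert_errors)
for destination, row, details in failed:
    print(destination, row, details)
```

Inside `_flush_batch`, each such tuple could then be wrapped in `GlobalWindows.windowed_value(...)` and emitted under `BigQueryWriteFn.FAILED_ROWS`, as in the proposal above. Note that existing consumers which unpack a two-element `(destination, row)` tuple would need to be updated.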



--
This message was sent by Atlassian Jira
(v8.3.4#803005)