Posted to issues@beam.apache.org by "Renato Martins Leite (Jira)" <ji...@apache.org> on 2020/08/05 12:39:00 UTC

[jira] [Created] (BEAM-10640) Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts

Renato Martins Leite created BEAM-10640:
-------------------------------------------

             Summary: Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts
                 Key: BEAM-10640
                 URL: https://issues.apache.org/jira/browse/BEAM-10640
             Project: Beam
          Issue Type: Improvement
          Components: beam-community, io-py-gcp
    Affects Versions: 2.23.0
         Environment: LocalRunner, Beam v2.23
            Reporter: Renato Martins Leite
            Assignee: Aizhamal Nurmamat kyzy


In {{BigQueryWriteFn._flush_batch(self, destination)}} (module {{apache_beam.io.gcp.bigquery}}):

Return the detailed ERROR that BigQuery reports for each failed insert, in addition to the failed row, in the pvalue.TaggedOutput.
 
Today, the failed-rows output carries only the destination and the row (payload) that failed, not the reason it failed:
{code:python}
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row in failed_rows
]
{code}
 
For error analysis, it is essential to know WHAT caused each failure.
This same function already receives the error details from BigQuery when it calls insert_rows, so they only need to be passed along in the pvalue.TaggedOutput:
 
{code:python}
passed, errors = self.bigquery_wrapper.insert_rows(
    project_id=table_reference.projectId,
    dataset_id=table_reference.datasetId,
    table_id=table_reference.tableId,
    rows=rows,
    insert_ids=insert_ids,
    skip_invalid_rows=True)
{code}
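A minimal sketch of how each rejected row could be paired with its error details. It assumes that each entry in {{errors}} is a dict shaped like an element of the {{insertErrors}} list in BigQuery's {{tabledata.insertAll}} response, i.e. an {{index}} into the submitted rows plus an {{errors}} list. The helper {{failed_rows_with_errors}} and the type aliases are hypothetical, not part of Beam, and the real wrapper may return a different object type.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical aliases for readability; these are not Beam types.
Row = Dict[str, Any]
ErrorEntry = Dict[str, Any]


def failed_rows_with_errors(
    destination: str, rows: List[Row], errors: List[ErrorEntry]
) -> List[Tuple[str, Row, List[Dict[str, Any]]]]:
    """Build (destination, row, error) tuples for every rejected row.

    Assumption: each entry in `errors` looks like an element of the
    `insertErrors` list of a tabledata.insertAll response:
    {"index": <position in rows>, "errors": [{"reason": ..., "message": ...}]}
    """
    # `index` points back into the batch that was submitted, so the
    # rejected payload and BigQuery's explanation stay together.
    return [
        (destination, rows[entry["index"]], entry["errors"])
        for entry in errors
    ]
```

For example, with rows {{[{"age": 1}, {"age": "x"}]}} and a single error entry with {{index}} 1, only the second row is returned, together with its error list.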
The new return would look like this:
 
{code:python}
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    # failed_rows would now hold (row, error) pairs built from `errors`
    for row, error in failed_rows
]
{code}
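To show what a pipeline could do with the enriched elements, here is a minimal plain-Python sketch that flattens one proposed {{(destination, row, error)}} element into a dead-letter record. It assumes {{error}} is a list of dicts with {{reason}} and {{message}} keys (as in BigQuery's ErrorProto); the function name {{to_dead_letter_record}} and the record layout are hypothetical. Such a function could be applied with {{beam.Map}} to the failed-rows output.

```python
import json
from typing import Any, Dict, List, Tuple


def to_dead_letter_record(
    element: Tuple[str, Dict[str, Any], List[Dict[str, Any]]]
) -> Dict[str, str]:
    """Flatten a proposed (destination, row, error) element for analysis."""
    destination, row, error = element
    # BigQuery may report several errors per row, so keep all of them.
    reasons = ",".join(e.get("reason", "") for e in error)
    messages = "; ".join(e.get("message", "") for e in error)
    return {
        "destination": destination,
        # Serialize the payload deterministically so records are comparable.
        "row_json": json.dumps(row, sort_keys=True),
        "reasons": reasons,
        "messages": messages,
    }
```

Keeping the original row as JSON next to the reasons lets failed inserts be grouped by cause and replayed after the schema or data is fixed.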
 Thank you!


