Posted to issues@beam.apache.org by "Renato Martins Leite (Jira)" <ji...@apache.org> on 2020/08/05 12:39:00 UTC
[jira] [Created] (BEAM-10640) Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts
Renato Martins Leite created BEAM-10640:
-------------------------------------------
Summary: Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts
Key: BEAM-10640
URL: https://issues.apache.org/jira/browse/BEAM-10640
Project: Beam
Issue Type: Improvement
Components: beam-community, io-py-gcp
Affects Versions: 2.23.0
Environment: LocalRunner, Beam v2.23
Reporter: Renato Martins Leite
Assignee: Aizhamal Nurmamat kyzy
In BigQueryWriteFn(DoFn)._flush_batch(self, destination), also return, via pvalue.TaggedOutput, the detailed error that BigQuery reported for each failed insertion.
Currently, the failed-rows output contains only the destination and the row payload, not the reason the row was rejected:
{code:python}
# Current behaviour: only (destination, row) is emitted for each failed row.
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row in failed_rows
]
{code}
For error analysis, it is essential to know what caused each insert to fail.
The errors are already available in this same function, because insert_rows returns them alongside the passed flag; they only need to be emitted through pvalue.TaggedOutput:
{code:python}
# Inside _flush_batch: insert_rows returns a success flag and the per-row errors.
passed, errors = self.bigquery_wrapper.insert_rows(
    project_id=table_reference.projectId,
    dataset_id=table_reference.datasetId,
    table_id=table_reference.tableId,
    rows=rows,
    insert_ids=insert_ids,
    skip_invalid_rows=True)
{code}
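As a minimal sketch of how each error could be matched back to its row, the example below assumes every error entry has the shape of an insertErrors item in the BigQuery tabledata.insertAll REST response: an "index" into the submitted rows plus a list of "errors", each with a "reason" and "message". The helper name pair_failed_rows and the plain-dict representation are hypothetical; the object type actually returned by the Beam wrapper may differ.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
ErrorEntry = Dict[str, Any]


def pair_failed_rows(
    rows: List[Row], errors: List[ErrorEntry]
) -> List[Tuple[Row, List[Dict[str, Any]]]]:
    """Return (row, errors) pairs for every row BigQuery rejected.

    Hypothetical helper: assumes each entry in `errors` is a dict shaped like
    an insertAll `insertErrors` item, i.e. {"index": int, "errors": [...]},
    where "index" is the position of the rejected row in `rows`.
    """
    return [(rows[entry["index"]], entry["errors"]) for entry in errors]


# Made-up example data in the assumed insertAll response shape.
rows = [{"name": "a", "age": 1}, {"name": "b", "age": "not-a-number"}]
errors = [
    {
        "index": 1,
        "errors": [{"reason": "invalid", "location": "age",
                    "message": "example: value is not an integer"}],
    }
]

for row, row_errors in pair_failed_rows(rows, errors):
    print(row["name"], row_errors[0]["reason"])
# prints: b invalid
```

Pairing by index keeps the mapping explicit, so a row is never attached to the wrong error even when only some rows in the batch fail.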
The new return would look like this:
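{code:python}
# Proposed: attach the BigQuery error to each failed row.
# Assumes failed_rows is built as (row, error) pairs from the errors
# returned by insert_rows, so that `error` is defined in the comprehension.
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    for row, error in failed_rows
]
{code}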
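With the error attached to each failed row, downstream code could aggregate failures by cause. The sketch below is plain Python rather than a Beam transform; it assumes the proposed (destination, row, error) tuple, with error being a list of dicts carrying a "reason" key as in BigQuery insertAll responses. The name summarize_failures is hypothetical.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List, Tuple

# (destination, row, error): the tuple proposed above. `error` is assumed to be
# a list of dicts with a "reason" key, as in BigQuery insertAll responses.
FailedRecord = Tuple[str, Dict[str, Any], List[Dict[str, Any]]]


def summarize_failures(records: Iterable[FailedRecord]) -> Counter:
    """Count failed rows per (destination, first error reason).

    Hypothetical helper for illustration only. Rows whose error list is
    empty, or whose first error has no "reason", are counted as "unknown".
    """
    counts: Counter = Counter()
    for destination, _row, error in records:
        reason = error[0].get("reason", "unknown") if error else "unknown"
        counts[(destination, reason)] += 1
    return counts


# Made-up failed records for illustration.
failed = [
    ("project:dataset.table", {"id": 1}, [{"reason": "invalid"}]),
    ("project:dataset.table", {"id": 2}, [{"reason": "invalid"}]),
    ("project:dataset.other", {"id": 3}, [{"reason": "stopped"}]),
]
print(summarize_failures(failed))
```

In a pipeline, the same per-key counting could be expressed with a Beam combiner over the failed-rows output instead of a local Counter.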
Thank you!