Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 18:03:30 UTC

[GitHub] [beam] damccorm opened a new issue, #20522: Add error context to the BigQuery FAILED_ROWS tagged output

damccorm opened a new issue, #20522:
URL: https://github.com/apache/beam/issues/20522

   When using WriteToBigQuery in the Python SDK, a FAILED_ROWS tagged output is returned containing a list of tuples. Each tuple has two elements: a string with the table destination and a dict with the row's key-value pairs.
   
   This tuple does not, however, contain any context for why the insert failed. I propose adding a third element to the tuple containing the corresponding InsertErrorsValueListEntry instance, to make the cause of the failure easy to inspect.
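   To illustrate the proposal, here is a minimal sketch of the element shapes before and after the change. The values are made up, and the `entry` dict is only a stand-in for an InsertErrorsValueListEntry (its fields mirror the BigQuery insertAll response: `index`, `errors[].reason`, `errors[].message`):
   
   ```python
   # Current FAILED_ROWS element: (destination, row)
   current = ('project:dataset.table', {'id': 1, 'name': 'alice'})
   
   # Proposed FAILED_ROWS element: (destination, row, entry), where entry
   # carries the error details returned by BigQuery for that row.
   entry = {'index': 0, 'errors': [{'reason': 'invalid', 'message': 'Bad field'}]}
   proposed = current + (entry,)
   ```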
   
   Below is a patch to implement this change (from https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186) - I've made a patch locally and can create a PR if it helps (just need access)
   
    
   ```
   while True:
     passed, errors = self.bigquery_wrapper.insert_rows(
         project_id=table_reference.projectId,
         dataset_id=table_reference.datasetId,
         table_id=table_reference.tableId,
         rows=rows,
         insert_ids=insert_ids,
         skip_invalid_rows=True)

     failed_rows = [(rows[entry.index], entry) for entry in errors]
     should_retry = any(
         bigquery_tools.RetryStrategy.should_retry(
             self._retry_strategy, entry.errors[0].reason) for entry in errors)
     if not passed:
       message = (
           'There were errors inserting to BigQuery. Will{} retry. '
           'Errors were {}'.format(("" if should_retry else " not"), errors))
       if should_retry:
         _LOGGER.warning(message)
       else:
         _LOGGER.error(message)

     rows = [rows[entry.index] for entry in errors]

     if not should_retry:
       break
     else:
       retry_backoff = next(self._backoff_calculator)
       _LOGGER.info(
           'Sleeping %s seconds before retrying insertion.', retry_backoff)
       time.sleep(retry_backoff)

   self._total_buffered_rows -= len(self._rows_buffer[destination])
   del self._rows_buffer[destination]

   return [
       pvalue.TaggedOutput(
           BigQueryWriteFn.FAILED_ROWS,
           GlobalWindows.windowed_value((destination, row[0], row[1])))
       for row in failed_rows
   ]
   ```
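   As a hedged sketch of how downstream code could use the third element, here is a small helper that turns one proposed (destination, row, entry) tuple into a log-friendly string. The function name `describe_failed_row`, the table name, and the row data are all made up for illustration; `entry` is again a plain-dict stand-in for an InsertErrorsValueListEntry:
   
   ```python
   def describe_failed_row(failed):
       """Render one (destination, row, entry) tuple as a log-friendly string."""
       destination, row, entry = failed
       # Collect the error reasons BigQuery reported for this row.
       reasons = ', '.join(e['reason'] for e in entry['errors'])
       return 'Row %r failed for table %s: %s' % (row, destination, reasons)

   # A simulated failed insert for a hypothetical `events` table.
   failed = (
       'project:dataset.events',
       {'user_id': 42, 'ts': 'not-a-timestamp'},
       {'index': 0, 'errors': [{'reason': 'invalid', 'message': 'Bad ts'}]},
   )
   print(describe_failed_row(failed))
   ```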
   
   
   Imported from Jira [BEAM-10585](https://issues.apache.org/jira/browse/BEAM-10585). Original Jira may contain additional context.
   Reported by: nfbuckley.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org