You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/07/06 22:33:02 UTC

[beam] branch master updated: Merge pull request #21872 from Standardizing output of WriteToBigQuery

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 74a2a1b071b Merge pull request #21872 from Standardizing output of WriteToBigQuery
74a2a1b071b is described below

commit 74a2a1b071b72d31199477af5abab8e9b04494f7
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Jul 6 18:32:56 2022 -0400

    Merge pull request #21872 from Standardizing output of WriteToBigQuery
    
    * writeresult prototype
    
    * error message tweak
    
    * type annotations and using property objects
    
    * using property decorators instead
    
    * deleting unnecessary schema and destination attributes
    
    * added documentation
    
    * added example
    
    * documentation location change
    
    * updated documentation
    
    * updated documentation
    
    * return types added
    
    * putting JobReference import in try block. fixing short underline
---
 sdks/python/apache_beam/io/gcp/bigquery.py | 215 ++++++++++++++++++++++++++++-
 1 file changed, 208 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 8d8433f13af..5ce7519d3f2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -235,6 +235,73 @@ also take a callable that receives a table reference.
 [2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
 [3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
 
+Chaining of operations after WriteToBigQuery
+--------------------------------------------
+WriteToBigQuery returns an object with several PCollections that consist of
+metadata about the write operations. These are useful to inspect the write
+operation and to follow up on its results::
+
+  schema = {'fields': [
+      {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  error_schema = {'fields': [
+      {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  with Pipeline() as p:
+    result = (p
+      | 'Create Columns' >> beam.Create([
+              {'column': 'value'},
+              {'bad_column': 'bad_value'}
+            ])
+      | 'Write Data' >> WriteToBigQuery(
+              method=WriteToBigQuery.Method.STREAMING_INSERTS,
+              table=my_table,
+              schema=schema,
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER
+            ))
+
+    _ = (result.failed_rows_with_errors
+      | 'Get Errors' >> beam.Map(lambda e: {
+              "destination": e[0],
+              "row": json.dumps(e[1]),
+              "error_message": e[2][0]['message']
+            })
+      | 'Write Errors' >> WriteToBigQuery(
+              method=WriteToBigQuery.Method.STREAMING_INSERTS,
+              table=error_log_table,
+              schema=error_schema,
+            ))
+
+Often, the simplest use case is to chain an operation after writing data to
+BigQuery. To do this, one can chain the operation after one of the output
+PCollections. A generic way of doing this (independent of the write method)
+could look like::
+
+  def chain_after(result):
+    try:  # Works for FILE_LOADS, where we run load and possibly copy jobs.
+      return (result.destination_load_jobid_pairs,
+              result.destination_copy_jobid_pairs) | beam.Flatten()
+    except AttributeError:
+      # Works for STREAMING_INSERTS, where we return the rows BigQuery rejected
+      return result.failed_rows
+
+  result = (pcoll | WriteToBigQuery(...))
+
+  _ = (chain_after(result)
+       | beam.Reshuffle() # Force a 'commit' of the intermediate data
+       | MyOperationAfterWriteToBQ())
+
+Attributes can be accessed using dot notation or bracket notation:
+```
+result.failed_rows                  <--> result['FailedRows']
+result.failed_rows_with_errors      <--> result['FailedRowsWithErrors']
+result.destination_load_jobid_pairs <--> result['destination_load_jobid_pairs']
+result.destination_file_pairs       <--> result['destination_file_pairs']
+result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
+```
+
 
 *** Short introduction to BigQuery concepts ***
 Tables have rows (TableRow) and each row has cells (TableCell).
@@ -287,6 +354,7 @@ from dataclasses import dataclass
 from typing import Dict
 from typing import List
 from typing import Optional
+from typing import Tuple
 from typing import Union
 
 import fastavro
@@ -322,6 +390,7 @@ from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.options.value_provider import ValueProvider
 from apache_beam.options.value_provider import check_accessible
+from apache_beam.pvalue import PCollection
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
@@ -338,9 +407,11 @@ from apache_beam.utils.annotations import experimental
 try:
   from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
   from apache_beam.io.gcp.internal.clients.bigquery import TableReference
+  from apache_beam.io.gcp.internal.clients.bigquery import JobReference
 except ImportError:
   DatasetReference = None
   TableReference = None
+  JobReference = None
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -359,6 +430,7 @@ __all__ = [
     'BigQuerySink',
     'BigQueryQueryPriority',
     'WriteToBigQuery',
+    'WriteResult',
     'ReadFromBigQuery',
     'ReadFromBigQueryRequest',
     'ReadAllFromBigQuery',
@@ -2239,11 +2311,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
           with_auto_sharding=self.with_auto_sharding,
           test_client=self.test_client)
 
-      return {
-          BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
-          BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
-              BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
-      }
+      return WriteResult(
+          method=WriteToBigQuery.Method.STREAMING_INSERTS,
+          failed_rows=outputs[BigQueryWriteFn.FAILED_ROWS],
+          failed_rows_with_errors=outputs[
+              BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS])
     else:
       if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
         if self.schema == SCHEMA_AUTODETECT:
@@ -2273,14 +2345,14 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
 
         find_in_nested_dict(self.schema)
 
-      from apache_beam.io.gcp import bigquery_file_loads
+      from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
       # Only cast to int when a value is given.
       # We only use an int for BigQueryBatchFileLoads
       if self.triggering_frequency is not None:
         triggering_frequency = int(self.triggering_frequency)
       else:
         triggering_frequency = self.triggering_frequency
-      return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
+      output = pcoll | BigQueryBatchFileLoads(
           destination=self.table_reference,
           schema=self.schema,
           create_disposition=self.create_disposition,
@@ -2299,6 +2371,15 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
           is_streaming_pipeline=is_streaming_pipeline,
           load_job_project_id=self.load_job_project_id)
 
+      return WriteResult(
+          method=WriteToBigQuery.Method.FILE_LOADS,
+          destination_load_jobid_pairs=output[
+              BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS],
+          destination_file_pairs=output[
+              BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS],
+          destination_copy_jobid_pairs=output[
+              BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS])
+
   def display_data(self):
     res = {}
     if self.table_reference is not None and isinstance(self.table_reference,
@@ -2380,6 +2461,126 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     return WriteToBigQuery(**config)
 
 
+class WriteResult:
+  """The result of a WriteToBigQuery transform.
+  """
+  def __init__(
+      self,
+      method: WriteToBigQuery.Method = None,
+      destination_load_jobid_pairs: PCollection[Tuple[str,
+                                                      JobReference]] = None,
+      destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] = None,
+      destination_copy_jobid_pairs: PCollection[Tuple[str,
+                                                      JobReference]] = None,
+      failed_rows: PCollection[Tuple[str, dict]] = None,
+      failed_rows_with_errors: PCollection[Tuple[str, dict, list]] = None):
+
+    self._method = method
+    self._destination_load_jobid_pairs = destination_load_jobid_pairs
+    self._destination_file_pairs = destination_file_pairs
+    self._destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self._failed_rows = failed_rows
+    self._failed_rows_with_errors = failed_rows_with_errors
+
+    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
+    self.attributes = {
+        BigQueryWriteFn.FAILED_ROWS: WriteResult.failed_rows,
+        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: WriteResult.
+        failed_rows_with_errors,
+        BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: WriteResult.
+        destination_load_jobid_pairs,
+        BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: WriteResult.
+        destination_file_pairs,
+        BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: WriteResult.
+        destination_copy_jobid_pairs,
+    }
+
+  def validate(self, method, attribute):
+    if self._method != method:
+      raise AttributeError(
+          f'Cannot get {attribute} because it is not produced '
+          f'by the {self._method} write method. Note: only '
+          f'{method} produces this attribute.')
+
+  @property
+  def destination_load_jobid_pairs(
+      self) -> PCollection[Tuple[str, JobReference]]:
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations that were successfully
+      loaded to using the batch load API, along with the load job IDs.
+
+    Raises: AttributeError: if accessed with a write method
+    besides ``FILE_LOADS``."""
+    self.validate(WriteToBigQuery.Method.FILE_LOADS, 'DESTINATION_JOBID_PAIRS')
+
+    return self._destination_load_jobid_pairs
+
+  @property
+  def destination_file_pairs(self) -> PCollection[Tuple[str, Tuple[str, int]]]:
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations along with the
+      temp files used as sources to load from.
+
+    Raises: AttributeError: if accessed with a write method
+    besides ``FILE_LOADS``."""
+    self.validate(WriteToBigQuery.Method.FILE_LOADS, 'DESTINATION_FILE_PAIRS')
+
+    return self._destination_file_pairs
+
+  @property
+  def destination_copy_jobid_pairs(
+      self) -> PCollection[Tuple[str, JobReference]]:
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations that were successfully
+      copied to, along with the copy job ID.
+
+    Raises: AttributeError: if accessed with a write method
+    besides ``FILE_LOADS``."""
+    self.validate(
+        WriteToBigQuery.Method.FILE_LOADS, 'DESTINATION_COPY_JOBID_PAIRS')
+
+    return self._destination_copy_jobid_pairs
+
+  @property
+  def failed_rows(self) -> PCollection[Tuple[str, dict]]:
+    """A ``STREAMING_INSERTS`` method attribute
+
+    Returns: A PCollection of rows that failed when inserting to BigQuery.
+
+    Raises: AttributeError: if accessed with a write method
+    besides ``STREAMING_INSERTS``."""
+    self.validate(WriteToBigQuery.Method.STREAMING_INSERTS, 'FAILED_ROWS')
+
+    return self._failed_rows
+
+  @property
+  def failed_rows_with_errors(self) -> PCollection[Tuple[str, dict, list]]:
+    """A ``STREAMING_INSERTS`` method attribute
+
+    Returns:
+      A PCollection of rows that failed when inserting to BigQuery,
+      along with their errors.
+
+    Raises:
+      AttributeError: if accessed with a write method
+      besides ``STREAMING_INSERTS``."""
+    self.validate(
+        WriteToBigQuery.Method.STREAMING_INSERTS, 'FAILED_ROWS_WITH_ERRORS')
+
+    return self._failed_rows_with_errors
+
+  def __getitem__(self, key):
+    if key not in self.attributes:
+      raise AttributeError(
+          f'Error trying to access nonexistent attribute `{key}` in write '
+          'result. Please see __documentation__ for available attributes.')
+
+    return self.attributes[key].__get__(self, WriteResult)
+
+
 class ReadFromBigQuery(PTransform):
   """Read data from BigQuery.