Posted to commits@beam.apache.org by pa...@apache.org on 2022/07/06 22:33:02 UTC
[beam] branch master updated: Merge pull request #21872 from Standardizing output of WriteToBigQuery
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 74a2a1b071b Merge pull request #21872 from Standardizing output of WriteToBigQuery
74a2a1b071b is described below
commit 74a2a1b071b72d31199477af5abab8e9b04494f7
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Jul 6 18:32:56 2022 -0400
Merge pull request #21872 from Standardizing output of WriteToBigQuery
* writeresult prototype
* error message tweak
* type annotations and using property objects
* using property decorators instead
* deleting unnecessary schema and destination attributes
* added documentation
* added example
* documentation location change
* updated documentation
* updated documentation
* return types added
* putting JobReference import in try block. fixing short underline
---
sdks/python/apache_beam/io/gcp/bigquery.py | 215 ++++++++++++++++++++++++++++-
1 file changed, 208 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 8d8433f13af..5ce7519d3f2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -235,6 +235,73 @@ also take a callable that receives a table reference.
[2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
[3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+Chaining of operations after WriteToBigQuery
+--------------------------------------------
+WriteToBigQuery returns an object with several PCollections that consist of
+metadata about the write operations. These are useful to inspect the write
+operation and follow up with the results::
+
+  schema = {'fields': [
+      {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  error_schema = {'fields': [
+      {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  with Pipeline() as p:
+    result = (p
+        | 'Create Columns' >> beam.Create([
+            {'column': 'value'},
+            {'bad_column': 'bad_value'}
+          ])
+        | 'Write Data' >> WriteToBigQuery(
+            method=WriteToBigQuery.Method.STREAMING_INSERTS,
+            table=my_table,
+            schema=schema,
+            insert_retry_strategy=RetryStrategy.RETRY_NEVER
+          ))
+
+    _ = (result.failed_rows_with_errors
+        | 'Get Errors' >> beam.Map(lambda e: {
+            "destination": e[0],
+            "row": json.dumps(e[1]),
+            "error_message": e[2][0]['message']
+          })
+        | 'Write Errors' >> WriteToBigQuery(
+            method=WriteToBigQuery.Method.STREAMING_INSERTS,
+            table=error_log_table,
+            schema=error_schema,
+          ))
+
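+Here ``my_table`` and ``error_log_table`` stand in for table references or
+table specs such as ``'project:dataset.table_name'``.
+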
+Often, the simplest use case is to chain an operation after writing data to
+BigQuery. To do this, one can chain the operation after one of the output
+PCollections. A generic way to do this, independent of the write method,
+looks like the following::
+
+  def chain_after(result):
+    try:
+      # This works for FILE_LOADS, where we run load and possibly copy jobs.
+      return (result.destination_load_jobid_pairs,
+              result.destination_copy_jobid_pairs) | beam.Flatten()
+    except AttributeError:
+      # Works for STREAMING_INSERTS, where we return the rows BigQuery rejected
+      return result.failed_rows
+
+  result = (pcoll | WriteToBigQuery(...))
+
+  _ = (chain_after(result)
+      | beam.Reshuffle()  # Force a 'commit' of the intermediate data
+      | MyOperationAfterWriteToBQ())
+
+Attributes can be accessed using dot notation or bracket notation::
+
+  result.failed_rows                  <--> result['FailedRows']
+  result.failed_rows_with_errors      <--> result['FailedRowsWithErrors']
+  result.destination_load_jobid_pairs <--> result['destination_load_jobid_pairs']
+  result.destination_file_pairs       <--> result['destination_file_pairs']
+  result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
+
*** Short introduction to BigQuery concepts ***
Tables have rows (TableRow) and each row has cells (TableCell).
@@ -287,6 +354,7 @@ from dataclasses import dataclass
from typing import Dict
from typing import List
from typing import Optional
+from typing import Tuple
from typing import Union
import fastavro
@@ -322,6 +390,7 @@ from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.options.value_provider import check_accessible
+from apache_beam.pvalue import PCollection
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
@@ -338,9 +407,11 @@ from apache_beam.utils.annotations import experimental
try:
  from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
  from apache_beam.io.gcp.internal.clients.bigquery import TableReference
+  from apache_beam.io.gcp.internal.clients.bigquery import JobReference
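+  # JobReference is guarded together with the other client imports so the
+  # module still imports when the optional GCP dependencies are missing.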
except ImportError:
  DatasetReference = None
  TableReference = None
+  JobReference = None
_LOGGER = logging.getLogger(__name__)
@@ -359,6 +430,7 @@ __all__ = [
    'BigQuerySink',
    'BigQueryQueryPriority',
    'WriteToBigQuery',
+    'WriteResult',
    'ReadFromBigQuery',
    'ReadFromBigQueryRequest',
    'ReadAllFromBigQuery',
@@ -2239,11 +2311,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
          with_auto_sharding=self.with_auto_sharding,
          test_client=self.test_client)
-      return {
-          BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS],
-          BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: outputs[
-              BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS],
-      }
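+      # Note: WriteResult also supports dict-style access (e.g.
+      # result['FailedRows']), so callers that indexed the previous dict
+      # return value keep working.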
+      return WriteResult(
+          method=WriteToBigQuery.Method.STREAMING_INSERTS,
+          failed_rows=outputs[BigQueryWriteFn.FAILED_ROWS],
+          failed_rows_with_errors=outputs[
+              BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS])
    else:
      if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
        if self.schema == SCHEMA_AUTODETECT:
@@ -2273,14 +2345,14 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
        find_in_nested_dict(self.schema)
-      from apache_beam.io.gcp import bigquery_file_loads
+      from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
      # Only cast to int when a value is given.
      # We only use an int for BigQueryBatchFileLoads
      if self.triggering_frequency is not None:
        triggering_frequency = int(self.triggering_frequency)
      else:
        triggering_frequency = self.triggering_frequency
-      return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
+      output = pcoll | BigQueryBatchFileLoads(
          destination=self.table_reference,
          schema=self.schema,
          create_disposition=self.create_disposition,
@@ -2299,6 +2371,15 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
          is_streaming_pipeline=is_streaming_pipeline,
          load_job_project_id=self.load_job_project_id)
+      return WriteResult(
+          method=WriteToBigQuery.Method.FILE_LOADS,
+          destination_load_jobid_pairs=output[
+              BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS],
+          destination_file_pairs=output[
+              BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS],
+          destination_copy_jobid_pairs=output[
+              BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS])
+
  def display_data(self):
    res = {}
    if self.table_reference is not None and isinstance(self.table_reference,
@@ -2380,6 +2461,126 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
    return WriteToBigQuery(**config)
+class WriteResult:
+  """The result of a WriteToBigQuery transform.
+  """
+  def __init__(
+      self,
+      method: WriteToBigQuery.Method = None,
+      destination_load_jobid_pairs: PCollection[Tuple[str,
+                                                      JobReference]] = None,
+      destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] = None,
+      destination_copy_jobid_pairs: PCollection[Tuple[str,
+                                                      JobReference]] = None,
+      failed_rows: PCollection[Tuple[str, dict]] = None,
+      failed_rows_with_errors: PCollection[Tuple[str, dict, list]] = None):
+
+    self._method = method
+    self._destination_load_jobid_pairs = destination_load_jobid_pairs
+    self._destination_file_pairs = destination_file_pairs
+    self._destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self._failed_rows = failed_rows
+    self._failed_rows_with_errors = failed_rows_with_errors
+
+    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
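+    # Map each output tag to its property object so that __getitem__ below
+    # can reuse the same per-method validation as attribute access.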
+    self.attributes = {
+        BigQueryWriteFn.FAILED_ROWS: WriteResult.failed_rows,
+        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: WriteResult.
+        failed_rows_with_errors,
+        BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: WriteResult.
+        destination_load_jobid_pairs,
+        BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: WriteResult.
+        destination_file_pairs,
+        BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: WriteResult.
+        destination_copy_jobid_pairs,
+    }
+
+  def validate(self, method, attribute):
+    if self._method != method:
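+      # AttributeError (rather than ValueError) is raised so that callers can
+      # probe attributes with try/except, as in the chain_after() example in
+      # the module docstring above.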
+      raise AttributeError(
+          f'Cannot get {attribute} because it is not produced '
+          f'by the {self._method} write method. Note: only '
+          f'{method} produces this attribute.')
+
+  @property
+  def destination_load_jobid_pairs(
+      self) -> PCollection[Tuple[str, JobReference]]:
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations that were successfully
+      loaded to using the batch load API, along with the load job IDs.
+
+    Raises: AttributeError: if accessed with a write method
+      besides ``FILE_LOADS``."""
+    self.validate(WriteToBigQuery.Method.FILE_LOADS, 'DESTINATION_JOBID_PAIRS')
+
+    return self._destination_load_jobid_pairs
+
+  @property
+  def destination_file_pairs(self) -> PCollection[Tuple[str, Tuple[str, int]]]:
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations along with the
+      temp files used as sources to load from.
+
+    Raises: AttributeError: if accessed with a write method
+      besides ``FILE_LOADS``."""
+    self.validate(WriteToBigQuery.Method.FILE_LOADS, 'DESTINATION_FILE_PAIRS')
+
+    return self._destination_file_pairs
+
+  @property
+  def destination_copy_jobid_pairs(
+      self) -> PCollection[Tuple[str, JobReference]]:
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations that were successfully
+      copied to, along with the copy job ID.
+
+    Raises: AttributeError: if accessed with a write method
+      besides ``FILE_LOADS``."""
+    self.validate(
+        WriteToBigQuery.Method.FILE_LOADS, 'DESTINATION_COPY_JOBID_PAIRS')
+
+    return self._destination_copy_jobid_pairs
+
+  @property
+  def failed_rows(self) -> PCollection[Tuple[str, dict]]:
+    """A ``STREAMING_INSERTS`` method attribute
+
+    Returns: A PCollection of rows that failed when inserting to BigQuery.
+
+    Raises: AttributeError: if accessed with a write method
+      besides ``STREAMING_INSERTS``."""
+    self.validate(WriteToBigQuery.Method.STREAMING_INSERTS, 'FAILED_ROWS')
+
+    return self._failed_rows
+
+  @property
+  def failed_rows_with_errors(self) -> PCollection[Tuple[str, dict, list]]:
+    """A ``STREAMING_INSERTS`` method attribute
+
+    Returns:
+      A PCollection of rows that failed when inserting to BigQuery,
+      along with their errors.
+
+    Raises:
+      AttributeError: if accessed with a write method
+      besides ``STREAMING_INSERTS``."""
+    self.validate(
+        WriteToBigQuery.Method.STREAMING_INSERTS, 'FAILED_ROWS_WITH_ERRORS')
+
+    return self._failed_rows_with_errors
+
+  def __getitem__(self, key):
+    if key not in self.attributes:
+      raise AttributeError(
+          f'Error trying to access nonexistent attribute `{key}` in write '
+          'result. Please see __documentation__ for available attributes.')
+
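+    # self.attributes stores property objects; __get__ binds the property to
+    # this instance, so bracket access runs the same validation as dot access.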
+    return self.attributes[key].__get__(self, WriteResult)
+
+
class ReadFromBigQuery(PTransform):
"""Read data from BigQuery.