Posted to commits@beam.apache.org by pa...@apache.org on 2020/05/27 22:23:17 UTC
[beam] branch master updated: Add to/from_runner_api_parameters to WriteToBigQuery
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6cf105a Add to/from_runner_api_parameters to WriteToBigQuery
new 6658d62 Merge pull request #11745 from [BEAM-9692] Add to/from_runner_api_parameters to WriteToBigQuery
6cf105a is described below
commit 6cf105ae5823e01b793fb4ddbb346342cd606634
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Mon May 18 12:40:43 2020 -0700
Add to/from_runner_api_parameters to WriteToBigQuery
Change-Id: Ifd969174d5a7744766173f35fd6b65a72ebfd924
---
sdks/python/apache_beam/io/gcp/bigquery.py | 72 +++++++++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigquery_test.py | 67 +++++++++++++++++++++++
2 files changed, 139 insertions(+)
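For context: Beam's runner API serialization works by having a PTransform return a (urn, payload) pair from to_runner_api_parameter, and by registering a constructor for that URN with PTransform.register_urn so the transform can be rebuilt from the payload. The sketch below illustrates the same pattern this commit applies to WriteToBigQuery; MyTransform and its URN are hypothetical names, and the pickled-dict payload mirrors the commit below rather than a fixed Beam convention.

    import apache_beam as beam
    from apache_beam.internal import pickler
    from apache_beam.transforms import PTransform

    class MyTransform(PTransform):
      # Hypothetical transform illustrating the to/from_runner_api pattern.
      def __init__(self, multiplier):
        self._multiplier = multiplier

      def expand(self, pcoll):
        return pcoll | beam.Map(lambda x: x * self._multiplier)

      def to_runner_api_parameter(self, context):
        # Serialize as a (urn, payload) pair; here the payload is a pickled
        # dict of constructor arguments, mirroring WriteToBigQuery below.
        return 'my:transform:v0', pickler.dumps(
            {'multiplier': self._multiplier})

      @PTransform.register_urn('my:transform:v0', bytes)
      def from_runner_api(unused_ptransform, payload, context):
        # Rebuild the transform from the pickled constructor arguments.
        return MyTransform(**pickler.loads(payload))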
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index fd6b370..f0ad457 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -271,6 +271,8 @@ from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
+from apache_beam.transforms.sideinputs import get_sideinput_index
from apache_beam.transforms.window import GlobalWindows
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
@@ -1390,6 +1392,9 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
and
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.
"""
+ self._table = table
+ self._dataset = dataset
+ self._project = project
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
self.create_disposition = BigQueryDisposition.validate_create(
@@ -1517,6 +1522,73 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
res['table'] = DisplayDataItem(tableSpec, label='Table')
return res
+ def to_runner_api_parameter(self, context):
+ from apache_beam.internal import pickler
+
+    # It'd be nice to name these according to their actual names/positions
+    # in the original argument list, but such a transformation is
+    # irreversible given how remove_objects_from_args and
+    # insert_values_in_args are currently implemented.
+    def serialize(side_inputs):
+      return {(SIDE_INPUT_PREFIX + '%s') % ix:
+              si.to_runner_api(context).SerializeToString()
+              for ix, si in enumerate(side_inputs)}
+
+ table_side_inputs = serialize(self.table_side_inputs)
+ schema_side_inputs = serialize(self.schema_side_inputs)
+
+ config = {
+ 'table': self._table,
+ 'dataset': self._dataset,
+ 'project': self._project,
+ 'schema': self.schema,
+ 'create_disposition': self.create_disposition,
+ 'write_disposition': self.write_disposition,
+ 'kms_key': self.kms_key,
+ 'batch_size': self.batch_size,
+ 'max_file_size': self.max_file_size,
+ 'max_files_per_bundle': self.max_files_per_bundle,
+ 'custom_gcs_temp_location': self.custom_gcs_temp_location,
+ 'method': self.method,
+ 'insert_retry_strategy': self.insert_retry_strategy,
+ 'additional_bq_parameters': self.additional_bq_parameters,
+ 'table_side_inputs': table_side_inputs,
+ 'schema_side_inputs': schema_side_inputs,
+ 'triggering_frequency': self.triggering_frequency,
+ 'validate': self._validate,
+ 'temp_file_format': self._temp_file_format,
+ }
+ return 'beam:transform:write_to_big_query:v0', pickler.dumps(config)
+
+ @PTransform.register_urn('beam:transform:write_to_big_query:v0', bytes)
+ def from_runner_api(unused_ptransform, payload, context):
+ from apache_beam.internal import pickler
+ from apache_beam.portability.api.beam_runner_api_pb2 import SideInput
+
+ config = pickler.loads(payload)
+
+ def deserialize(side_inputs):
+ deserialized_side_inputs = {}
+ for k, v in side_inputs.items():
+ side_input = SideInput()
+ side_input.ParseFromString(v)
+ deserialized_side_inputs[k] = side_input
+
+ # This is an ordered list stored as a dict (see the comments in
+ # to_runner_api_parameter above).
+      indexed_side_inputs = [
+          (get_sideinput_index(tag),
+           pvalue.AsSideInput.from_runner_api(si, context))
+          for tag, si in deserialized_side_inputs.items()]
+ return [si for _, si in sorted(indexed_side_inputs)]
+
+ config['table_side_inputs'] = deserialize(config['table_side_inputs'])
+ config['schema_side_inputs'] = deserialize(config['schema_side_inputs'])
+
+ return WriteToBigQuery(**config)
+
class _PassThroughThenCleanup(PTransform):
"""A PTransform that invokes a DoFn after the input PCollection has been
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 8c2bfe8..5c05978 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -584,6 +584,73 @@ class TestWriteToBigQuery(unittest.TestCase):
schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
temp_file_format=bigquery_tools.FileFormat.AVRO))
+ def test_to_from_runner_api(self):
+ """Tests that serialization of WriteToBigQuery is correct.
+
+ This is not intended to be a change-detector test. As such, this only tests
+ the more complicated serialization logic of parameters: ValueProviders,
+ callables, and side inputs.
+ """
+ FULL_OUTPUT_TABLE = 'test_project:output_table'
+
+ p = TestPipeline(
+ additional_pipeline_args=["--experiments=use_beam_bq_sink"])
+
+ # Used for testing side input parameters.
+ table_record_pcv = beam.pvalue.AsDict(
+ p | "MakeTable" >> beam.Create([('table', FULL_OUTPUT_TABLE)]))
+
+ # Used for testing value provider parameters.
+ schema = value_provider.StaticValueProvider(str, '"a:str"')
+
+    original = WriteToBigQuery(
+        table=lambda _, side_input: side_input['table'],
+ table_side_inputs=(table_record_pcv, ),
+ schema=schema)
+
+ # pylint: disable=expression-not-assigned
+ p | 'MyWriteToBigQuery' >> original
+
+    # Round-trip the pipeline through the runner API to generate a proto
+    # from an empty context. This ensures that the serialization code ran.
+ pipeline_proto, context = TestPipeline.from_runner_api(
+ p.to_runner_api(), p.runner, p.get_pipeline_options()).to_runner_api(
+ return_context=True)
+
+ # Find the transform from the context.
+    write_to_bq_id = [
+        k for k, v in pipeline_proto.components.transforms.items()
+        if v.unique_name == 'MyWriteToBigQuery'
+    ][0]
+ deserialized_node = context.transforms.get_by_id(write_to_bq_id)
+ deserialized = deserialized_node.transform
+ self.assertIsInstance(deserialized, WriteToBigQuery)
+
+ # Test that the serialization of a value provider is correct.
+ self.assertEqual(original.schema, deserialized.schema)
+
+ # Test that the serialization of a callable is correct.
+ self.assertEqual(
+ deserialized._table(None, {'table': FULL_OUTPUT_TABLE}),
+ FULL_OUTPUT_TABLE)
+
+ # Test that the serialization of a side input is correct.
+ self.assertEqual(
+ len(original.table_side_inputs), len(deserialized.table_side_inputs))
+    original_side_input_data = original.table_side_inputs[0]._side_input_data()
+    deserialized_side_input_data = (
+        deserialized.table_side_inputs[0]._side_input_data())
+ self.assertEqual(
+ original_side_input_data.access_pattern,
+ deserialized_side_input_data.access_pattern)
+ self.assertEqual(
+ original_side_input_data.window_mapping_fn,
+ deserialized_side_input_data.window_mapping_fn)
+ self.assertEqual(
+ original_side_input_data.view_fn, deserialized_side_input_data.view_fn)
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertTransformTests(unittest.TestCase):
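The test above exercises serialization indirectly: it round-trips the whole pipeline through its runner API proto and pulls the transform back out of the resulting context by its unique name. A stripped-down sketch of that pattern (the 'MyLabel' transform is hypothetical, and the WriteToBigQuery-specific assertions are omitted):

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline

    p = TestPipeline()
    # pylint: disable=expression-not-assigned
    p | 'MyLabel' >> beam.Create([1, 2, 3])

    # Round-trip the pipeline: serialize to a proto, rehydrate from an
    # empty context, and serialize again to capture that context.
    pipeline_proto, context = TestPipeline.from_runner_api(
        p.to_runner_api(), p.runner, p.get_pipeline_options()).to_runner_api(
            return_context=True)

    # Look the transform up by its unique name and recover the rehydrated
    # Python object; for a transform with a registered URN (as
    # WriteToBigQuery has after this commit) it is an instance of the
    # original class.
    transform_id = [
        k for k, v in pipeline_proto.components.transforms.items()
        if v.unique_name == 'MyLabel'
    ][0]
    rehydrated = context.transforms.get_by_id(transform_id).transform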