Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/16 01:47:20 UTC
[beam] branch master updated: [BEAM-6828] Adding ValueProvider support for BQ transforms.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1c28082 [BEAM-6828] Adding ValueProvider support for BQ transforms.
new 10d4839 Merge pull request #8045 from pabloem/vp-bq
1c28082 is described below
commit 1c280820a23ad662788183f0a75fab9865acbec0
Author: pabloem <pa...@apache.org>
AuthorDate: Tue Mar 12 16:11:56 2019 -0700
[BEAM-6828] Adding ValueProvider support for BQ transforms.
---
.../examples/cookbook/bigquery_tornadoes.py | 2 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 40 +++++++++-----------
.../apache_beam/io/gcp/bigquery_file_loads.py | 5 ++-
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 43 ++++++++++++++++------
sdks/python/apache_beam/io/gcp/bigquery_test.py | 39 ++++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 27 ++++++++++++--
6 files changed, 117 insertions(+), 39 deletions(-)
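For context, a minimal usage sketch (not part of the commit itself) that mirrors the new test_value_provider_transform integration test added in bigquery_test.py below; the table name and schema are placeholders and the snippet assumes the GCP extras are installed:

# Sketch only: table and schema passed to WriteToBigQuery as ValueProviders,
# as enabled by this commit. Placeholder project/dataset/table and schema.
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.value_provider import StaticValueProvider

schema = {'fields': [
    {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}

with beam.Pipeline() as p:
  _ = (p
       | beam.Create([{'name': 'beam', 'language': 'py'}])
       | WriteToBigQuery(
           table=StaticValueProvider(str, 'project:dataset.table'),
           schema=StaticValueProvider(dict, schema),
           method='STREAMING_INSERTS'))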
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 9db0f73..9be3f89 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -75,7 +75,7 @@ def run(argv=None):
'or DATASET.TABLE.'))
parser.add_argument('--gcs_location',
- required=True,
+ required=False,
help=('GCS Location to store files to load '
'data into Bigquery'))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index a1b6f99..27ee67a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -626,6 +626,8 @@ class BigQueryWriteFn(DoFn):
"""
if schema is None:
return schema
+ elif isinstance(schema, (str, unicode)):
+ return bigquery_tools.parse_table_schema_from_json(schema)
elif isinstance(schema, dict):
return bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
else:
@@ -644,7 +646,7 @@ class BigQueryWriteFn(DoFn):
num_retries=10000,
max_delay_secs=1500))
- def _create_table_if_needed(self, schema, table_reference):
+ def _create_table_if_needed(self, table_reference, schema=None):
str_table_reference = '%s:%s.%s' % (
table_reference.projectId,
table_reference.datasetId,
@@ -673,8 +675,8 @@ class BigQueryWriteFn(DoFn):
schema = destination[1]
destination = destination[0]
self._create_table_if_needed(
- schema,
- bigquery_tools.parse_table_reference(destination))
+ bigquery_tools.parse_table_reference(destination),
+ schema)
row = element[1]
self._rows_buffer[destination].append(row)
@@ -766,7 +768,7 @@ class WriteToBigQuery(PTransform):
"""Initialize a WriteToBigQuery transform.
Args:
- table (str, callable): The ID of the table, or a callable
+ table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
@@ -782,10 +784,11 @@ class WriteToBigQuery(PTransform):
project (str): The ID of the project containing this table or
:data:`None` if the table reference is specified entirely by the table
argument.
- schema (str): The schema to be used if the BigQuery table to write has to
- be created. This can be either specified as a
- :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema`
+ schema (str, dict, ValueProvider): The schema to be used if the
+ BigQuery table to write has to be created. This can be specified as a
+ python dictionary, a `ValueProvider` holding a JSON-serialized schema
+ string or such a dictionary, a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema`
object or a single string of the form
``'field1:type1,field2:type2,field3:type3'`` that defines a comma
separated list of fields. Here ``'type'`` should specify the BigQuery
@@ -925,6 +928,8 @@ bigquery_v2_messages.TableSchema):
return WriteToBigQuery.table_schema_to_dict(table_schema)
elif isinstance(schema, bigquery.TableSchema):
return WriteToBigQuery.table_schema_to_dict(schema)
+ elif isinstance(schema, vp.ValueProvider):
+ return schema
else:
raise TypeError('Unexpected schema argument: %s.' % schema)
@@ -934,7 +939,7 @@ bigquery_v2_messages.TableSchema):
# TODO(pabloem): Use a different method to determine if streaming or batch.
standard_options = p.options.view_as(StandardOptions)
- if (not callable(self.table_reference)
+ if (isinstance(self.table_reference, bigquery.TableReference)
and self.table_reference.projectId is None):
self.table_reference.projectId = pcoll.pipeline.options.view_as(
GoogleCloudOptions).project
@@ -950,11 +955,10 @@ bigquery_v2_messages.TableSchema):
retry_strategy=self.insert_retry_strategy,
test_client=self.test_client)
- # TODO: Use utility functions from BQTools
- table_fn = self._get_table_fn()
-
outputs = (pcoll
- | 'AppendDestination' >> beam.Map(lambda x: (table_fn(x), x))
+ | 'AppendDestination' >> beam.ParDo(
+ bigquery_tools.AppendDestinationsFn(
+ destination=self.table_reference, schema=self.schema))
| 'StreamInsertRows' >> ParDo(bigquery_write_fn).with_outputs(
BigQueryWriteFn.FAILED_ROWS, main='main'))
@@ -967,7 +971,7 @@ bigquery_v2_messages.TableSchema):
from apache_beam.io.gcp import bigquery_file_loads
return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
destination=self.table_reference,
- schema=self.get_dict_table_schema(self.schema),
+ schema=self.schema,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
max_file_size=self.max_file_size,
@@ -975,14 +979,6 @@ bigquery_v2_messages.TableSchema):
gs_location=self.gs_location,
test_client=self.test_client)
- def _get_table_fn(self):
- if callable(self.table_reference):
- return self.table_reference
- elif not callable(self.table_reference) and self.schema is not None:
- return lambda x: (self.table_reference, self.schema)
- else:
- return lambda x: self.table_reference
-
def display_data(self):
res = {}
if self.table_reference is not None:
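The get_dict_table_schema change above means a ValueProvider schema is now passed through unresolved and only read at execution time. A small sketch of the accepted schema forms (not part of the commit; assumes the GCP extras are installed and that get_dict_table_schema is callable as a staticmethod):

# Sketch of the schema forms WriteToBigQuery accepts after this change.
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.value_provider import StaticValueProvider

# A comma-separated field string is converted to a schema dictionary.
from_string = WriteToBigQuery.get_dict_table_schema('name:STRING,language:STRING')

# A dictionary in the {'fields': [...]} form is returned unchanged.
from_dict = WriteToBigQuery.get_dict_table_schema(
    {'fields': [{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}]})

# A ValueProvider is returned as-is (the new branch above) and resolved
# with .get() when the pipeline runs.
deferred = WriteToBigQuery.get_dict_table_schema(
    StaticValueProvider(dict, {'fields': []}))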
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 93fbf6f..7395370 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -162,7 +162,8 @@ class WriteRecordsToFile(beam.DoFn):
'coder': self.coder.__class__.__name__
}
- def _get_hashable_destination(self, destination):
+ @staticmethod
+ def get_hashable_destination(destination):
if isinstance(destination, bigquery_api.TableReference):
return '%s:%s.%s' % (
destination.projectId, destination.datasetId, destination.tableId)
@@ -177,7 +178,7 @@ class WriteRecordsToFile(beam.DoFn):
Destination may be a ``TableReference`` or a string, and row is a
Python dictionary for a row to be inserted to BigQuery."""
- destination = self._get_hashable_destination(element[0])
+ destination = WriteRecordsToFile.get_hashable_destination(element[0])
row = element[1]
if destination in self._destination_to_file_writer:
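With get_hashable_destination now a staticmethod, callers outside the DoFn can reuse it to normalize destinations. A small sketch of the expected behavior (not part of the commit; the table reference is a placeholder and the snippet assumes the GCP extras are installed):

# Sketch: destinations normalized to hashable 'project:dataset.table' strings
# without needing a WriteRecordsToFile instance.
from apache_beam.io.gcp import bigquery_file_loads as bqfl
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api

table_ref = bigquery_api.TableReference(
    projectId='project1', datasetId='dataset1', tableId='table1')

# A TableReference collapses to 'project:dataset.table'; strings pass through.
assert (bqfl.WriteRecordsToFile.get_hashable_destination(table_ref)
        == 'project1:dataset1.table1')
assert (bqfl.WriteRecordsToFile.get_hashable_destination('project1:dataset1.table1')
        == 'project1:dataset1.table1')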
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 039fbc5..44d378b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -23,7 +23,6 @@ import json
import logging
import os
import random
-import sys
import time
import unittest
@@ -118,7 +117,10 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
assert_that(file_count, equal_to([3]), label='check file count')
- destinations = dest_file_pc | "GetDests" >> beam.Map(lambda x: x[0])
+ destinations = (
+ dest_file_pc
+ | "GetDests" >> beam.Map(
+ lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
label='check destinations ')
@@ -141,6 +143,12 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
| beam.Map(lambda x: x).with_output_types(
beam.typehints.KV[str, str])
| beam.combiners.Count.PerKey())
+ files_per_dest = (
+ files_per_dest
+ | "GetDests" >> beam.Map(
+ lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+ x[1]))
+ )
assert_that(files_per_dest,
equal_to([('project1:dataset1.table1', 4),
('project1:dataset1.table2', 2),
@@ -176,6 +184,11 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
| beam.Map(lambda x: x).with_output_types(
beam.typehints.KV[str, str])
| beam.combiners.Count.PerKey())
+ files_per_dest = (
+ files_per_dest
+ | "GetDests" >> beam.Map(
+ lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+ x[1])))
# Only table1 and table3 get files. table2 records get spilled.
assert_that(files_per_dest,
@@ -220,7 +233,10 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
assert_that(file_count, equal_to([3]), label='check file count')
- destinations = output_pc | "GetDests" >> beam.Map(lambda x: x[0])
+ destinations = (
+ output_pc
+ | "GetDests" >> beam.Map(
+ lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
label='check destinations ')
@@ -238,6 +254,11 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
def check_multiple_files(output_pc):
files_per_dest = output_pc | beam.combiners.Count.PerKey()
+ files_per_dest = (
+ files_per_dest
+ | "GetDests" >> beam.Map(
+ lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+ x[1])))
assert_that(files_per_dest,
equal_to([('project1:dataset1.table1', 4),
('project1:dataset1.table2', 2),
@@ -252,10 +273,6 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-6711')
def test_records_traverse_transform_with_mocks(self):
destination = 'project1:dataset1.table1'
@@ -290,9 +307,13 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
jobs = dest_job | "GetJobs" >> beam.Map(lambda x: x[1])
files = dest_files | "GetFiles" >> beam.Map(lambda x: x[1])
- destinations = (dest_files
- | "GetUniques" >> beam.combiners.Count.PerKey()
- | "GetDests" >> beam.Map(lambda x: x[0]))
+ destinations = (
+ dest_files
+ | "GetDests" >> beam.Map(
+ lambda x: (
+ bqfl.WriteRecordsToFile.get_hashable_destination(x[0]), x[1]))
+ | "GetUniques" >> beam.combiners.Count.PerKey()
+ | "GetFinalDests" >>beam.Keys())
# All files exist
_ = (files | beam.Map(
@@ -304,7 +325,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
label='CountFiles')
assert_that(destinations,
- equal_to([bigquery_tools.parse_table_reference(destination)]),
+ equal_to([destination]),
label='CheckDestinations')
assert_that(jobs,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 69717d2..67b2bef 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -38,6 +38,7 @@ from apache_beam.io.gcp.bigquery_file_loads_test import _ELEMENTS
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.options import value_provider
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
@@ -492,6 +493,44 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
self.dataset_id, self.project)
@attr('IT')
+ def test_value_provider_transform(self):
+ output_table_1 = '%s%s' % (self.output_table, 1)
+ output_table_2 = '%s%s' % (self.output_table, 2)
+ schema = {'fields': [
+ {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM %s" % output_table_1,
+ data=[(d['name'], d['language'])
+ for d in _ELEMENTS
+ if 'language' in d]),
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM %s" % output_table_2,
+ data=[(d['name'], d['language'])
+ for d in _ELEMENTS
+ if 'language' in d])]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+ with beam.Pipeline(argv=args) as p:
+ input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
+
+ _ = (input
+ | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
+ table=value_provider.StaticValueProvider(str, output_table_1),
+ schema=value_provider.StaticValueProvider(dict, schema),
+ method='STREAMING_INSERTS'))
+ _ = (input
+ | "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
+ table=value_provider.StaticValueProvider(str, output_table_2),
+ method='FILE_LOADS'))
+
+ @attr('IT')
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 5f97cf5..82ea8ea 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -45,6 +45,7 @@ from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.internal.http_client import get_new_http
from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms import DoFn
@@ -142,6 +143,8 @@ def parse_table_reference(table, dataset=None, project=None):
return table
elif callable(table):
return table
+ elif isinstance(table, value_provider.ValueProvider):
+ return table
table_reference = bigquery.TableReference()
# If dataset argument is not specified, the expectation is that the
@@ -994,11 +997,29 @@ class AppendDestinationsFn(DoFn):
Experimental; no backwards compatibility guarantees.
"""
- def __init__(self, destination):
+ def __init__(self, destination, schema=None):
+ self.destination = AppendDestinationsFn._get_table_fn(destination, schema)
+
+ @staticmethod
+ def _value_provider_or_static_val(elm):
+ if isinstance(elm, value_provider.ValueProvider):
+ return elm
+ else:
+ # The type argument is a NoOp, because we assume the argument already has
+ # the proper formatting.
+ return value_provider.StaticValueProvider(lambda x: x, value=elm)
+
+ @staticmethod
+ def _get_table_fn(destination, schema=None):
if callable(destination):
- self.destination = destination
+ return destination
+ elif not callable(destination) and schema is not None:
+ return lambda x: (
+ AppendDestinationsFn._value_provider_or_static_val(destination).get(),
+ AppendDestinationsFn._value_provider_or_static_val(schema).get())
else:
- self.destination = lambda x: destination
+ return lambda x: AppendDestinationsFn._value_provider_or_static_val(
+ destination).get()
def process(self, element):
yield (self.destination(element), element)
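For reference, a short sketch (not part of the commit) of how the reworked AppendDestinationsFn resolves destinations: plain values and ValueProviders are wrapped uniformly and .get() is called per element, callables are used directly, and a supplied schema turns the destination into a (table, schema) pair as consumed by BigQueryWriteFn. Table name, schema, and row below are placeholders:

from apache_beam.io.gcp.bigquery_tools import AppendDestinationsFn
from apache_beam.options.value_provider import StaticValueProvider

row = {'name': 'beam'}

# A plain string destination is wrapped in a no-op StaticValueProvider.
fixed = AppendDestinationsFn('project:dataset.table')
print(list(fixed.process(row)))
# [('project:dataset.table', {'name': 'beam'})]

# A ValueProvider destination plus a schema yields a (table, schema) key.
deferred = AppendDestinationsFn(
    StaticValueProvider(str, 'project:dataset.table'), schema={'fields': []})
print(list(deferred.process(row)))
# [(('project:dataset.table', {'fields': []}), {'name': 'beam'})]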