You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/16 01:47:20 UTC

[beam] branch master updated: [BEAM-6828] Adding ValueProvider support for BQ transforms.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c28082  [BEAM-6828] Adding ValueProvider support for BQ transforms.
     new 10d4839  Merge pull request #8045 from pabloem/vp-bq
1c28082 is described below

commit 1c280820a23ad662788183f0a75fab9865acbec0
Author: pabloem <pa...@apache.org>
AuthorDate: Tue Mar 12 16:11:56 2019 -0700

    [BEAM-6828] Adding ValueProvider support for BQ transforms.
---
 .../examples/cookbook/bigquery_tornadoes.py        |  2 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         | 40 +++++++++-----------
 .../apache_beam/io/gcp/bigquery_file_loads.py      |  5 ++-
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 43 ++++++++++++++++------
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 39 ++++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 27 ++++++++++++--
 6 files changed, 117 insertions(+), 39 deletions(-)

diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 9db0f73..9be3f89 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -75,7 +75,7 @@ def run(argv=None):
        'or DATASET.TABLE.'))
 
   parser.add_argument('--gcs_location',
-                      required=True,
+                      required=False,
                       help=('GCS Location to store files to load '
                             'data into Bigquery'))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index a1b6f99..27ee67a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -626,6 +626,8 @@ class BigQueryWriteFn(DoFn):
     """
     if schema is None:
       return schema
+    elif isinstance(schema, (str, unicode)):
+      return bigquery_tools.parse_table_schema_from_json(schema)
     elif isinstance(schema, dict):
       return bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
     else:
@@ -644,7 +646,7 @@ class BigQueryWriteFn(DoFn):
         num_retries=10000,
         max_delay_secs=1500))
 
-  def _create_table_if_needed(self, schema, table_reference):
+  def _create_table_if_needed(self, table_reference, schema=None):
     str_table_reference = '%s:%s.%s' % (
         table_reference.projectId,
         table_reference.datasetId,
@@ -673,8 +675,8 @@ class BigQueryWriteFn(DoFn):
       schema = destination[1]
       destination = destination[0]
       self._create_table_if_needed(
-          schema,
-          bigquery_tools.parse_table_reference(destination))
+          bigquery_tools.parse_table_reference(destination),
+          schema)
 
     row = element[1]
     self._rows_buffer[destination].append(row)
@@ -766,7 +768,7 @@ class WriteToBigQuery(PTransform):
     """Initialize a WriteToBigQuery transform.
 
     Args:
-      table (str, callable): The ID of the table, or a callable
+      table (str, callable, ValueProvider): The ID of the table, or a callable
          that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
          numbers ``0-9``, or underscores ``_``. If dataset argument is
          :data:`None` then the table argument must contain the entire table
@@ -782,10 +784,11 @@ class WriteToBigQuery(PTransform):
       project (str): The ID of the project containing this table or
         :data:`None` if the table reference is specified entirely by the table
         argument.
-      schema (str): The schema to be used if the BigQuery table to write has to
-        be created. This can be either specified as a
-        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema`
+      schema (str,dict,ValueProvider): The schema to be used if the
+        BigQuery table to write has to be created. This can be either specified
+        as a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
+        or a python dictionary, or the string or dictionary itself.
         object or a single string  of the form
         ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
         separated list of fields. Here ``'type'`` should specify the BigQuery
@@ -925,6 +928,8 @@ bigquery_v2_messages.TableSchema):
       return WriteToBigQuery.table_schema_to_dict(table_schema)
     elif isinstance(schema, bigquery.TableSchema):
       return WriteToBigQuery.table_schema_to_dict(schema)
+    elif isinstance(schema, vp.ValueProvider):
+      return schema
     else:
       raise TypeError('Unexpected schema argument: %s.' % schema)
 
@@ -934,7 +939,7 @@ bigquery_v2_messages.TableSchema):
     # TODO(pabloem): Use a different method to determine if streaming or batch.
     standard_options = p.options.view_as(StandardOptions)
 
-    if (not callable(self.table_reference)
+    if (isinstance(self.table_reference, bigquery.TableReference)
         and self.table_reference.projectId is None):
       self.table_reference.projectId = pcoll.pipeline.options.view_as(
           GoogleCloudOptions).project
@@ -950,11 +955,10 @@ bigquery_v2_messages.TableSchema):
           retry_strategy=self.insert_retry_strategy,
           test_client=self.test_client)
 
-      # TODO: Use utility functions from BQTools
-      table_fn = self._get_table_fn()
-
       outputs = (pcoll
-                 | 'AppendDestination' >> beam.Map(lambda x: (table_fn(x), x))
+                 | 'AppendDestination' >> beam.ParDo(
+                     bigquery_tools.AppendDestinationsFn(
+                         destination=self.table_reference, schema=self.schema))
                  | 'StreamInsertRows' >> ParDo(bigquery_write_fn).with_outputs(
                      BigQueryWriteFn.FAILED_ROWS, main='main'))
 
@@ -967,7 +971,7 @@ bigquery_v2_messages.TableSchema):
       from apache_beam.io.gcp import bigquery_file_loads
       return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
           destination=self.table_reference,
-          schema=self.get_dict_table_schema(self.schema),
+          schema=self.schema,
           create_disposition=self.create_disposition,
           write_disposition=self.write_disposition,
           max_file_size=self.max_file_size,
@@ -975,14 +979,6 @@ bigquery_v2_messages.TableSchema):
           gs_location=self.gs_location,
           test_client=self.test_client)
 
-  def _get_table_fn(self):
-    if callable(self.table_reference):
-      return self.table_reference
-    elif not callable(self.table_reference) and self.schema is not None:
-      return lambda x: (self.table_reference, self.schema)
-    else:
-      return lambda x: self.table_reference
-
   def display_data(self):
     res = {}
     if self.table_reference is not None:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 93fbf6f..7395370 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -162,7 +162,8 @@ class WriteRecordsToFile(beam.DoFn):
         'coder': self.coder.__class__.__name__
     }
 
-  def _get_hashable_destination(self, destination):
+  @staticmethod
+  def get_hashable_destination(destination):
     if isinstance(destination, bigquery_api.TableReference):
       return '%s:%s.%s' % (
           destination.projectId, destination.datasetId, destination.tableId)
@@ -177,7 +178,7 @@ class WriteRecordsToFile(beam.DoFn):
 
     Destination may be a ``TableReference`` or a string, and row is a
     Python dictionary for a row to be inserted to BigQuery."""
-    destination = self._get_hashable_destination(element[0])
+    destination = WriteRecordsToFile.get_hashable_destination(element[0])
     row = element[1]
 
     if destination in self._destination_to_file_writer:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 039fbc5..44d378b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -23,7 +23,6 @@ import json
 import logging
 import os
 import random
-import sys
 import time
 import unittest
 
@@ -118,7 +117,10 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
           lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
       assert_that(file_count, equal_to([3]), label='check file count')
 
-      destinations = dest_file_pc | "GetDests" >> beam.Map(lambda x: x[0])
+      destinations = (
+          dest_file_pc
+          | "GetDests" >> beam.Map(
+              lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
       assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
                   label='check destinations ')
 
@@ -141,6 +143,12 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
                         | beam.Map(lambda x: x).with_output_types(
                             beam.typehints.KV[str, str])
                         | beam.combiners.Count.PerKey())
+      files_per_dest = (
+          files_per_dest
+          | "GetDests" >> beam.Map(
+              lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+                         x[1]))
+      )
       assert_that(files_per_dest,
                   equal_to([('project1:dataset1.table1', 4),
                             ('project1:dataset1.table2', 2),
@@ -176,6 +184,11 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
                         | beam.Map(lambda x: x).with_output_types(
                             beam.typehints.KV[str, str])
                         | beam.combiners.Count.PerKey())
+      files_per_dest = (
+          files_per_dest
+          | "GetDests" >> beam.Map(
+              lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+                         x[1])))
 
       # Only table1 and table3 get files. table2 records get spilled.
       assert_that(files_per_dest,
@@ -220,7 +233,10 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
           lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
       assert_that(file_count, equal_to([3]), label='check file count')
 
-      destinations = output_pc | "GetDests" >> beam.Map(lambda x: x[0])
+      destinations = (
+          output_pc
+          | "GetDests" >> beam.Map(
+              lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
       assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
                   label='check destinations ')
 
@@ -238,6 +254,11 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
 
     def check_multiple_files(output_pc):
       files_per_dest = output_pc | beam.combiners.Count.PerKey()
+      files_per_dest = (
+          files_per_dest
+          | "GetDests" >> beam.Map(
+              lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+                         x[1])))
       assert_that(files_per_dest,
                   equal_to([('project1:dataset1.table1', 4),
                             ('project1:dataset1.table2', 2),
@@ -252,10 +273,6 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3'
-                   'TODO: BEAM-6711')
   def test_records_traverse_transform_with_mocks(self):
     destination = 'project1:dataset1.table1'
 
@@ -290,9 +307,13 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
       jobs = dest_job | "GetJobs" >> beam.Map(lambda x: x[1])
 
       files = dest_files | "GetFiles" >> beam.Map(lambda x: x[1])
-      destinations = (dest_files
-                      | "GetUniques" >> beam.combiners.Count.PerKey()
-                      | "GetDests" >> beam.Map(lambda x: x[0]))
+      destinations = (
+          dest_files
+          | "GetDests" >> beam.Map(
+              lambda x: (
+                  bqfl.WriteRecordsToFile.get_hashable_destination(x[0]), x[1]))
+          | "GetUniques" >> beam.combiners.Count.PerKey()
+          | "GetFinalDests" >>beam.Keys())
 
       # All files exist
       _ = (files | beam.Map(
@@ -304,7 +325,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
                   label='CountFiles')
 
       assert_that(destinations,
-                  equal_to([bigquery_tools.parse_table_reference(destination)]),
+                  equal_to([destination]),
                   label='CheckDestinations')
 
       assert_that(jobs,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 69717d2..67b2bef 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -38,6 +38,7 @@ from apache_beam.io.gcp.bigquery_file_loads_test import _ELEMENTS
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.options import value_provider
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -492,6 +493,44 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
                  self.dataset_id, self.project)
 
   @attr('IT')
+  def test_value_provider_transform(self):
+    output_table_1 = '%s%s' % (self.output_table, 1)
+    output_table_2 = '%s%s' % (self.output_table, 2)
+    schema = {'fields': [
+        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_1,
+            data=[(d['name'], d['language'])
+                  for d in _ELEMENTS
+                  if 'language' in d]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_2,
+            data=[(d['name'], d['language'])
+                  for d in _ELEMENTS
+                  if 'language' in d])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
+
+      _ = (input
+           | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
+               table=value_provider.StaticValueProvider(str, output_table_1),
+               schema=value_provider.StaticValueProvider(dict, schema),
+               method='STREAMING_INSERTS'))
+      _ = (input
+           | "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
+               table=value_provider.StaticValueProvider(str, output_table_2),
+               method='FILE_LOADS'))
+
+  @attr('IT')
   def test_multiple_destinations_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
     output_table_2 = '%s%s' % (self.output_table, 2)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 5f97cf5..82ea8ea 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -45,6 +45,7 @@ from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.options import value_provider
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import DoFn
@@ -142,6 +143,8 @@ def parse_table_reference(table, dataset=None, project=None):
     return table
   elif callable(table):
     return table
+  elif isinstance(table, value_provider.ValueProvider):
+    return table
 
   table_reference = bigquery.TableReference()
   # If dataset argument is not specified, the expectation is that the
@@ -994,11 +997,29 @@ class AppendDestinationsFn(DoFn):
   Experimental; no backwards compatibility guarantees.
   """
 
-  def __init__(self, destination):
+  def __init__(self, destination, schema=None):
+    self.destination = AppendDestinationsFn._get_table_fn(destination, schema)
+
+  @staticmethod
+  def _value_provider_or_static_val(elm):
+    if isinstance(elm, value_provider.ValueProvider):
+      return elm
+    else:
+      # The type argument is a NoOp, because we assume the argument already has
+      # the proper formatting.
+      return value_provider.StaticValueProvider(lambda x: x, value=elm)
+
+  @staticmethod
+  def _get_table_fn(destination, schema=None):
     if callable(destination):
-      self.destination = destination
+      return destination
+    elif not callable(destination) and schema is not None:
+      return lambda x: (
+          AppendDestinationsFn._value_provider_or_static_val(destination).get(),
+          AppendDestinationsFn._value_provider_or_static_val(schema).get())
     else:
-      self.destination = lambda x: destination
+      return lambda x: AppendDestinationsFn._value_provider_or_static_val(
+          destination).get()
 
   def process(self, element):
     yield (self.destination(element), element)