You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/29 04:21:10 UTC

[beam] branch master updated: [BEAM-6914] Reverting behavior of Native BQ sink in Python (#8143)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 206d98b  [BEAM-6914] Reverting behavior of Native BQ sink in Python (#8143)
206d98b is described below

commit 206d98b0765ac662730edd28d669b3db24dd851d
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Thu Mar 28 21:20:49 2019 -0700

    [BEAM-6914] Reverting behavior of Native BQ sink in Python (#8143)
    
    * Reverting normal behavior of BQ sink in Python
    
    * Addressing comments
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 30 +++++++++++++++++-----
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 10 +-------
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 20 ++++++++-------
 sdks/python/apache_beam/io/gcp/bigquery_test.py    |  6 +++--
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 19 ++++++++++++++
 .../runners/dataflow/dataflow_runner.py            | 29 +++++++++++++++++++++
 6 files changed, 88 insertions(+), 26 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index f94f31b..e295cca 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -138,6 +138,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options import value_provider as vp
+from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -691,6 +692,8 @@ class BigQueryWriteFn(DoFn):
         bigquery_tools.parse_table_reference(destination),
         schema)
 
+    destination = bigquery_tools.get_hashable_destination(destination)
+
     row = element[1]
     self._rows_buffer[destination].append(row)
     self._total_buffered_rows += 1
@@ -952,19 +955,34 @@ bigquery_v2_messages.TableSchema):
     else:
       raise TypeError('Unexpected schema argument: %s.' % schema)
 
-  def expand(self, pcoll):
-    p = pcoll.pipeline
+  def _compute_method(self, pipeline, options):
+    experiments = options.view_as(DebugOptions).experiments or []
 
     # TODO(pabloem): Use a different method to determine if streaming or batch.
-    standard_options = p.options.view_as(StandardOptions)
+    streaming_pipeline = pipeline.options.view_as(StandardOptions).streaming
+
+    # If the new BQ sink is not activated for experiment flags, then we use
+    # streaming inserts by default (it gets overridden in dataflow_runner.py).
+    if 'use_beam_bq_sink' not in experiments:
+      return self.Method.STREAMING_INSERTS
+    elif self.method == self.Method.DEFAULT and streaming_pipeline:
+      return self.Method.STREAMING_INSERTS
+    elif self.method == self.Method.DEFAULT and not streaming_pipeline:
+      return self.Method.FILE_LOADS
+    else:
+      return self.method
+
+  def expand(self, pcoll):
+    p = pcoll.pipeline
 
     if (isinstance(self.table_reference, bigquery.TableReference)
         and self.table_reference.projectId is None):
       self.table_reference.projectId = pcoll.pipeline.options.view_as(
           GoogleCloudOptions).project
 
-    if (standard_options.streaming or
-        self.method == WriteToBigQuery.Method.STREAMING_INSERTS):
+    method_to_use = self._compute_method(p, p.options)
+
+    if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
       # TODO: Support load jobs for streaming pipelines.
       bigquery_write_fn = BigQueryWriteFn(
           schema=self.schema,
@@ -983,7 +1001,7 @@ bigquery_v2_messages.TableSchema):
 
       return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
     else:
-      if standard_options.streaming:
+      if p.options.view_as(StandardOptions).streaming:
         raise NotImplementedError(
             'File Loads to BigQuery are only supported on Batch pipelines.')
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index f6d5c83..4bc51d1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -176,14 +176,6 @@ class WriteRecordsToFile(beam.DoFn):
         'coder': self.coder.__class__.__name__
     }
 
-  @staticmethod
-  def get_hashable_destination(destination):
-    if isinstance(destination, bigquery_api.TableReference):
-      return '%s:%s.%s' % (
-          destination.projectId, destination.datasetId, destination.tableId)
-    else:
-      return destination
-
   def start_bundle(self):
     self._destination_to_file_writer = {}
 
@@ -192,7 +184,7 @@ class WriteRecordsToFile(beam.DoFn):
 
     Destination may be a ``TableReference`` or a string, and row is a
     Python dictionary for a row to be inserted to BigQuery."""
-    destination = WriteRecordsToFile.get_hashable_destination(element[0])
+    destination = bigquery_tools.get_hashable_destination(element[0])
     row = element[1]
 
     if destination in self._destination_to_file_writer:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 9391b66..651029a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -120,7 +120,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
       destinations = (
           dest_file_pc
           | "GetDests" >> beam.Map(
-              lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
+              lambda x: bigquery_tools.get_hashable_destination(x[0])))
       assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
                   label='check destinations ')
 
@@ -146,7 +146,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
       files_per_dest = (
           files_per_dest
           | "GetDests" >> beam.Map(
-              lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+              lambda x: (bigquery_tools.get_hashable_destination(x[0]),
                          x[1]))
       )
       assert_that(files_per_dest,
@@ -187,7 +187,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
       files_per_dest = (
           files_per_dest
           | "GetDests" >> beam.Map(
-              lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+              lambda x: (bigquery_tools.get_hashable_destination(x[0]),
                          x[1])))
 
       # Only table1 and table3 get files. table2 records get spilled.
@@ -236,7 +236,7 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
       destinations = (
           output_pc
           | "GetDests" >> beam.Map(
-              lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
+              lambda x: bigquery_tools.get_hashable_destination(x[0])))
       assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
                   label='check destinations ')
 
@@ -257,7 +257,7 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
       files_per_dest = (
           files_per_dest
           | "GetDests" >> beam.Map(
-              lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+              lambda x: (bigquery_tools.get_hashable_destination(x[0]),
                          x[1])))
       assert_that(files_per_dest,
                   equal_to([('project1:dataset1.table1', 4),
@@ -292,7 +292,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
 
     bq_client.jobs.Insert.return_value = result_job
 
-    transform = bigquery.WriteToBigQuery(
+    transform = bqfl.BigQueryBatchFileLoads(
         destination,
         custom_gcs_temp_location=self._new_tempdir(),
         test_client=bq_client,
@@ -312,7 +312,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
           dest_files
           | "GetDests" >> beam.Map(
               lambda x: (
-                  bqfl.WriteRecordsToFile.get_hashable_destination(x[0]), x[1]))
+                  bigquery_tools.get_hashable_destination(x[0]), x[1]))
           | "GetUniques" >> beam.combiners.Count.PerKey()
           | "GetFinalDests" >>beam.Keys())
 
@@ -394,7 +394,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
                   if 'foundation' in d])]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=all_of(*pipeline_verifiers))
+        on_success_matcher=all_of(*pipeline_verifiers),
+        experiments='use_beam_bq_sink')
 
     with beam.Pipeline(argv=args) as p:
       input = p | beam.Create(_ELEMENTS)
@@ -451,7 +452,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
             query="SELECT * FROM %s" % output_table_2,
             data=[])]
 
-    args = self.test_pipeline.get_full_options_as_args()
+    args = self.test_pipeline.get_full_options_as_args(
+        experiments='use_beam_bq_sink')
 
     with self.assertRaises(Exception):
       with beam.Pipeline(argv=args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 5beb983..be8bace 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -515,7 +515,8 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
                   if 'language' in d])]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers))
+        on_success_matcher=hc.all_of(*pipeline_verifiers),
+        experiments='use_beam_bq_sink')
 
     with beam.Pipeline(argv=args) as p:
       input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
@@ -564,7 +565,8 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
                   if 'foundation' in d])]
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers))
+        on_success_matcher=hc.all_of(*pipeline_verifiers),
+        experiments='use_beam_bq_sink')
 
     with beam.Pipeline(argv=args) as p:
       input = p | beam.Create(_ELEMENTS)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index ca7216a..8a79077 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -75,6 +75,25 @@ def default_encoder(obj):
       "Object of type '%s' is not JSON serializable" % type(obj).__name__)
 
 
+def get_hashable_destination(destination):
+  """Parses a table reference into a (project, dataset, table) tuple.
+
+  Args:
+    destination: Either a TableReference object from the bigquery API.
+      The object has the following attributes: projectId, datasetId, and
+      tableId. Or a string representing the destination containing
+      'PROJECT:DATASET.TABLE'.
+  Returns:
+    A string representing the destination containing
+    'PROJECT:DATASET.TABLE'.
+  """
+  if isinstance(destination, bigquery.TableReference):
+    return '%s:%s.%s' % (
+        destination.projectId, destination.datasetId, destination.tableId)
+  else:
+    return destination
+
+
 def parse_table_schema_from_json(schema_string):
   """Parse the Table Schema provided as string.
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 486268e..b4630af 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -23,6 +23,7 @@ to the Dataflow Service for remote execution by a worker.
 from __future__ import absolute_import
 from __future__ import division
 
+import json
 import logging
 import threading
 import time
@@ -590,6 +591,34 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
+  def apply_WriteToBigQuery(self, transform, pcoll, options):
+    # Make sure this is the WriteToBigQuery class that we expected, and that
+    # users did not specifically request the new BQ sink by passing experiment
+    # flag.
+
+    # TODO(BEAM-6928): Remove this function for release 2.14.0.
+    experiments = options.view_as(DebugOptions).experiments or []
+    if (not isinstance(transform, beam.io.WriteToBigQuery)
+        or 'use_beam_bq_sink' in experiments):
+      return self.apply_PTransform(transform, pcoll, options)
+    standard_options = options.view_as(StandardOptions)
+    if standard_options.streaming:
+      if (transform.write_disposition ==
+          beam.io.BigQueryDisposition.WRITE_TRUNCATE):
+        raise RuntimeError('Can not use write truncation mode in streaming')
+      return self.apply_PTransform(transform, pcoll, options)
+    else:
+      from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+      return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
+          beam.io.BigQuerySink(
+              transform.table_reference.tableId,
+              transform.table_reference.datasetId,
+              transform.table_reference.projectId,
+              parse_table_schema_from_json(json.dumps(transform.schema)),
+              transform.create_disposition,
+              transform.write_disposition,
+              kms_key=transform.kms_key))
+
   def apply_GroupByKey(self, transform, pcoll, options):
     # Infer coder of parent.
     #