Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/29 04:21:10 UTC
[beam] branch master updated: [BEAM-6914] Reverting behavior of Native BQ sink in Python (#8143)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 206d98b [BEAM-6914] Reverting behavior of Native BQ sink in Python (#8143)
206d98b is described below
commit 206d98b0765ac662730edd28d669b3db24dd851d
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Thu Mar 28 21:20:49 2019 -0700
[BEAM-6914] Reverting behavior of Native BQ sink in Python (#8143)
* Reverting normal behavior of BQ sink in Python
* Addressing comments
---
sdks/python/apache_beam/io/gcp/bigquery.py | 30 +++++++++++++++++-----
.../apache_beam/io/gcp/bigquery_file_loads.py | 10 +-------
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 20 ++++++++-------
sdks/python/apache_beam/io/gcp/bigquery_test.py | 6 +++--
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 19 ++++++++++++++
.../runners/dataflow/dataflow_runner.py | 29 +++++++++++++++++++++
6 files changed, 88 insertions(+), 26 deletions(-)
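
[Editor's note] In short: after this revert, WriteToBigQuery routes through the
new Beam BQ sink only when the 'use_beam_bq_sink' experiment is set; without
it, behavior falls back to streaming inserts (and, on Dataflow batch, the
native BigQuerySink). A minimal opt-in sketch, not part of the commit (the
table spec and schema below are illustrative):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Opt in to the new Beam BQ sink; omit the flag to keep the old behavior.
    options = PipelineOptions(['--experiments=use_beam_bq_sink'])
    with beam.Pipeline(options=options) as p:
      _ = (p
           | beam.Create([{'name': 'beam'}])
           | beam.io.WriteToBigQuery(
               'my-project:my_dataset.my_table',  # hypothetical table spec
               schema='name:STRING'))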
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index f94f31b..e295cca 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -138,6 +138,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options import value_provider as vp
+from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -691,6 +692,8 @@ class BigQueryWriteFn(DoFn):
bigquery_tools.parse_table_reference(destination),
schema)
+ destination = bigquery_tools.get_hashable_destination(destination)
+
row = element[1]
self._rows_buffer[destination].append(row)
self._total_buffered_rows += 1
@@ -952,19 +955,34 @@ bigquery_v2_messages.TableSchema):
else:
raise TypeError('Unexpected schema argument: %s.' % schema)
- def expand(self, pcoll):
- p = pcoll.pipeline
+ def _compute_method(self, pipeline, options):
+ experiments = options.view_as(DebugOptions).experiments or []
# TODO(pabloem): Use a different method to determine if streaming or batch.
- standard_options = p.options.view_as(StandardOptions)
+ streaming_pipeline = pipeline.options.view_as(StandardOptions).streaming
+
+ # If the new BQ sink is not activated via the experiment flag, we default
+ # to streaming inserts (this is overridden in dataflow_runner.py).
+ if 'use_beam_bq_sink' not in experiments:
+ return self.Method.STREAMING_INSERTS
+ elif self.method == self.Method.DEFAULT and streaming_pipeline:
+ return self.Method.STREAMING_INSERTS
+ elif self.method == self.Method.DEFAULT and not streaming_pipeline:
+ return self.Method.FILE_LOADS
+ else:
+ return self.method
+
+ def expand(self, pcoll):
+ p = pcoll.pipeline
if (isinstance(self.table_reference, bigquery.TableReference)
and self.table_reference.projectId is None):
self.table_reference.projectId = pcoll.pipeline.options.view_as(
GoogleCloudOptions).project
- if (standard_options.streaming or
- self.method == WriteToBigQuery.Method.STREAMING_INSERTS):
+ method_to_use = self._compute_method(p, p.options)
+
+ if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
# TODO: Support load jobs for streaming pipelines.
bigquery_write_fn = BigQueryWriteFn(
schema=self.schema,
@@ -983,7 +1001,7 @@ bigquery_v2_messages.TableSchema):
return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
else:
- if standard_options.streaming:
+ if p.options.view_as(StandardOptions).streaming:
raise NotImplementedError(
'File Loads to BigQuery are only supported on Batch pipelines.')
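
[Editor's note] To make the new selection logic concrete, here is a hedged,
standalone paraphrase of _compute_method (a sketch only, not code from the
commit; string values stand in for the WriteToBigQuery.Method members):

    def resolve_write_method(experiments, streaming_pipeline, requested_method):
      # Without the opt-in experiment, fall back to the old default.
      if 'use_beam_bq_sink' not in experiments:
        return 'STREAMING_INSERTS'
      # With the experiment, DEFAULT resolves by pipeline mode.
      if requested_method == 'DEFAULT':
        return 'STREAMING_INSERTS' if streaming_pipeline else 'FILE_LOADS'
      # An explicit user choice is always honored.
      return requested_method

    assert resolve_write_method([], False, 'DEFAULT') == 'STREAMING_INSERTS'
    assert resolve_write_method(['use_beam_bq_sink'], False, 'DEFAULT') == 'FILE_LOADS'
    assert resolve_write_method(['use_beam_bq_sink'], True, 'DEFAULT') == 'STREAMING_INSERTS'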
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index f6d5c83..4bc51d1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -176,14 +176,6 @@ class WriteRecordsToFile(beam.DoFn):
'coder': self.coder.__class__.__name__
}
- @staticmethod
- def get_hashable_destination(destination):
- if isinstance(destination, bigquery_api.TableReference):
- return '%s:%s.%s' % (
- destination.projectId, destination.datasetId, destination.tableId)
- else:
- return destination
-
def start_bundle(self):
self._destination_to_file_writer = {}
@@ -192,7 +184,7 @@ class WriteRecordsToFile(beam.DoFn):
Destination may be a ``TableReference`` or a string, and row is a
Python dictionary for a row to be inserted to BigQuery."""
- destination = WriteRecordsToFile.get_hashable_destination(element[0])
+ destination = bigquery_tools.get_hashable_destination(element[0])
row = element[1]
if destination in self._destination_to_file_writer:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 9391b66..651029a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -120,7 +120,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
destinations = (
dest_file_pc
| "GetDests" >> beam.Map(
- lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
+ lambda x: bigquery_tools.get_hashable_destination(x[0])))
assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
label='check destinations ')
@@ -146,7 +146,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
files_per_dest = (
files_per_dest
| "GetDests" >> beam.Map(
- lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+ lambda x: (bigquery_tools.get_hashable_destination(x[0]),
x[1]))
)
assert_that(files_per_dest,
@@ -187,7 +187,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
files_per_dest = (
files_per_dest
| "GetDests" >> beam.Map(
- lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+ lambda x: (bigquery_tools.get_hashable_destination(x[0]),
x[1])))
# Only table1 and table3 get files. table2 records get spilled.
@@ -236,7 +236,7 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
destinations = (
output_pc
| "GetDests" >> beam.Map(
- lambda x: bqfl.WriteRecordsToFile.get_hashable_destination(x[0])))
+ lambda x: bigquery_tools.get_hashable_destination(x[0])))
assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
label='check destinations ')
@@ -257,7 +257,7 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
files_per_dest = (
files_per_dest
| "GetDests" >> beam.Map(
- lambda x: (bqfl.WriteRecordsToFile.get_hashable_destination(x[0]),
+ lambda x: (bigquery_tools.get_hashable_destination(x[0]),
x[1])))
assert_that(files_per_dest,
equal_to([('project1:dataset1.table1', 4),
@@ -292,7 +292,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
bq_client.jobs.Insert.return_value = result_job
- transform = bigquery.WriteToBigQuery(
+ transform = bqfl.BigQueryBatchFileLoads(
destination,
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
@@ -312,7 +312,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
dest_files
| "GetDests" >> beam.Map(
lambda x: (
- bqfl.WriteRecordsToFile.get_hashable_destination(x[0]), x[1]))
+ bigquery_tools.get_hashable_destination(x[0]), x[1]))
| "GetUniques" >> beam.combiners.Count.PerKey()
| "GetFinalDests" >>beam.Keys())
@@ -394,7 +394,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
if 'foundation' in d])]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(*pipeline_verifiers))
+ on_success_matcher=all_of(*pipeline_verifiers),
+ experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS)
@@ -451,7 +452,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
query="SELECT * FROM %s" % output_table_2,
data=[])]
- args = self.test_pipeline.get_full_options_as_args()
+ args = self.test_pipeline.get_full_options_as_args(
+ experiments='use_beam_bq_sink')
with self.assertRaises(Exception):
with beam.Pipeline(argv=args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 5beb983..be8bace 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -515,7 +515,8 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
if 'language' in d])]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=hc.all_of(*pipeline_verifiers))
+ on_success_matcher=hc.all_of(*pipeline_verifiers),
+ experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
@@ -564,7 +565,8 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
if 'foundation' in d])]
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=hc.all_of(*pipeline_verifiers))
+ on_success_matcher=hc.all_of(*pipeline_verifiers),
+ experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS)
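
[Editor's note] Both test files opt in to the new sink the same way. For
reference, get_full_options_as_args turns keyword arguments into command-line
flags, so a sketch of the pattern (assuming a configured integration-test
environment) looks like:

    from apache_beam.testing.test_pipeline import TestPipeline

    test_pipeline = TestPipeline(is_integration_test=True)
    # The keyword becomes '--experiments=use_beam_bq_sink' in the returned
    # argv, which is what activates the new Beam BQ sink after this revert.
    args = test_pipeline.get_full_options_as_args(
        experiments='use_beam_bq_sink')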
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index ca7216a..8a79077 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -75,6 +75,25 @@ def default_encoder(obj):
"Object of type '%s' is not JSON serializable" % type(obj).__name__)
+def get_hashable_destination(destination):
+ """Parses a table reference into a (project, dataset, table) tuple.
+
+ Args:
+ destination: Either a TableReference object from the bigquery API.
+ The object has the following attributes: projectId, datasetId, and
+ tableId. Or a string representing the destination containing
+ 'PROJECT:DATASET.TABLE'.
+ Returns:
+ A string representing the destination containing
+ 'PROJECT:DATASET.TABLE'.
+ """
+ if isinstance(destination, bigquery.TableReference):
+ return '%s:%s.%s' % (
+ destination.projectId, destination.datasetId, destination.tableId)
+ else:
+ return destination
+
+
def parse_table_schema_from_json(schema_string):
"""Parse the Table Schema provided as string.
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 486268e..b4630af 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -23,6 +23,7 @@ to the Dataflow Service for remote execution by a worker.
from __future__ import absolute_import
from __future__ import division
+import json
import logging
import threading
import time
@@ -590,6 +591,34 @@ class DataflowRunner(PipelineRunner):
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+ def apply_WriteToBigQuery(self, transform, pcoll, options):
+ # Make sure this is the WriteToBigQuery class that we expected, and that
+ # users did not specifically request the new BQ sink by passing the
+ # experiment flag.
+
+ # TODO(BEAM-6928): Remove this function for release 2.14.0.
+ experiments = options.view_as(DebugOptions).experiments or []
+ if (not isinstance(transform, beam.io.WriteToBigQuery)
+ or 'use_beam_bq_sink' in experiments):
+ return self.apply_PTransform(transform, pcoll, options)
+ standard_options = options.view_as(StandardOptions)
+ if standard_options.streaming:
+ if (transform.write_disposition ==
+ beam.io.BigQueryDisposition.WRITE_TRUNCATE):
+ raise RuntimeError('Can not use write truncation mode in streaming')
+ return self.apply_PTransform(transform, pcoll, options)
+ else:
+ from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+ return pcoll | 'WriteToBigQuery' >> beam.io.Write(
+ beam.io.BigQuerySink(
+ transform.table_reference.tableId,
+ transform.table_reference.datasetId,
+ transform.table_reference.projectId,
+ parse_table_schema_from_json(json.dumps(transform.schema)),
+ transform.create_disposition,
+ transform.write_disposition,
+ kms_key=transform.kms_key))
+
def apply_GroupByKey(self, transform, pcoll, options):
# Infer coder of parent.
#
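
[Editor's note] apply_WriteToBigQuery above is found by the runner's dispatch
convention, which prefers a method named after the transform class over the
generic path. A simplified sketch of that lookup (the real version lives in
apache_beam.runners.runner.PipelineRunner.apply):

    # Simplified sketch of the dispatch; the naming convention is the point.
    def apply(self, transform, pcoll, options):
      for cls in transform.__class__.mro():
        override = getattr(self, 'apply_%s' % cls.__name__, None)
        if override is not None:
          # e.g. apply_WriteToBigQuery, then apply_PTransform as fallback.
          return override(transform, pcoll, options)
      raise NotImplementedError('Execution of [%s] not implemented' % transform)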