You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/09/23 23:29:35 UTC
[beam] branch master updated: Merge pull request #12782 from
[BEAM-10950] Overriding Dataflow Native BQSource.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c0bde2b Merge pull request #12782 from [BEAM-10950] Overriding Dataflow Native BQSource.
c0bde2b is described below
commit c0bde2b99a5f62a220bdbf97b1df24ce6990783e
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Wed Sep 23 18:29:01 2020 -0500
Merge pull request #12782 from [BEAM-10950] Overriding Dataflow Native BQSource.
* Overriding Dataflow Native BQSource.
* Fixing tests
* Tryin to fix to_runner_proto transform
* fixup
* fixup
* fixup
* adding comment
---
.../apache_beam/examples/snippets/snippets_test.py | 1 +
.../io/gcp/big_query_query_to_table_it_test.py | 1 +
.../io/gcp/big_query_query_to_table_pipeline.py | 28 +++++--------
sdks/python/apache_beam/io/gcp/bigquery.py | 40 +++++++++++++++++-
sdks/python/apache_beam/io/gcp/bigquery_test.py | 48 +++++++++++++++-------
.../apache_beam/io/gcp/bigquery_tools_test.py | 38 +++++++++++------
sdks/python/apache_beam/io/iobase.py | 23 +++++++----
.../runners/dataflow/dataflow_runner_test.py | 22 +++++-----
8 files changed, 137 insertions(+), 64 deletions(-)
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 793ff05..df1d153 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -682,6 +682,7 @@ class SnippetsTest(unittest.TestCase):
snippets.model_bigqueryio(p, project, dataset, table)
else:
p = TestPipeline()
+ p.options.view_as(GoogleCloudOptions).temp_location = 'gs://mylocation'
snippets.model_bigqueryio(p)
def _run_test_pipeline_for_options(self, fn):
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index c2dc3cd..c806629 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -302,6 +302,7 @@ class BigQueryQueryToTableIT(unittest.TestCase):
'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
'use_standard_sql': False,
'native': True,
+ 'use_json_exports': True,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
'on_success_matcher': all_of(*pipeline_verifiers),
'experiments': 'use_legacy_bq_sink',
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 50cd584..8e4b0df 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -91,24 +91,16 @@ def run_bq_pipeline(argv=None):
use_standard_sql=known_args.use_standard_sql,
use_json_exports=known_args.use_json_exports,
kms_key=kms_key)
- if known_args.native:
- _ = data | 'write' >> beam.io.Write(
- beam.io.BigQuerySink(
- known_args.output,
- schema=table_schema,
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
- kms_key=kms_key))
- else:
- temp_file_format = (
- 'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
- _ = data | 'write' >> beam.io.WriteToBigQuery(
- known_args.output,
- schema=table_schema,
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
- temp_file_format=temp_file_format,
- kms_key=kms_key)
+
+ temp_file_format = (
+ 'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
+ _ = data | 'write' >> beam.io.WriteToBigQuery(
+ known_args.output,
+ schema=table_schema,
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+ temp_file_format=temp_file_format,
+ kms_key=kms_key)
result = p.run()
result.wait_until_finish()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9df427d..4c42ed3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -426,7 +426,45 @@ class BigQueryDisposition(object):
# BigQuerySource, BigQuerySink.
-class BigQuerySource(dataflow_io.NativeSource):
+@deprecated(since='2.25.0', current="ReadFromBigQuery")
+def BigQuerySource(
+ table=None,
+ dataset=None,
+ project=None,
+ query=None,
+ validate=False,
+ coder=None,
+ use_standard_sql=False,
+ flatten_results=True,
+ kms_key=None,
+ use_dataflow_native_source=False):
+ if use_dataflow_native_source:
+ return _BigQuerySource(
+ table,
+ dataset,
+ project,
+ query,
+ validate,
+ coder,
+ use_standard_sql,
+ flatten_results,
+ kms_key)
+ else:
+ return ReadFromBigQuery(
+ table=table,
+ dataset=dataset,
+ project=project,
+ query=query,
+ validate=validate,
+ coder=coder,
+ use_standard_sql=use_standard_sql,
+ flatten_results=flatten_results,
+ use_json_exports=True,
+ kms_key=kms_key)
+
+
+@deprecated(since='2.25.0', current="ReadFromBigQuery")
+class _BigQuerySource(dataflow_io.NativeSource):
"""A source based on a BigQuery table."""
def __init__(
self,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 6d24f29..fa3e84b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -168,7 +168,8 @@ class TestTableRowJsonCoder(unittest.TestCase):
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySource(unittest.TestCase):
def test_display_data_item_on_validate_true(self):
- source = beam.io.BigQuerySource('dataset.table', validate=True)
+ source = beam.io.BigQuerySource(
+ 'dataset.table', validate=True, use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
@@ -178,7 +179,8 @@ class TestBigQuerySource(unittest.TestCase):
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_table_reference_display_data(self):
- source = beam.io.BigQuerySource('dataset.table')
+ source = beam.io.BigQuerySource(
+ 'dataset.table', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
@@ -186,7 +188,8 @@ class TestBigQuerySource(unittest.TestCase):
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
- source = beam.io.BigQuerySource('project:dataset.table')
+ source = beam.io.BigQuerySource(
+ 'project:dataset.table', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
@@ -194,7 +197,8 @@ class TestBigQuerySource(unittest.TestCase):
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
- source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+ source = beam.io.BigQuerySource(
+ 'xyz.com:project:dataset.table', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
@@ -203,27 +207,32 @@ class TestBigQuerySource(unittest.TestCase):
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_parse_table_reference(self):
- source = beam.io.BigQuerySource('dataset.table')
+ source = beam.io.BigQuerySource(
+ 'dataset.table', use_dataflow_native_source=True)
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')
- source = beam.io.BigQuerySource('project:dataset.table')
+ source = beam.io.BigQuerySource(
+ 'project:dataset.table', use_dataflow_native_source=True)
self.assertEqual(source.table_reference.projectId, 'project')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')
- source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+ source = beam.io.BigQuerySource(
+ 'xyz.com:project:dataset.table', use_dataflow_native_source=True)
self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')
- source = beam.io.BigQuerySource(query='my_query')
+ source = beam.io.BigQuerySource(
+ query='my_query', use_dataflow_native_source=True)
self.assertEqual(source.query, 'my_query')
self.assertIsNone(source.table_reference)
self.assertTrue(source.use_legacy_sql)
def test_query_only_display_data(self):
- source = beam.io.BigQuerySource(query='my_query')
+ source = beam.io.BigQuerySource(
+ query='my_query', use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
@@ -232,25 +241,36 @@ class TestBigQuerySource(unittest.TestCase):
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_specify_query_sql_format(self):
- source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True)
+ source = beam.io.BigQuerySource(
+ query='my_query',
+ use_standard_sql=True,
+ use_dataflow_native_source=True)
self.assertEqual(source.query, 'my_query')
self.assertFalse(source.use_legacy_sql)
def test_specify_query_flattened_records(self):
- source = beam.io.BigQuerySource(query='my_query', flatten_results=False)
+ source = beam.io.BigQuerySource(
+ query='my_query',
+ flatten_results=False,
+ use_dataflow_native_source=True)
self.assertFalse(source.flatten_results)
def test_specify_query_unflattened_records(self):
- source = beam.io.BigQuerySource(query='my_query', flatten_results=True)
+ source = beam.io.BigQuerySource(
+ query='my_query', flatten_results=True, use_dataflow_native_source=True)
self.assertTrue(source.flatten_results)
def test_specify_query_without_table(self):
- source = beam.io.BigQuerySource(query='my_query')
+ source = beam.io.BigQuerySource(
+ query='my_query', use_dataflow_native_source=True)
self.assertEqual(source.query, 'my_query')
self.assertIsNone(source.table_reference)
def test_date_partitioned_table_name(self):
- source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
+ source = beam.io.BigQuerySource(
+ 'dataset.table$20030102',
+ validate=True,
+ use_dataflow_native_source=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', True),
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index a587b7d..82036e9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -471,7 +471,9 @@ class TestBigQueryReader(unittest.TestCase):
client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
jobComplete=True, rows=table_rows, schema=schema)
actual_rows = []
- with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+ with beam.io.BigQuerySource(
+ 'dataset.table',
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
self.assertEqual(actual_rows, expected_rows)
@@ -485,7 +487,9 @@ class TestBigQueryReader(unittest.TestCase):
client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
jobComplete=True, rows=table_rows, schema=schema)
actual_rows = []
- with beam.io.BigQuerySource(query='query').reader(client) as reader:
+ with beam.io.BigQuerySource(
+ query='query',
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
self.assertEqual(actual_rows, expected_rows)
@@ -501,8 +505,9 @@ class TestBigQueryReader(unittest.TestCase):
client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
jobComplete=True, rows=table_rows, schema=schema)
actual_rows = []
- with beam.io.BigQuerySource(query='query',
- use_standard_sql=True).reader(client) as reader:
+ with beam.io.BigQuerySource(
+ query='query', use_standard_sql=True,
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
self.assertEqual(actual_rows, expected_rows)
@@ -518,8 +523,9 @@ class TestBigQueryReader(unittest.TestCase):
client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
jobComplete=True, rows=table_rows, schema=schema)
actual_rows = []
- with beam.io.BigQuerySource(query='query',
- flatten_results=False).reader(client) as reader:
+ with beam.io.BigQuerySource(
+ query='query', flatten_results=False,
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
self.assertEqual(actual_rows, expected_rows)
@@ -532,12 +538,13 @@ class TestBigQueryReader(unittest.TestCase):
ValueError,
r'Both a BigQuery table and a query were specified\. Please specify '
r'only one of these'):
- beam.io.BigQuerySource(table='dataset.table', query='query')
+ beam.io.BigQuerySource(
+ table='dataset.table', query='query', use_dataflow_native_source=True)
def test_using_neither_query_nor_table_fails(self):
with self.assertRaisesRegex(
ValueError, r'A BigQuery table or a query must be specified'):
- beam.io.BigQuerySource()
+ beam.io.BigQuerySource(use_dataflow_native_source=True)
def test_read_from_table_as_tablerows(self):
client = mock.Mock()
@@ -550,7 +557,9 @@ class TestBigQueryReader(unittest.TestCase):
# We set the coder to TableRowJsonCoder, which is a signal that
# the caller wants to see the rows as TableRows.
with beam.io.BigQuerySource(
- 'dataset.table', coder=TableRowJsonCoder).reader(client) as reader:
+ 'dataset.table',
+ coder=TableRowJsonCoder,
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
self.assertEqual(actual_rows, table_rows)
@@ -570,7 +579,9 @@ class TestBigQueryReader(unittest.TestCase):
jobComplete=True, rows=table_rows, schema=schema)
]
actual_rows = []
- with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+ with beam.io.BigQuerySource(
+ 'dataset.table',
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
self.assertEqual(actual_rows, expected_rows)
@@ -590,7 +601,9 @@ class TestBigQueryReader(unittest.TestCase):
jobComplete=True, rows=table_rows, schema=schema)
]
actual_rows = []
- with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+ with beam.io.BigQuerySource(
+ 'dataset.table',
+ use_dataflow_native_source=True).reader(client) as reader:
for row in reader:
actual_rows.append(row)
# We return expected rows for each of the two pages of results so we
@@ -599,7 +612,8 @@ class TestBigQueryReader(unittest.TestCase):
def test_table_schema_without_project(self):
# Reader should pick executing project by default.
- source = beam.io.BigQuerySource(table='mydataset.mytable')
+ source = beam.io.BigQuerySource(
+ table='mydataset.mytable', use_dataflow_native_source=True)
options = PipelineOptions(flags=['--project', 'myproject'])
source.pipeline_options = options
reader = source.reader()
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index c23f358..5d030d0 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -890,6 +890,11 @@ class Read(ptransform.PTransform):
def expand(self, pbegin):
if isinstance(self.source, BoundedSource):
return pbegin | _SDFBoundedSourceWrapper(self.source)
+ elif isinstance(self.source, ptransform.PTransform):
+ # The Read transform can also admit a full PTransform as an input
+ # rather than an actual source. If the input is a PTransform, then
+ # just apply it directly.
+ return pbegin.pipeline | self.source
else:
# Treat Read itself as a primitive.
return pvalue.PCollection(
@@ -917,13 +922,17 @@ class Read(ptransform.PTransform):
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ReadPayload]
- return (
- common_urns.deprecated_primitives.READ.urn,
- beam_runner_api_pb2.ReadPayload(
- source=self.source.to_runner_api(context),
- is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
- if self.source.is_bounded() else
- beam_runner_api_pb2.IsBounded.UNBOUNDED))
+ from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+ if isinstance(self.source, (BoundedSource, dataflow_io.NativeSource)):
+ return (
+ common_urns.deprecated_primitives.READ.urn,
+ beam_runner_api_pb2.ReadPayload(
+ source=self.source.to_runner_api(context),
+ is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
+ if self.source.is_bounded() else
+ beam_runner_api_pb2.IsBounded.UNBOUNDED))
+ elif isinstance(self.source, ptransform.PTransform):
+ return self.source.to_runner_api_parameter(context)
@staticmethod
def from_runner_api_parameter(unused_ptransform, parameter, context):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 0315af3..ac4722f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -250,15 +250,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo')
- def test_bigquery_read_streaming_fail(self):
- remote_runner = DataflowRunner()
- self.default_properties.append("--streaming")
- with self.assertRaisesRegex(ValueError,
- r'source is not currently available'):
- with Pipeline(remote_runner,
- PipelineOptions(self.default_properties)) as p:
- _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
-
def test_biqquery_read_fn_api_fail(self):
remote_runner = DataflowRunner()
for flag in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2']:
@@ -269,7 +260,9 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'):
with Pipeline(remote_runner,
PipelineOptions(self.default_properties)) as p:
- _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
+ _ = p | beam.io.Read(
+ beam.io.BigQuerySource(
+ 'some.table', use_dataflow_native_source=True))
def test_remote_runner_display_data(self):
remote_runner = DataflowRunner()
@@ -323,7 +316,9 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
options=PipelineOptions(self.default_properties)) as p:
# pylint: disable=expression-not-assigned
p | beam.io.Read(
- beam.io.BigQuerySource('dataset.faketable')) | beam.GroupByKey()
+ beam.io.BigQuerySource(
+ 'dataset.faketable',
+ use_dataflow_native_source=True)) | beam.GroupByKey()
def test_group_by_key_input_visitor_with_valid_inputs(self):
p = TestPipeline()
@@ -641,7 +636,10 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
with beam.Pipeline(runner=runner,
options=PipelineOptions(self.default_properties)) as p:
# pylint: disable=expression-not-assigned
- p | beam.io.Read(beam.io.BigQuerySource('some.table', coder=BytesCoder()))
+ p | beam.io.Read(
+ beam.io.BigQuerySource(
+ 'some.table', coder=BytesCoder(),
+ use_dataflow_native_source=True))
self.expect_correct_override(runner.job, u'Read', u'ParallelRead')