Posted to commits@beam.apache.org by pa...@apache.org on 2020/09/23 23:29:35 UTC

[beam] branch master updated: Merge pull request #12782 from [BEAM-10950] Overriding Dataflow Native BQSource.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c0bde2b  Merge pull request #12782 from [BEAM-10950] Overriding Dataflow Native BQSource.
c0bde2b is described below

commit c0bde2b99a5f62a220bdbf97b1df24ce6990783e
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Wed Sep 23 18:29:01 2020 -0500

    Merge pull request #12782 from [BEAM-10950] Overriding Dataflow Native BQSource.
    
    * Overriding Dataflow Native BQSource.
    
    * Fixing tests
    
    * Trying to fix to_runner_proto transform
    
    * fixup
    
    * fixup
    
    * fixup
    
    * adding comment
---
 .../apache_beam/examples/snippets/snippets_test.py |  1 +
 .../io/gcp/big_query_query_to_table_it_test.py     |  1 +
 .../io/gcp/big_query_query_to_table_pipeline.py    | 28 +++++--------
 sdks/python/apache_beam/io/gcp/bigquery.py         | 40 +++++++++++++++++-
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 48 +++++++++++++++-------
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 38 +++++++++++------
 sdks/python/apache_beam/io/iobase.py               | 23 +++++++----
 .../runners/dataflow/dataflow_runner_test.py       | 22 +++++-----
 8 files changed, 137 insertions(+), 64 deletions(-)
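
In short: after this change, BigQuerySource is a thin factory rather than a
class. By default it returns the portable ReadFromBigQuery transform; passing
use_dataflow_native_source=True restores the legacy Dataflow-native source.
A minimal sketch of the two call styles (the table name is hypothetical, and
the default path needs a GCS temp_location on the pipeline options, which is
why snippets_test.py below gains one):

    import apache_beam as beam

    with beam.Pipeline() as p:
      # Default path: BigQuerySource returns a ReadFromBigQuery transform,
      # which beam.io.Read now applies directly (see the iobase.py change).
      rows = p | 'read' >> beam.io.Read(
          beam.io.BigQuerySource(table='mydataset.mytable'))

      # Opting back into the legacy Dataflow-native source:
      legacy_rows = p | 'read_native' >> beam.io.Read(
          beam.io.BigQuerySource(
              table='mydataset.mytable',
              use_dataflow_native_source=True))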

diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 793ff05..df1d153 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -682,6 +682,7 @@ class SnippetsTest(unittest.TestCase):
         snippets.model_bigqueryio(p, project, dataset, table)
     else:
       p = TestPipeline()
+      p.options.view_as(GoogleCloudOptions).temp_location = 'gs://mylocation'
       snippets.model_bigqueryio(p)
 
   def _run_test_pipeline_for_options(self, fn):
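
The extra temp_location line above is needed because the default
BigQuerySource path now expands to ReadFromBigQuery, which stages BigQuery
exports through GCS and therefore requires a temporary location on the
pipeline options.
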
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index c2dc3cd..c806629 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -302,6 +302,7 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
         'use_standard_sql': False,
         'native': True,
+        'use_json_exports': True,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
         'on_success_matcher': all_of(*pipeline_verifiers),
         'experiments': 'use_legacy_bq_sink',
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 50cd584..8e4b0df 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -91,24 +91,16 @@ def run_bq_pipeline(argv=None):
         use_standard_sql=known_args.use_standard_sql,
         use_json_exports=known_args.use_json_exports,
         kms_key=kms_key)
-  if known_args.native:
-    _ = data | 'write' >> beam.io.Write(
-        beam.io.BigQuerySink(
-            known_args.output,
-            schema=table_schema,
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-            kms_key=kms_key))
-  else:
-    temp_file_format = (
-        'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
-    _ = data | 'write' >> beam.io.WriteToBigQuery(
-        known_args.output,
-        schema=table_schema,
-        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-        write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-        temp_file_format=temp_file_format,
-        kms_key=kms_key)
+
+  temp_file_format = (
+      'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
+  _ = data | 'write' >> beam.io.WriteToBigQuery(
+      known_args.output,
+      schema=table_schema,
+      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+      write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+      temp_file_format=temp_file_format,
+      kms_key=kms_key)
 
   result = p.run()
   result.wait_until_finish()
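
With the native write branch removed, the integration pipeline always writes
through WriteToBigQuery; the known_args.native flag now only affects the
read side of the test.
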
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9df427d..4c42ed3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -426,7 +426,45 @@ class BigQueryDisposition(object):
 # BigQuerySource, BigQuerySink.
 
 
-class BigQuerySource(dataflow_io.NativeSource):
+@deprecated(since='2.25.0', current="ReadFromBigQuery")
+def BigQuerySource(
+    table=None,
+    dataset=None,
+    project=None,
+    query=None,
+    validate=False,
+    coder=None,
+    use_standard_sql=False,
+    flatten_results=True,
+    kms_key=None,
+    use_dataflow_native_source=False):
+  if use_dataflow_native_source:
+    return _BigQuerySource(
+        table,
+        dataset,
+        project,
+        query,
+        validate,
+        coder,
+        use_standard_sql,
+        flatten_results,
+        kms_key)
+  else:
+    return ReadFromBigQuery(
+        table=table,
+        dataset=dataset,
+        project=project,
+        query=query,
+        validate=validate,
+        coder=coder,
+        use_standard_sql=use_standard_sql,
+        flatten_results=flatten_results,
+        use_json_exports=True,
+        kms_key=kms_key)
+
+
+@deprecated(since='2.25.0', current="ReadFromBigQuery")
+class _BigQuerySource(dataflow_io.NativeSource):
   """A source based on a BigQuery table."""
   def __init__(
       self,
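
With the @deprecated decorator on both entry points, any use of
BigQuerySource now emits a warning pointing at ReadFromBigQuery. A hedged
sketch of the recommended replacement (project, dataset, and bucket names
are hypothetical):

    import apache_beam as beam

    with beam.Pipeline() as p:
      rows = p | 'read' >> beam.io.ReadFromBigQuery(
          query='SELECT name FROM `myproject.mydataset.mytable`',
          use_standard_sql=True,
          gcs_location='gs://my-bucket/tmp')
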
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 6d24f29..fa3e84b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -168,7 +168,8 @@ class TestTableRowJsonCoder(unittest.TestCase):
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySource(unittest.TestCase):
   def test_display_data_item_on_validate_true(self):
-    source = beam.io.BigQuerySource('dataset.table', validate=True)
+    source = beam.io.BigQuerySource(
+        'dataset.table', validate=True, use_dataflow_native_source=True)
 
     dd = DisplayData.create_from(source)
     expected_items = [
@@ -178,7 +179,8 @@ class TestBigQuerySource(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_table_reference_display_data(self):
-    source = beam.io.BigQuerySource('dataset.table')
+    source = beam.io.BigQuerySource(
+        'dataset.table', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
@@ -186,7 +188,8 @@ class TestBigQuerySource(unittest.TestCase):
     ]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
-    source = beam.io.BigQuerySource('project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'project:dataset.table', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
@@ -194,7 +197,8 @@ class TestBigQuerySource(unittest.TestCase):
     ]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
-    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'xyz.com:project:dataset.table', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
@@ -203,27 +207,32 @@ class TestBigQuerySource(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_parse_table_reference(self):
-    source = beam.io.BigQuerySource('dataset.table')
+    source = beam.io.BigQuerySource(
+        'dataset.table', use_dataflow_native_source=True)
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-    source = beam.io.BigQuerySource('project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'project:dataset.table', use_dataflow_native_source=True)
     self.assertEqual(source.table_reference.projectId, 'project')
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'xyz.com:project:dataset.table', use_dataflow_native_source=True)
     self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-    source = beam.io.BigQuerySource(query='my_query')
+    source = beam.io.BigQuerySource(
+        query='my_query', use_dataflow_native_source=True)
     self.assertEqual(source.query, 'my_query')
     self.assertIsNone(source.table_reference)
     self.assertTrue(source.use_legacy_sql)
 
   def test_query_only_display_data(self):
-    source = beam.io.BigQuerySource(query='my_query')
+    source = beam.io.BigQuerySource(
+        query='my_query', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
@@ -232,25 +241,36 @@ class TestBigQuerySource(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_specify_query_sql_format(self):
-    source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True)
+    source = beam.io.BigQuerySource(
+        query='my_query',
+        use_standard_sql=True,
+        use_dataflow_native_source=True)
     self.assertEqual(source.query, 'my_query')
     self.assertFalse(source.use_legacy_sql)
 
   def test_specify_query_flattened_records(self):
-    source = beam.io.BigQuerySource(query='my_query', flatten_results=False)
+    source = beam.io.BigQuerySource(
+        query='my_query',
+        flatten_results=False,
+        use_dataflow_native_source=True)
     self.assertFalse(source.flatten_results)
 
   def test_specify_query_unflattened_records(self):
-    source = beam.io.BigQuerySource(query='my_query', flatten_results=True)
+    source = beam.io.BigQuerySource(
+        query='my_query', flatten_results=True, use_dataflow_native_source=True)
     self.assertTrue(source.flatten_results)
 
   def test_specify_query_without_table(self):
-    source = beam.io.BigQuerySource(query='my_query')
+    source = beam.io.BigQuerySource(
+        query='my_query', use_dataflow_native_source=True)
     self.assertEqual(source.query, 'my_query')
     self.assertIsNone(source.table_reference)
 
   def test_date_partitioned_table_name(self):
-    source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
+    source = beam.io.BigQuerySource(
+        'dataset.table$20030102',
+        validate=True,
+        use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', True),
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index a587b7d..82036e9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -471,7 +471,9 @@ class TestBigQueryReader(unittest.TestCase):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        'dataset.table',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -485,7 +487,9 @@ class TestBigQueryReader(unittest.TestCase):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource(query='query').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        query='query',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -501,8 +505,9 @@ class TestBigQueryReader(unittest.TestCase):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource(query='query',
-                                use_standard_sql=True).reader(client) as reader:
+    with beam.io.BigQuerySource(
+        query='query', use_standard_sql=True,
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -518,8 +523,9 @@ class TestBigQueryReader(unittest.TestCase):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource(query='query',
-                                flatten_results=False).reader(client) as reader:
+    with beam.io.BigQuerySource(
+        query='query', flatten_results=False,
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -532,12 +538,13 @@ class TestBigQueryReader(unittest.TestCase):
         ValueError,
         r'Both a BigQuery table and a query were specified\. Please specify '
         r'only one of these'):
-      beam.io.BigQuerySource(table='dataset.table', query='query')
+      beam.io.BigQuerySource(
+          table='dataset.table', query='query', use_dataflow_native_source=True)
 
   def test_using_neither_query_nor_table_fails(self):
     with self.assertRaisesRegex(
         ValueError, r'A BigQuery table or a query must be specified'):
-      beam.io.BigQuerySource()
+      beam.io.BigQuerySource(use_dataflow_native_source=True)
 
   def test_read_from_table_as_tablerows(self):
     client = mock.Mock()
@@ -550,7 +557,9 @@ class TestBigQueryReader(unittest.TestCase):
     # We set the coder to TableRowJsonCoder, which is a signal that
     # the caller wants to see the rows as TableRows.
     with beam.io.BigQuerySource(
-        'dataset.table', coder=TableRowJsonCoder).reader(client) as reader:
+        'dataset.table',
+        coder=TableRowJsonCoder,
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, table_rows)
@@ -570,7 +579,9 @@ class TestBigQueryReader(unittest.TestCase):
             jobComplete=True, rows=table_rows, schema=schema)
     ]
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        'dataset.table',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -590,7 +601,9 @@ class TestBigQueryReader(unittest.TestCase):
             jobComplete=True, rows=table_rows, schema=schema)
     ]
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        'dataset.table',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     # We return expected rows for each of the two pages of results so we
@@ -599,7 +612,8 @@ class TestBigQueryReader(unittest.TestCase):
 
   def test_table_schema_without_project(self):
     # Reader should pick executing project by default.
-    source = beam.io.BigQuerySource(table='mydataset.mytable')
+    source = beam.io.BigQuerySource(
+        table='mydataset.mytable', use_dataflow_native_source=True)
     options = PipelineOptions(flags=['--project', 'myproject'])
     source.pipeline_options = options
     reader = source.reader()
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index c23f358..5d030d0 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -890,6 +890,11 @@ class Read(ptransform.PTransform):
   def expand(self, pbegin):
     if isinstance(self.source, BoundedSource):
       return pbegin | _SDFBoundedSourceWrapper(self.source)
+    elif isinstance(self.source, ptransform.PTransform):
+      # The Read transform can also admit a full PTransform as an input
+      # rather than an actual source. If the input is a PTransform, then
+      # just apply it directly.
+      return pbegin.pipeline | self.source
     else:
       # Treat Read itself as a primitive.
       return pvalue.PCollection(
@@ -917,13 +922,17 @@ class Read(ptransform.PTransform):
 
   def to_runner_api_parameter(self, context):
     # type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ReadPayload]
-    return (
-        common_urns.deprecated_primitives.READ.urn,
-        beam_runner_api_pb2.ReadPayload(
-            source=self.source.to_runner_api(context),
-            is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
-            if self.source.is_bounded() else
-            beam_runner_api_pb2.IsBounded.UNBOUNDED))
+    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+    if isinstance(self.source, (BoundedSource, dataflow_io.NativeSource)):
+      return (
+          common_urns.deprecated_primitives.READ.urn,
+          beam_runner_api_pb2.ReadPayload(
+              source=self.source.to_runner_api(context),
+              is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
+              if self.source.is_bounded() else
+              beam_runner_api_pb2.IsBounded.UNBOUNDED))
+    elif isinstance(self.source, ptransform.PTransform):
+      return self.source.to_runner_api_parameter(context)
 
   @staticmethod
   def from_runner_api_parameter(unused_ptransform, parameter, context):
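
Taken together, the expand() and to_runner_api_parameter() changes let Read
transparently wrap a composite transform: when the wrapped object is a
PTransform rather than a true source, Read simply applies it and defers
proto translation to it. A minimal sketch under that assumption (the
stand-in transform below is hypothetical):

    import apache_beam as beam

    class FakeSource(beam.PTransform):
      # Hypothetical composite standing in for ReadFromBigQuery.
      def expand(self, pbegin):
        return pbegin | beam.Create([1, 2, 3])

    with beam.Pipeline() as p:
      # Read detects the PTransform and applies it directly, so this is
      # equivalent to `p | FakeSource()`.
      nums = p | beam.io.Read(FakeSource())
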
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 0315af3..ac4722f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -250,15 +250,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
     self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo')
 
-  def test_bigquery_read_streaming_fail(self):
-    remote_runner = DataflowRunner()
-    self.default_properties.append("--streaming")
-    with self.assertRaisesRegex(ValueError,
-                                r'source is not currently available'):
-      with Pipeline(remote_runner,
-                    PipelineOptions(self.default_properties)) as p:
-        _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
-
   def test_biqquery_read_fn_api_fail(self):
     remote_runner = DataflowRunner()
     for flag in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2']:
@@ -269,7 +260,9 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
           'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'):
         with Pipeline(remote_runner,
                       PipelineOptions(self.default_properties)) as p:
-          _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
+          _ = p | beam.io.Read(
+              beam.io.BigQuerySource(
+                  'some.table', use_dataflow_native_source=True))
 
   def test_remote_runner_display_data(self):
     remote_runner = DataflowRunner()
@@ -323,7 +316,9 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
                          options=PipelineOptions(self.default_properties)) as p:
         # pylint: disable=expression-not-assigned
         p | beam.io.Read(
-            beam.io.BigQuerySource('dataset.faketable')) | beam.GroupByKey()
+            beam.io.BigQuerySource(
+                'dataset.faketable',
+                use_dataflow_native_source=True)) | beam.GroupByKey()
 
   def test_group_by_key_input_visitor_with_valid_inputs(self):
     p = TestPipeline()
@@ -641,7 +636,10 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     with beam.Pipeline(runner=runner,
                        options=PipelineOptions(self.default_properties)) as p:
       # pylint: disable=expression-not-assigned
-      p | beam.io.Read(beam.io.BigQuerySource('some.table', coder=BytesCoder()))
+      p | beam.io.Read(
+          beam.io.BigQuerySource(
+              'some.table', coder=BytesCoder(),
+              use_dataflow_native_source=True))
 
     self.expect_correct_override(runner.job, u'Read', u'ParallelRead')
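
The final assertion checks that the Dataflow runner still replaces the
native Read step with its ParallelRead override when
use_dataflow_native_source=True is requested.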