You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/04/23 01:15:40 UTC

[beam] branch release-2.21.0 updated: Merge pull request #11466 from [BEAM-9787] Clear error message on UW + BQSource

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new ab8d235  Merge pull request #11466 from [BEAM-9787] Clear error message on UW + BQSource
     new db44c4e  Merge pull request #11481 from pabloem/cp-BEAM9787
ab8d235 is described below

commit ab8d23530ec9874c0711143147c998247f8f5e5b
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue Apr 21 11:06:59 2020 -0700

    Merge pull request #11466 from [BEAM-9787] Clear error message on UW + BQSource
    
    * Clear error message on UW + BQSource
    
    * address comments
    
    * fix lint, formatter
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py  | 11 +++++++++++
 .../apache_beam/runners/dataflow/dataflow_runner_test.py     | 12 ++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 5091d55..d0aaa8f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -90,6 +90,11 @@ __all__ = ['DataflowRunner']
 
 _LOGGER = logging.getLogger(__name__)
 
+BQ_SOURCE_UW_ERROR = (
+    'The Read(BigQuerySource(...)) transform is not supported with newer stack '
+    'features (Fn API, Dataflow Runner V2, etc). Please use the transform '
+    'apache_beam.io.gcp.bigquery.ReadFromBigQuery instead.')
+
 
 class DataflowRunner(PipelineRunner):
   """A runner that creates job graphs and submits them for remote execution.
@@ -1177,6 +1182,12 @@ class DataflowRunner(PipelineRunner):
         raise ValueError(
             'BigQuery source is not currently available for use '
             'in streaming pipelines.')
+      debug_options = options.view_as(DebugOptions)
+      use_fn_api = (
+          debug_options.experiments and
+          'beam_fn_api' in debug_options.experiments)
+      if use_fn_api:
+        raise ValueError(BQ_SOURCE_UW_ERROR)
       step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
       # TODO(silviuc): Add table validation if transform.source.validate.
       if transform.source.table_reference is not None:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c4f065c..c939274 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -258,6 +258,18 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
                     PipelineOptions(self.default_properties)) as p:
         _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
 
+  def test_biqquery_read_fn_api_fail(self):
+    remote_runner = DataflowRunner()
+    for flag in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2']:
+      self.default_properties.append("--experiments=%s" % flag)
+      with self.assertRaisesRegex(
+          ValueError,
+          'The Read.BigQuerySource.*is not supported.*'
+          'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'):
+        with Pipeline(remote_runner,
+                      PipelineOptions(self.default_properties)) as p:
+          _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
+
   def test_remote_runner_display_data(self):
     remote_runner = DataflowRunner()
     p = Pipeline(