You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/04/23 01:15:40 UTC
[beam] branch release-2.21.0 updated: Merge pull request #11466 from [BEAM-9787] Clear error message on UW + BQSource
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new ab8d235 Merge pull request #11466 from [BEAM-9787] Clear error message on UW + BQSource
new db44c4e Merge pull request #11481 from pabloem/cp-BEAM9787
ab8d235 is described below
commit ab8d23530ec9874c0711143147c998247f8f5e5b
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue Apr 21 11:06:59 2020 -0700
Merge pull request #11466 from [BEAM-9787] Clear error message on UW + BQSource
* Clear error message on UW + BQSource
* address comments
* fix lint, formatter
---
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 11 +++++++++++
.../apache_beam/runners/dataflow/dataflow_runner_test.py | 12 ++++++++++++
2 files changed, 23 insertions(+)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 5091d55..d0aaa8f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -90,6 +90,11 @@ __all__ = ['DataflowRunner']
_LOGGER = logging.getLogger(__name__)
+BQ_SOURCE_UW_ERROR = (
+ 'The Read(BigQuerySource(...)) transform is not supported with newer stack '
+ 'features (Fn API, Dataflow Runner V2, etc). Please use the transform '
+ 'apache_beam.io.gcp.bigquery.ReadFromBigQuery instead.')
+
class DataflowRunner(PipelineRunner):
"""A runner that creates job graphs and submits them for remote execution.
@@ -1177,6 +1182,12 @@ class DataflowRunner(PipelineRunner):
raise ValueError(
'BigQuery source is not currently available for use '
'in streaming pipelines.')
+ debug_options = options.view_as(DebugOptions)
+ use_fn_api = (
+ debug_options.experiments and
+ 'beam_fn_api' in debug_options.experiments)
+ if use_fn_api:
+ raise ValueError(BQ_SOURCE_UW_ERROR)
step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
# TODO(silviuc): Add table validation if transform.source.validate.
if transform.source.table_reference is not None:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c4f065c..c939274 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -258,6 +258,18 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
PipelineOptions(self.default_properties)) as p:
_ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
+ def test_biqquery_read_fn_api_fail(self):
+ remote_runner = DataflowRunner()
+ for flag in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2']:
+ self.default_properties.append("--experiments=%s" % flag)
+ with self.assertRaisesRegex(
+ ValueError,
+ 'The Read.BigQuerySource.*is not supported.*'
+ 'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'):
+ with Pipeline(remote_runner,
+ PipelineOptions(self.default_properties)) as p:
+ _ = p | beam.io.Read(beam.io.BigQuerySource('some.table'))
+
def test_remote_runner_display_data(self):
remote_runner = DataflowRunner()
p = Pipeline(