Posted to commits@beam.apache.org by ch...@apache.org on 2021/10/07 20:28:01 UTC
[beam] 01/01: Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch revert-15291-streaming-v2
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0e2a8f690ec15f95732f8c62e59a45baf265b07d
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Thu Oct 7 13:26:54 2021 -0700
Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""
---
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 9 +--------
.../apache_beam/runners/dataflow/dataflow_runner_test.py | 11 ++---------
2 files changed, 3 insertions(+), 17 deletions(-)
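For context: this revert removes the change that made Runner v2 the default for
Python streaming jobs on Dataflow. A minimal sketch (not part of this commit;
assumes the apache-beam package is installed) of how a user can still opt into
Runner v2 explicitly by passing the experiment flag themselves:

    from apache_beam.options.pipeline_options import (
        DebugOptions, PipelineOptions, StandardOptions)

    # Explicit opt-in: after this revert, streaming jobs no longer get the
    # Runner v2 experiments by default, but they can still be requested.
    options = PipelineOptions([
        '--streaming',
        '--experiments=use_runner_v2',
    ])
    assert options.view_as(StandardOptions).streaming
    assert options.view_as(DebugOptions).lookup_experiment('use_runner_v2')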
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e78b973..1e2d1ec 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,16 +594,9 @@ class DataflowRunner(PipelineRunner):
return result
def _maybe_add_unified_worker_missing_options(self, options):
- debug_options = options.view_as(DebugOptions)
- # Streaming is always portable, default to runner v2.
- if (options.view_as(StandardOptions).streaming and
- not options.view_as(GoogleCloudOptions).dataflow_kms_key):
- if not debug_options.lookup_experiment('disable_runner_v2'):
- debug_options.add_experiment('beam_fn_api')
- debug_options.add_experiment('use_runner_v2')
- debug_options.add_experiment('use_portable_job_submission')
# set default beam_fn_api experiment if use unified
# worker experiment flag exists, no-op otherwise.
+ debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
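Restated outside the diff, the default that the hunk above removes can be read
as a predicate. A sketch with a hypothetical helper (would_default_to_runner_v2
is illustrative, not a name from the codebase) capturing the pre-revert
condition:

    from apache_beam.options.pipeline_options import (
        DebugOptions, GoogleCloudOptions, PipelineOptions, StandardOptions)

    def would_default_to_runner_v2(options):
        # Pre-revert: streaming jobs without a Cloud KMS key were opted into
        # Runner v2 unless the 'disable_runner_v2' experiment was set.
        return bool(
            options.view_as(StandardOptions).streaming and
            not options.view_as(GoogleCloudOptions).dataflow_kms_key and
            not options.view_as(DebugOptions).lookup_experiment(
                'disable_runner_v2'))

    print(would_default_to_runner_v2(PipelineOptions(['--streaming'])))  # True
    print(would_default_to_runner_v2(PipelineOptions(
        ['--streaming', '--experiments=disable_runner_v2'])))  # False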
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index d5b60c7..0f974a0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
- self.default_properties.append("--experiments=disable_runner_v2")
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
job_dict = json.loads(str(remote_runner.job))
@@ -840,8 +839,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
- True,
- ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+ True, ['--enable_streaming_engine'])
# JRH
with self.assertRaisesRegex(
@@ -849,12 +847,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
- True,
- [
- '--enable_streaming_engine',
- '--experiments=beam_fn_api',
- '--experiments=disable_runner_v2'
- ])
+ True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
def test_pack_combiners(self):
class PackableCombines(beam.PTransform):
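A quick sanity check of the post-revert behavior (a sketch, assuming
apache-beam is installed with the [gcp] extra, since the method imports the
Dataflow apiclient internally): a plain streaming pipeline's options now pass
through _maybe_add_unified_worker_missing_options untouched.

    from apache_beam.options.pipeline_options import (
        DebugOptions, PipelineOptions)
    from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner

    options = PipelineOptions(['--streaming'])
    DataflowRunner()._maybe_add_unified_worker_missing_options(options)
    # After the revert, none of the Runner v2 experiments are added by default.
    for exp in ('beam_fn_api', 'use_runner_v2', 'use_portable_job_submission'):
        assert options.view_as(DebugOptions).lookup_experiment(exp) is None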