You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/10/07 20:28:01 UTC

[beam] 01/01: Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch revert-15291-streaming-v2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0e2a8f690ec15f95732f8c62e59a45baf265b07d
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Thu Oct 7 13:26:54 2021 -0700

    Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py   |  9 +--------
 .../apache_beam/runners/dataflow/dataflow_runner_test.py      | 11 ++---------
 2 files changed, 3 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e78b973..1e2d1ec 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,16 +594,9 @@ class DataflowRunner(PipelineRunner):
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
-    debug_options = options.view_as(DebugOptions)
-    # Streaming is always portable, default to runner v2.
-    if (options.view_as(StandardOptions).streaming and
-        not options.view_as(GoogleCloudOptions).dataflow_kms_key):
-      if not debug_options.lookup_experiment('disable_runner_v2'):
-        debug_options.add_experiment('beam_fn_api')
-        debug_options.add_experiment('use_runner_v2')
-        debug_options.add_experiment('use_portable_job_submission')
     # set default beam_fn_api experiment if use unified
     # worker experiment flag exists, no-op otherwise.
+    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index d5b60c7..0f974a0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
-    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))
@@ -840,8 +839,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+          True, ['--enable_streaming_engine'])
 
     # JRH
     with self.assertRaisesRegex(
@@ -849,12 +847,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          [
-              '--enable_streaming_engine',
-              '--experiments=beam_fn_api',
-              '--experiments=disable_runner_v2'
-          ])
+          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
 
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):