Posted to commits@beam.apache.org by ch...@apache.org on 2021/10/07 20:28:00 UTC

[beam] branch revert-15291-streaming-v2 created (now 0e2a8f6)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch revert-15291-streaming-v2
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 0e2a8f6  Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""

This branch includes the following new commits:

     new 0e2a8f6  Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[beam] 01/01: Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""


chamikara pushed a commit to branch revert-15291-streaming-v2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0e2a8f690ec15f95732f8c62e59a45baf265b07d
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Thu Oct 7 13:26:54 2021 -0700

    Revert "Restore "Default to Runner v2 for Python Streaming jobs. (#15140)""
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py   |  9 +--------
 .../apache_beam/runners/dataflow/dataflow_runner_test.py      | 11 ++---------
 2 files changed, 3 insertions(+), 17 deletions(-)
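
For context: the reverted change made DataflowRunner add the
'beam_fn_api', 'use_runner_v2', and 'use_portable_job_submission'
experiments automatically for Python streaming jobs that do not use a
Cloud KMS key. With that default gone, Runner v2 can still be enabled
explicitly. A minimal sketch (not part of this commit; the experiment
names come from the removed code below, and other required Dataflow
options are omitted):

    from apache_beam import Pipeline
    from apache_beam.options.pipeline_options import PipelineOptions

    # Explicit opt-in to Runner v2 now that the streaming default is
    # reverted; required Dataflow options such as --project, --region,
    # and --temp_location are omitted here for brevity.
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--streaming',
        '--experiments=beam_fn_api',
        '--experiments=use_runner_v2',
        '--experiments=use_portable_job_submission',
    ])
    p = Pipeline(options=options)
    # ... build and run the pipeline as usual.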

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e78b973..1e2d1ec 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,16 +594,9 @@ class DataflowRunner(PipelineRunner):
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
-    debug_options = options.view_as(DebugOptions)
-    # Streaming is always portable, default to runner v2.
-    if (options.view_as(StandardOptions).streaming and
-        not options.view_as(GoogleCloudOptions).dataflow_kms_key):
-      if not debug_options.lookup_experiment('disable_runner_v2'):
-        debug_options.add_experiment('beam_fn_api')
-        debug_options.add_experiment('use_runner_v2')
-        debug_options.add_experiment('use_portable_job_submission')
     # Set the default beam_fn_api experiment if the unified worker
     # experiment flag exists; no-op otherwise.
+    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
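
The net effect of this hunk: streaming jobs no longer receive
'beam_fn_api', 'use_runner_v2', and 'use_portable_job_submission' by
default; the helper only back-fills 'beam_fn_api' when the unified
worker was already requested. A simplified restatement of the restored
control flow (hypothetical names, for illustration only):

    # Hypothetical, simplified restatement of the restored helper; the
    # real method lives on DataflowRunner and takes pipeline options.
    def maybe_add_missing_options(debug_options, use_unified_worker):
        # No automatic Runner v2 default for streaming jobs any more:
        # beam_fn_api is only back-filled when the unified worker
        # (e.g. via use_runner_v2) was explicitly requested.
        if use_unified_worker:
            if not debug_options.lookup_experiment('beam_fn_api'):
                debug_options.add_experiment('beam_fn_api')
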
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index d5b60c7..0f974a0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
-    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))
@@ -840,8 +839,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+          True, ['--enable_streaming_engine'])
 
     # JRH
     with self.assertRaisesRegex(
@@ -849,12 +847,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          [
-              '--enable_streaming_engine',
-              '--experiments=beam_fn_api',
-              '--experiments=disable_runner_v2'
-          ])
+          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
 
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):
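
The test updates above drop '--experiments=disable_runner_v2', which
was only needed while the automatic default existed. A small standalone
check of the post-revert behavior (illustration only; it calls a
private hook and assumes the GCP extras are installed):

    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions
    from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner

    # After the revert, the submission-time hook no longer injects
    # Runner v2 experiments for a plain streaming job.
    options = PipelineOptions(['--streaming'])
    DataflowRunner()._maybe_add_unified_worker_missing_options(options)
    assert options.view_as(DebugOptions).lookup_experiment('use_runner_v2') is None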