You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2020/12/15 22:16:56 UTC

[beam] 01/01: Revert "Do not add unnecessary experiment use_multiple_sdk_containers. (#13475)"

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch revert-13475-no_knobs
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e516676c416d2efc323f77adbc8e762c1ad75d6f
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Tue Dec 15 14:16:19 2020 -0800

    Revert "Do not add unnecessary experiment use_multiple_sdk_containers. (#13475)"
    
    This reverts commit ba11ed7daa2cf6e7e4d1899f95460f0c829ad4f7.
---
 .../runners/dataflow/internal/apiclient.py         |  7 +++
 .../runners/dataflow/internal/apiclient_test.py    | 54 ++++++++++++++++++++++
 2 files changed, 61 insertions(+)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 89da139..b9de09b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -210,6 +210,13 @@ class Environment(object):
           self.debug_options.add_experiment(
               'runner_harness_container_image=' + runner_harness_override)
       debug_options_experiments = self.debug_options.experiments
+      # Add use_multiple_sdk_containers flag if it's not already present. Do not
+      # add the flag if 'no_use_multiple_sdk_containers' is present.
+      # TODO: Clean up use_multiple_sdk_containers once Python SDK versions up
+      # to 2.4 are deprecated.
+      if ('use_multiple_sdk_containers' not in debug_options_experiments and
+          'no_use_multiple_sdk_containers' not in debug_options_experiments):
+        debug_options_experiments.append('use_multiple_sdk_containers')
     # FlexRS
     if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
       self.proto.flexResourceSchedulingGoal = (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 76efd2b..34f4988 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -786,6 +786,60 @@ class UtilTest(unittest.TestCase):
     self.assertEqual('key5', job.proto.labels.additionalProperties[4].key)
     self.assertEqual('', job.proto.labels.additionalProperties[4].value)
 
+  def test_experiment_use_multiple_sdk_containers(self):
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--experiments',
+        'beam_fn_api'
+    ])
+    environment = apiclient.Environment([],
+                                        pipeline_options,
+                                        1,
+                                        FAKE_PIPELINE_URL)
+    self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
+
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--experiments',
+        'beam_fn_api',
+        '--experiments',
+        'use_multiple_sdk_containers'
+    ])
+    environment = apiclient.Environment([],
+                                        pipeline_options,
+                                        1,
+                                        FAKE_PIPELINE_URL)
+    self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
+
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--experiments',
+        'beam_fn_api',
+        '--experiments',
+        'no_use_multiple_sdk_containers'
+    ])
+    environment = apiclient.Environment([],
+                                        pipeline_options,
+                                        1,
+                                        FAKE_PIPELINE_URL)
+    self.assertNotIn(
+        'use_multiple_sdk_containers', environment.proto.experiments)
+
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',
       (3, 8))