You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2020/12/15 22:16:56 UTC
[beam] 01/01: Revert "Do not add unnecessary experiment use_multiple_sdk_containers. (#13475)"
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch revert-13475-no_knobs
in repository https://gitbox.apache.org/repos/asf/beam.git
commit e516676c416d2efc323f77adbc8e762c1ad75d6f
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Tue Dec 15 14:16:19 2020 -0800
Revert "Do not add unnecessary experiment use_multiple_sdk_containers. (#13475)"
This reverts commit ba11ed7daa2cf6e7e4d1899f95460f0c829ad4f7.
---
.../runners/dataflow/internal/apiclient.py | 7 +++
.../runners/dataflow/internal/apiclient_test.py | 54 ++++++++++++++++++++++
2 files changed, 61 insertions(+)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 89da139..b9de09b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -210,6 +210,13 @@ class Environment(object):
self.debug_options.add_experiment(
'runner_harness_container_image=' + runner_harness_override)
debug_options_experiments = self.debug_options.experiments
+ # Add use_multiple_sdk_containers flag if it's not already present. Do not
+ # add the flag if 'no_use_multiple_sdk_containers' is present.
+ # TODO: Clean up use_multiple_sdk_containers once Python SDK versions up to
+ # 2.4 are deprecated.
+ if ('use_multiple_sdk_containers' not in debug_options_experiments and
+ 'no_use_multiple_sdk_containers' not in debug_options_experiments):
+ debug_options_experiments.append('use_multiple_sdk_containers')
# FlexRS
if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 76efd2b..34f4988 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -786,6 +786,60 @@ class UtilTest(unittest.TestCase):
self.assertEqual('key5', job.proto.labels.additionalProperties[4].key)
self.assertEqual('', job.proto.labels.additionalProperties[4].value)
+ def test_experiment_use_multiple_sdk_containers(self):
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ '--experiments',
+ 'beam_fn_api'
+ ])
+ environment = apiclient.Environment([],
+ pipeline_options,
+ 1,
+ FAKE_PIPELINE_URL)
+ self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
+
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ '--experiments',
+ 'beam_fn_api',
+ '--experiments',
+ 'use_multiple_sdk_containers'
+ ])
+ environment = apiclient.Environment([],
+ pipeline_options,
+ 1,
+ FAKE_PIPELINE_URL)
+ self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
+
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ '--experiments',
+ 'beam_fn_api',
+ '--experiments',
+ 'no_use_multiple_sdk_containers'
+ ])
+ environment = apiclient.Environment([],
+ pipeline_options,
+ 1,
+ FAKE_PIPELINE_URL)
+ self.assertNotIn(
+ 'use_multiple_sdk_containers', environment.proto.experiments)
+
@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',
(3, 8))