You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/06/01 00:24:21 UTC
[beam] branch master updated: Add enable_health_checker flag for Dataflow FnAPI worker
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 265fb6b Add enable_health_checker flag for Dataflow FnAPI worker
new 088c011 Merge pull request #8681 from robinyqiu/resilience
265fb6b is described below
commit 265fb6b56259cf36e775419fa0ea18f4ed370e67
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Fri May 24 11:34:38 2019 -0700
Add enable_health_checker flag for Dataflow FnAPI worker
---
.../runners/dataflow/internal/apiclient.py | 17 +++++++----
.../runners/dataflow/internal/apiclient_test.py | 34 ++++++++++++++++++++--
2 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 6458f3e..d0c9c45 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -177,20 +177,27 @@ class Environment(object):
key='major', value=to_json_value(environment_version))])
# TODO: Use enumerated type instead of strings for job types.
if job_type.startswith('FNAPI_'):
+ self.debug_options.experiments = self.debug_options.experiments or []
+ debug_options_experiments = self.debug_options.experiments
runner_harness_override = (
get_runner_harness_container_image())
- self.debug_options.experiments = self.debug_options.experiments or []
if runner_harness_override:
- self.debug_options.experiments.append(
+ debug_options_experiments.append(
'runner_harness_container_image=' + runner_harness_override)
- # Add use_multiple_sdk_containers flag if its not already present. Do not
+ # Add use_multiple_sdk_containers flag if it's not already present. Do not
# add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
# till version 2.4.
- debug_options_experiments = self.debug_options.experiments
if ('use_multiple_sdk_containers' not in debug_options_experiments and
'no_use_multiple_sdk_containers' not in debug_options_experiments):
- self.debug_options.experiments.append('use_multiple_sdk_containers')
+ debug_options_experiments.append('use_multiple_sdk_containers')
+ # Add enable_health_checker flag if it's not already present. Do not
+ # add the flag if 'disable_health_checker' is present.
+ # TODO[BEAM-7466]: Cleanup enable_health_checker once Python SDK 2.13
+ # becomes unsupported.
+ if ('enable_health_checker' not in debug_options_experiments and
+ 'disable_health_checker' not in debug_options_experiments):
+ debug_options_experiments.append('enable_health_checker')
# FlexRS
if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 2f65716..b67d0ce 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -479,7 +479,7 @@ class UtilTest(unittest.TestCase):
'--experiments', 'beam_fn_api'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
- self.assertIn("use_multiple_sdk_containers", environment.proto.experiments)
+ self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
@@ -488,7 +488,7 @@ class UtilTest(unittest.TestCase):
'--experiments', 'use_multiple_sdk_containers'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
- self.assertIn("use_multiple_sdk_containers", environment.proto.experiments)
+ self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
@@ -498,7 +498,35 @@ class UtilTest(unittest.TestCase):
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertNotIn(
- "use_multiple_sdk_containers", environment.proto.experiments)
+ 'use_multiple_sdk_containers', environment.proto.experiments)
+
+ def test_experiment_enable_health_checker(self):
+ pipeline_options = PipelineOptions(
+ ['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api'])
+ environment = apiclient.Environment(
+ [], pipeline_options, 1, FAKE_PIPELINE_URL)
+ self.assertIn('enable_health_checker', environment.proto.experiments)
+
+ pipeline_options = PipelineOptions(
+ ['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api',
+ '--experiments', 'enable_health_checker'])
+ environment = apiclient.Environment(
+ [], pipeline_options, 1, FAKE_PIPELINE_URL)
+ self.assertIn('enable_health_checker', environment.proto.experiments)
+
+ pipeline_options = PipelineOptions(
+ ['--project', 'test_project', '--job_name', 'test_job_name',
+ '--temp_location', 'gs://test-location/temp',
+ '--experiments', 'beam_fn_api',
+ '--experiments', 'disable_health_checker'])
+ environment = apiclient.Environment(
+ [], pipeline_options, 1, FAKE_PIPELINE_URL)
+ self.assertNotIn('enable_health_checker', environment.proto.experiments)
+ self.assertIn('disable_health_checker', environment.proto.experiments)
@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',