You are viewing a plain text version of this content. The link to the canonical version was a hyperlink in the original archive page and is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by go...@apache.org on 2019/06/01 00:24:21 UTC

[beam] branch master updated: Add enable_health_checker flag for Dataflow FnAPI worker

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 265fb6b  Add enable_health_checker flag for Dataflow FnAPI worker
     new 088c011  Merge pull request #8681 from robinyqiu/resilience
265fb6b is described below

commit 265fb6b56259cf36e775419fa0ea18f4ed370e67
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Fri May 24 11:34:38 2019 -0700

    Add enable_health_checker flag for Dataflow FnAPI worker
---
 .../runners/dataflow/internal/apiclient.py         | 17 +++++++----
 .../runners/dataflow/internal/apiclient_test.py    | 34 ++++++++++++++++++++--
 2 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 6458f3e..d0c9c45 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -177,20 +177,27 @@ class Environment(object):
             key='major', value=to_json_value(environment_version))])
     # TODO: Use enumerated type instead of strings for job types.
     if job_type.startswith('FNAPI_'):
+      self.debug_options.experiments = self.debug_options.experiments or []
+      debug_options_experiments = self.debug_options.experiments
       runner_harness_override = (
           get_runner_harness_container_image())
-      self.debug_options.experiments = self.debug_options.experiments or []
       if runner_harness_override:
-        self.debug_options.experiments.append(
+        debug_options_experiments.append(
             'runner_harness_container_image=' + runner_harness_override)
-      # Add use_multiple_sdk_containers flag if its not already present. Do not
+      # Add use_multiple_sdk_containers flag if it's not already present. Do not
       # add the flag if 'no_use_multiple_sdk_containers' is present.
       # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
       # till version 2.4.
-      debug_options_experiments = self.debug_options.experiments
       if ('use_multiple_sdk_containers' not in debug_options_experiments and
           'no_use_multiple_sdk_containers' not in debug_options_experiments):
-        self.debug_options.experiments.append('use_multiple_sdk_containers')
+        debug_options_experiments.append('use_multiple_sdk_containers')
+      # Add enable_health_checker flag if it's not already present. Do not
+      # add the flag if 'disable_health_checker' is present.
+      # TODO[BEAM-7466]: Cleanup enable_health_checker once Python SDK 2.13
+      # becomes unsupported.
+      if ('enable_health_checker' not in debug_options_experiments and
+          'disable_health_checker' not in debug_options_experiments):
+        debug_options_experiments.append('enable_health_checker')
     # FlexRS
     if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
       self.proto.flexResourceSchedulingGoal = (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 2f65716..b67d0ce 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -479,7 +479,7 @@ class UtilTest(unittest.TestCase):
          '--experiments', 'beam_fn_api'])
     environment = apiclient.Environment(
         [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertIn("use_multiple_sdk_containers", environment.proto.experiments)
+    self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
 
     pipeline_options = PipelineOptions(
         ['--project', 'test_project', '--job_name', 'test_job_name',
@@ -488,7 +488,7 @@ class UtilTest(unittest.TestCase):
          '--experiments', 'use_multiple_sdk_containers'])
     environment = apiclient.Environment(
         [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    self.assertIn("use_multiple_sdk_containers", environment.proto.experiments)
+    self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)
 
     pipeline_options = PipelineOptions(
         ['--project', 'test_project', '--job_name', 'test_job_name',
@@ -498,7 +498,35 @@ class UtilTest(unittest.TestCase):
     environment = apiclient.Environment(
         [], pipeline_options, 1, FAKE_PIPELINE_URL)
     self.assertNotIn(
-        "use_multiple_sdk_containers", environment.proto.experiments)
+        'use_multiple_sdk_containers', environment.proto.experiments)
+
+  def test_experiment_enable_health_checker(self):
+    pipeline_options = PipelineOptions(
+        ['--project', 'test_project', '--job_name', 'test_job_name',
+         '--temp_location', 'gs://test-location/temp',
+         '--experiments', 'beam_fn_api'])
+    environment = apiclient.Environment(
+        [], pipeline_options, 1, FAKE_PIPELINE_URL)
+    self.assertIn('enable_health_checker', environment.proto.experiments)
+
+    pipeline_options = PipelineOptions(
+        ['--project', 'test_project', '--job_name', 'test_job_name',
+         '--temp_location', 'gs://test-location/temp',
+         '--experiments', 'beam_fn_api',
+         '--experiments', 'enable_health_checker'])
+    environment = apiclient.Environment(
+        [], pipeline_options, 1, FAKE_PIPELINE_URL)
+    self.assertIn('enable_health_checker', environment.proto.experiments)
+
+    pipeline_options = PipelineOptions(
+        ['--project', 'test_project', '--job_name', 'test_job_name',
+         '--temp_location', 'gs://test-location/temp',
+         '--experiments', 'beam_fn_api',
+         '--experiments', 'disable_health_checker'])
+    environment = apiclient.Environment(
+        [], pipeline_options, 1, FAKE_PIPELINE_URL)
+    self.assertNotIn('enable_health_checker', environment.proto.experiments)
+    self.assertIn('disable_health_checker', environment.proto.experiments)
 
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',