You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ib...@apache.org on 2019/10/31 23:55:10 UTC

[beam] branch master updated: [BEAM-8252] Python: add worker_region and worker_zone options

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 791c1b6  [BEAM-8252] Python: add worker_region and worker_zone options
     new 8370811  Merge pull request #9594 from ibzib/py-worker-region-zone
791c1b6 is described below

commit 791c1b6eb033426396d03a8ba9c6875814a6546e
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Sep 16 18:01:06 2019 -0700

    [BEAM-8252] Python: add worker_region and worker_zone options
---
 .../python/apache_beam/options/pipeline_options.py | 20 +++++++++
 .../options/pipeline_options_validator.py          | 17 ++++++++
 .../options/pipeline_options_validator_test.py     | 50 ++++++++++++++++++++++
 3 files changed, 87 insertions(+)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 658978f..7d9c917 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -602,6 +602,25 @@ class WorkerOptions(PipelineOptions):
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
     parser.add_argument(
+        '--worker_region',
+        default=None,
+        help=
+        ('The Compute Engine region '
+         '(https://cloud.google.com/compute/docs/regions-zones/regions-zones) '
+         'in which worker processing should occur, e.g. "us-west1". Mutually '
+         'exclusive with worker_zone. If neither worker_region nor worker_zone '
+         'is specified, default to same value as --region.'))
+    parser.add_argument(
+        '--worker_zone',
+        default=None,
+        help=
+        ('The Compute Engine zone '
+         '(https://cloud.google.com/compute/docs/regions-zones/regions-zones) '
+         'in which worker processing should occur, e.g. "us-west1-a". Mutually '
+         'exclusive with worker_region. If neither worker_region nor '
+         'worker_zone is specified, the Dataflow service will choose a zone in '
+         '--region based on available capacity.'))
+    parser.add_argument(
         '--zone',
         default=None,
         help=(
@@ -660,6 +679,7 @@ class WorkerOptions(PipelineOptions):
     if validator.is_service_runner():
       errors.extend(
           validator.validate_optional_argument_positive(self, 'num_workers'))
+      errors.extend(validator.validate_worker_region_zone(self))
     return errors
 
 
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 8f7c946..33c35b3 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -191,6 +191,23 @@ class PipelineOptionsValidator(object):
           break
     return errors
 
+  def validate_worker_region_zone(self, view):
+    """Validates Dataflow worker region and zone arguments are consistent."""
+    errors = []
+    if view.zone and (view.worker_region or view.worker_zone):
+      errors.extend(self._validate_error(
+          'Cannot use deprecated flag --zone along with worker_region or '
+          'worker_zone.'))
+    if self.options.view_as(DebugOptions).lookup_experiment('worker_region')\
+        and (view.worker_region or view.worker_zone):
+      errors.extend(self._validate_error(
+          'Cannot use deprecated experiment worker_region along with '
+          'worker_region or worker_zone.'))
+    if view.worker_region and view.worker_zone:
+      errors.extend(self._validate_error(
+          'worker_region and worker_zone are mutually exclusive.'))
+    return errors
+
   def validate_optional_argument_positive(self, view, arg_name):
     """Validates that an optional argument (if set) has a positive value."""
     arg = getattr(view, arg_name, None)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index 7010716..f380f9e 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -307,6 +307,56 @@ class SetupTest(unittest.TestCase):
     errors = validator.validate()
     self.assertFalse(errors)
 
+  def test_zone_and_worker_region_mutually_exclusive(self):
+    runner = MockRunners.DataflowRunner()
+    options = PipelineOptions([
+        '--zone', 'us-east1-b',
+        '--worker_region', 'us-east1',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
+  def test_zone_and_worker_zone_mutually_exclusive(self):
+    runner = MockRunners.DataflowRunner()
+    options = PipelineOptions([
+        '--zone', 'us-east1-b',
+        '--worker_zone', 'us-east1-c',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
+  def test_experiment_region_and_worker_region_mutually_exclusive(self):
+    runner = MockRunners.DataflowRunner()
+    options = PipelineOptions([
+        '--experiments', 'worker_region=us-west1',
+        '--worker_region', 'us-east1',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
+  def test_experiment_region_and_worker_zone_mutually_exclusive(self):
+    runner = MockRunners.DataflowRunner()
+    options = PipelineOptions([
+        '--experiments', 'worker_region=us-west1',
+        '--worker_zone', 'us-east1-b',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
+  def test_worker_region_and_worker_zone_mutually_exclusive(self):
+    runner = MockRunners.DataflowRunner()
+    options = PipelineOptions([
+        '--worker_region', 'us-east1',
+        '--worker_zone', 'us-east1-b',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
   def test_test_matcher(self):
     def get_validator(matcher):
       options = ['--project=example:example',