You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/04/08 23:26:58 UTC
[beam] branch master updated: [BEAM-9716] Alias zone to worker_zone and warn user.
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 82689f1 [BEAM-9716] Alias zone to worker_zone and warn user.
new b56740f Merge pull request #11332 from ibzib/beam-9716
82689f1 is described below
commit 82689f143af36c235054762ae455f7e3470e57ee
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Tue Apr 7 10:11:48 2020 -0400
[BEAM-9716] Alias zone to worker_zone and warn user.
Setting the zone of the worker pools is now handled entirely by the Dataflow service.
---
.../apache/beam/runners/dataflow/DataflowRunner.java | 6 ++++++
.../dataflow/DataflowPipelineTranslatorTest.java | 18 ------------------
.../beam/runners/dataflow/DataflowRunnerTest.java | 9 +++++++++
.../beam/sdk/extensions/gcp/options/GcpOptions.java | 8 +++++++-
sdks/python/apache_beam/options/pipeline_options.py | 3 ++-
.../apache_beam/options/pipeline_options_validator.py | 5 +++++
.../options/pipeline_options_validator_test.py | 14 ++++++++++++++
7 files changed, 43 insertions(+), 20 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f82db5a..4179db5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -380,6 +380,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
Preconditions.checkArgument(
!hasExperimentWorkerRegion || gcpOptions.getWorkerZone() == null,
"Experiment worker_region and option workerZone are mutually exclusive.");
+
+ if (gcpOptions.getZone() != null) {
+ LOG.warn("Option --zone is deprecated. Please use --workerZone instead.");
+ gcpOptions.setWorkerZone(gcpOptions.getZone());
+ gcpOptions.setZone(null);
+ }
}
@VisibleForTesting
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 775c782..5e8a9e7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -342,24 +342,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
@Test
- public void testZoneConfig() throws IOException {
- final String testZone = "test-zone-1";
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setZone(testZone);
-
- Pipeline p = buildPipeline(options);
- p.traverseTopologically(new RecordingPipelineVisitor());
- Job job =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(p, DataflowRunner.fromOptions(options), Collections.emptyList())
- .getJob();
-
- assertEquals(1, job.getEnvironment().getWorkerPools().size());
- assertEquals(testZone, job.getEnvironment().getWorkerPools().get(0).getZone());
- }
-
- @Test
public void testWorkerMachineTypeConfig() throws IOException {
final String testMachineType = "test-machine-type";
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 5d267f8..3ef255b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -591,6 +591,15 @@ public class DataflowRunnerTest implements Serializable {
}
@Test
+ public void testZoneAliasWorkerZone() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setZone("us-east1-b");
+ DataflowRunner.validateWorkerSettings(options);
+ assertNull(options.getZone());
+ assertEquals("us-east1-b", options.getWorkerZone());
+ }
+
+ @Test
public void testRun() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipeline(options);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index cb11569..1672684 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -91,13 +91,19 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
* operations.
*
* <p>Default is set on a per-service basis.
+ *
+ * @deprecated Use {@link #getWorkerZone()} instead.
*/
+ @Deprecated
@Description(
"GCP availability zone for running GCP operations. "
+ "and GCE availability zone for launching workers "
- + "Default is up to the individual service.")
+ + "Default is up to the individual service. "
+ + "This option is deprecated, and will be replaced by workerZone.")
String getZone();
+ /** @deprecated Use {@link #setWorkerZone} instead. */
+ @Deprecated
void setZone(String value);
/**
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 36704cd..ac48e7e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -703,7 +703,8 @@ class WorkerOptions(PipelineOptions):
default=None,
help=(
'GCE availability zone for launching workers. Default is up to the '
- 'Dataflow service.'))
+ 'Dataflow service. This flag is deprecated, and will be replaced '
+ 'by worker_zone.'))
parser.add_argument(
'--network',
default=None,
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index e2f36e7..3b850fb 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -230,6 +230,11 @@ class PipelineOptionsValidator(object):
errors.extend(
self._validate_error(
'worker_region and worker_zone are mutually exclusive.'))
+ if view.zone:
+ _LOGGER.warning(
+ 'Option --zone is deprecated. Please use --worker_zone instead.')
+ view.worker_zone = view.zone
+ view.zone = None
return errors
def validate_optional_argument_positive(self, view, arg_name):
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index aa0ccc3..8251155 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -32,6 +32,7 @@ from hamcrest.core.base_matcher import BaseMatcher
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
@@ -453,6 +454,19 @@ class SetupTest(unittest.TestCase):
self.assertIn('worker_region', errors[0])
self.assertIn('worker_zone', errors[0])
+ def test_zone_alias_worker_zone(self):
+ runner = MockRunners.DataflowRunner()
+ options = PipelineOptions([
+ '--zone=us-east1-b',
+ '--project=example:example',
+ '--temp_location=gs://foo/bar',
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = validator.validate()
+ self.assertEqual(len(errors), 0)
+ self.assertIsNone(options.view_as(WorkerOptions).zone)
+ self.assertEqual(options.view_as(WorkerOptions).worker_zone, 'us-east1-b')
+
def test_test_matcher(self):
def get_validator(matcher):
options = [