You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2019/11/05 00:52:08 UTC

[beam] branch master updated: [BEAM-8254] add workerRegion and workerZone options to Java SDK

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f3818e  [BEAM-8254] add workerRegion and workerZone options to Java SDK
     new e7f1539  Merge pull request #9961 from ibzib/java-worker
8f3818e is described below

commit 8f3818e9a256e3a24129ba7762e7f7c3ddd3f783
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Oct 31 16:50:35 2019 -0700

    [BEAM-8254] add workerRegion and workerZone options to Java SDK
---
 .../beam/runners/dataflow/DataflowRunner.java      | 34 +++++++++++++++
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 50 ++++++++++++++++++++++
 .../sdk/extensions/gcp/options/GcpOptions.java     | 31 ++++++++++++++
 3 files changed, 115 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9f7394f..51ee59c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -90,6 +90,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
@@ -158,6 +159,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -253,6 +255,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               + "https://cloud.google.com/compute/docs/regions-zones/regions-zones");
     }
 
+    validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options));
+
     PathValidator validator = dataflowOptions.getPathValidator();
     String gcpTempLocation;
     try {
@@ -358,6 +362,36 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   @VisibleForTesting
+  static void validateWorkerSettings(GcpOptions gcpOptions) {
+    Preconditions.checkArgument(
+        gcpOptions.getZone() == null || gcpOptions.getWorkerRegion() == null,
+        "Cannot use option zone with workerRegion. Prefer either workerZone or workerRegion.");
+    Preconditions.checkArgument(
+        gcpOptions.getZone() == null || gcpOptions.getWorkerZone() == null,
+        "Cannot use option zone with workerZone. Prefer workerZone.");
+    Preconditions.checkArgument(
+        gcpOptions.getWorkerRegion() == null || gcpOptions.getWorkerZone() == null,
+        "workerRegion and workerZone options are mutually exclusive.");
+
+    DataflowPipelineOptions dataflowOptions = gcpOptions.as(DataflowPipelineOptions.class);
+    boolean hasExperimentWorkerRegion = false;
+    if (dataflowOptions.getExperiments() != null) {
+      for (String experiment : dataflowOptions.getExperiments()) {
+        if (experiment.startsWith("worker_region")) {
+          hasExperimentWorkerRegion = true;
+          break;
+        }
+      }
+    }
+    Preconditions.checkArgument(
+        !hasExperimentWorkerRegion || gcpOptions.getWorkerRegion() == null,
+        "Experiment worker_region and option workerRegion are mutually exclusive.");
+    Preconditions.checkArgument(
+        !hasExperimentWorkerRegion || gcpOptions.getWorkerZone() == null,
+        "Experiment worker_region and option workerZone are mutually exclusive.");
+  }
+
+  @VisibleForTesting
   protected DataflowRunner(DataflowPipelineOptions options) {
     this.options = options;
     this.dataflowClient = DataflowClient.create(options);
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f46d034..4438e14 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeFalse;
@@ -93,6 +94,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -103,6 +105,7 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.WriteFilesResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -538,6 +541,53 @@ public class DataflowRunnerTest implements Serializable {
   }
 
   @Test
+  public void testZoneAndWorkerRegionMutuallyExclusive() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setZone("us-east1-b");
+    options.setWorkerRegion("us-east1");
+    assertThrows(
+        IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+  }
+
+  @Test
+  public void testZoneAndWorkerZoneMutuallyExclusive() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setZone("us-east1-b");
+    options.setWorkerZone("us-east1-c");
+    assertThrows(
+        IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+  }
+
+  @Test
+  public void testExperimentRegionAndWorkerRegionMutuallyExclusive() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
+    options.setWorkerRegion("us-east1");
+    assertThrows(
+        IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+  }
+
+  @Test
+  public void testExperimentRegionAndWorkerZoneMutuallyExclusive() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
+    options.setWorkerZone("us-east1-b");
+    assertThrows(
+        IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+  }
+
+  @Test
+  public void testWorkerRegionAndWorkerZoneMutuallyExclusive() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setWorkerRegion("us-east1");
+    options.setWorkerZone("us-east1-b");
+    assertThrows(
+        IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+  }
+
+  @Test
   public void testRun() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     Pipeline p = buildDataflowPipeline(options);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 37cf6bc..cb11569 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -101,6 +101,37 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
   void setZone(String value);
 
   /**
+   * The Compute Engine region (https://cloud.google.com/compute/docs/regions-zones/regions-zones)
+   * in which worker processing should occur, e.g. "us-west1". Mutually exclusive with {@link
+   * #getWorkerZone()}. If neither workerRegion nor workerZone is specified, defaults to the same
+   * value as region.
+   */
+  @Description(
+      "The Compute Engine region "
+          + "(https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker "
+          + "processing should occur, e.g. \"us-west1\". Mutually exclusive with workerZone. If "
+          + "neither workerRegion nor workerZone is specified, default to same value as region.")
+  String getWorkerRegion();
+
+  void setWorkerRegion(String workerRegion);
+
+  /**
+   * The Compute Engine zone (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in
+   * which worker processing should occur, e.g. "us-west1-a". Mutually exclusive with {@link
+   * #getWorkerRegion()}. If neither workerRegion nor workerZone is specified, the Dataflow service
+   * will choose a zone in region based on available capacity.
+   */
+  @Description(
+      "The Compute Engine zone "
+          + "(https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker "
+          + "processing should occur, e.g. \"us-west1-a\". Mutually exclusive with workerRegion. "
+          + "If neither workerRegion nor workerZone is specified, the Dataflow service will choose "
+          + "a zone in region based on available capacity.")
+  String getWorkerZone();
+
+  void setWorkerZone(String workerZone);
+
+  /**
    * The class of the credential factory that should be created and used to create credentials. If
    * gcpCredential has not been set explicitly, an instance of this class will be constructed and
    * used as a credential factory.