You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2019/11/05 00:52:08 UTC
[beam] branch master updated: [BEAM-8254] add workerRegion and workerZone options to Java SDK
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8f3818e [BEAM-8254] add workerRegion and workerZone options to Java SDK
new e7f1539 Merge pull request #9961 from ibzib/java-worker
8f3818e is described below
commit 8f3818e9a256e3a24129ba7762e7f7c3ddd3f783
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Oct 31 16:50:35 2019 -0700
[BEAM-8254] add workerRegion and workerZone options to Java SDK
---
.../beam/runners/dataflow/DataflowRunner.java | 34 +++++++++++++++
.../beam/runners/dataflow/DataflowRunnerTest.java | 50 ++++++++++++++++++++++
.../sdk/extensions/gcp/options/GcpOptions.java | 31 ++++++++++++++
3 files changed, 115 insertions(+)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9f7394f..51ee59c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -90,6 +90,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
@@ -158,6 +159,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -253,6 +255,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ "https://cloud.google.com/compute/docs/regions-zones/regions-zones");
}
+ validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options));
+
PathValidator validator = dataflowOptions.getPathValidator();
String gcpTempLocation;
try {
@@ -358,6 +362,36 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@VisibleForTesting
+ static void validateWorkerSettings(GcpOptions gcpOptions) {
+ Preconditions.checkArgument(
+ gcpOptions.getZone() == null || gcpOptions.getWorkerRegion() == null,
+ "Cannot use option zone with workerRegion. Prefer either workerZone or workerRegion.");
+ Preconditions.checkArgument(
+ gcpOptions.getZone() == null || gcpOptions.getWorkerZone() == null,
+ "Cannot use option zone with workerZone. Prefer workerZone.");
+ Preconditions.checkArgument(
+ gcpOptions.getWorkerRegion() == null || gcpOptions.getWorkerZone() == null,
+ "workerRegion and workerZone options are mutually exclusive.");
+
+ DataflowPipelineOptions dataflowOptions = gcpOptions.as(DataflowPipelineOptions.class);
+ boolean hasExperimentWorkerRegion = false;
+ if (dataflowOptions.getExperiments() != null) {
+ for (String experiment : dataflowOptions.getExperiments()) {
+ if (experiment.startsWith("worker_region")) {
+ hasExperimentWorkerRegion = true;
+ break;
+ }
+ }
+ }
+ Preconditions.checkArgument(
+ !hasExperimentWorkerRegion || gcpOptions.getWorkerRegion() == null,
+ "Experiment worker_region and option workerRegion are mutually exclusive.");
+ Preconditions.checkArgument(
+ !hasExperimentWorkerRegion || gcpOptions.getWorkerZone() == null,
+ "Experiment worker_region and option workerZone are mutually exclusive.");
+ }
+
+ @VisibleForTesting
protected DataflowRunner(DataflowPipelineOptions options) {
this.options = options;
this.dataflowClient = DataflowClient.create(options);
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f46d034..4438e14 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
@@ -93,6 +94,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -103,6 +105,7 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -538,6 +541,53 @@ public class DataflowRunnerTest implements Serializable {
}
@Test
+ public void testZoneAndWorkerRegionMutuallyExclusive() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setZone("us-east1-b");
+ options.setWorkerRegion("us-east1");
+ assertThrows(
+ IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+ }
+
+ @Test
+ public void testZoneAndWorkerZoneMutuallyExclusive() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setZone("us-east1-b");
+ options.setWorkerZone("us-east1-c");
+ assertThrows(
+ IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+ }
+
+ @Test
+ public void testExperimentRegionAndWorkerRegionMutuallyExclusive() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
+ options.setWorkerRegion("us-east1");
+ assertThrows(
+ IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+ }
+
+ @Test
+ public void testExperimentRegionAndWorkerZoneMutuallyExclusive() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
+ options.setWorkerZone("us-east1-b");
+ assertThrows(
+ IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+ }
+
+ @Test
+ public void testWorkerRegionAndWorkerZoneMutuallyExclusive() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setWorkerRegion("us-east1");
+ options.setWorkerZone("us-east1-b");
+ assertThrows(
+ IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options));
+ }
+
+ @Test
public void testRun() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipeline(options);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 37cf6bc..cb11569 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -101,6 +101,37 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
void setZone(String value);
/**
+ * The Compute Engine region (https://cloud.google.com/compute/docs/regions-zones/regions-zones)
+ * in which worker processing should occur, e.g. "us-west1". Mutually exclusive with {@link
+ * #getWorkerZone()}. If neither workerRegion nor workerZone is specified, default to same value
+ * as region.
+ */
+ @Description(
+ "The Compute Engine region "
+ + "(https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker "
+ + "processing should occur, e.g. \"us-west1\". Mutually exclusive with workerZone. If "
+ + "neither workerRegion nor workerZone is specified, default to same value as region.")
+ String getWorkerRegion();
+
+ void setWorkerRegion(String workerRegion);
+
+ /**
+ * The Compute Engine zone (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in
+ * which worker processing should occur, e.g. "us-west1-a". Mutually exclusive with {@link
+ * #getWorkerRegion()}. If neither workerRegion nor workerZone is specified, the Dataflow service
+ * will choose a zone in region based on available capacity.
+ */
+ @Description(
+ "The Compute Engine zone "
+ + "(https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker "
+ + "processing should occur, e.g. \"us-west1-a\". Mutually exclusive with workerRegion. "
+ + "If neither workerRegion nor workerZone is specified, the Dataflow service will choose "
+ + "a zone in region based on available capacity.")
+ String getWorkerZone();
+
+ void setWorkerZone(String workerZone);
+
+ /**
* The class of the credential factory that should be created and used to create credentials. If
* gcpCredential has not been set explicitly, an instance of this class will be constructed and
* used as a credential factory.