You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/03 02:42:30 UTC
[1/2] beam git commit: DataflowRunner: experimental support for
issuing FnAPI based jobs
Repository: beam
Updated Branches:
refs/heads/master 8f5f19d11 -> 0a6211b56
DataflowRunner: experimental support for issuing FnAPI based jobs
Also cleanup some code around checking for existence of experiments.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/131c9f91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/131c9f91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/131c9f91
Branch: refs/heads/master
Commit: 131c9f916dae6345ec77a869112ae5901b568f23
Parents: 8f5f19d
Author: Dan Halperin <dh...@google.com>
Authored: Wed Mar 1 23:06:11 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Mar 2 18:42:19 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 3 +-
.../dataflow/DataflowPipelineTranslator.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 46 ++++++++++++--------
.../runners/dataflow/DataflowRunnerInfo.java | 38 ++++++++--------
.../options/DataflowPipelineDebugOptions.java | 2 +
.../DataflowPipelineWorkerPoolOptions.java | 10 +++--
.../beam/runners/dataflow/dataflow.properties | 8 ++--
.../dataflow/DataflowRunnerInfoTest.java | 23 +++++-----
.../runners/dataflow/DataflowRunnerTest.java | 17 ++++++++
9 files changed, 92 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index fdd088f..fb06797 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -34,7 +34,8 @@
<properties>
<dataflow.container_version>beam-master-20170228</dataflow.container_version>
- <dataflow.environment_major_version>6</dataflow.environment_major_version>
+ <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
+ <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 7e559e9..06e5048 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -327,8 +327,7 @@ public class DataflowPipelineTranslator {
workerPool.setNumWorkers(options.getNumWorkers());
if (options.isStreaming()
- && (options.getExperiments() == null
- || !options.getExperiments().contains("enable_windmill_service"))) {
+ && !DataflowRunner.hasExperiment(options, "enable_windmill_service")) {
// Use separate data disk for streaming.
Disk disk = new Disk();
disk.setDiskType(options.getWorkerDiskType());
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index dbf1958..50b6b4f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -51,7 +51,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -307,14 +306,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.parDoWithFnType(unsupported),
UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true)));
}
- if (options.getExperiments() == null
- || !options.getExperiments().contains("enable_custom_pubsub_source")) {
+ if (!hasExperiment(options, "enable_custom_pubsub_source")) {
ptoverrides.put(
PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this));
}
- if (options.getExperiments() == null
- || !options.getExperiments().contains("enable_custom_pubsub_sink")) {
+ if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
ptoverrides.put(
PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
new StreamingPubsubIOWriteOverrideFactory(this));
@@ -559,20 +556,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
}
- // Requirements about the service.
- Map<String, Object> environmentVersion = new HashMap<>();
- environmentVersion.put(
- PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY,
- DataflowRunnerInfo.getDataflowRunnerInfo().getEnvironmentMajorVersion());
- newJob.getEnvironment().setVersion(environmentVersion);
- // Default jobType is JAVA_BATCH_AUTOSCALING: A Java job with workers that the job can
- // autoscale if specified.
- String jobType = "JAVA_BATCH_AUTOSCALING";
-
- if (options.isStreaming()) {
- jobType = "STREAMING";
- }
- environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType);
+ newJob.getEnvironment().setVersion(getEnvironmentVersion(options));
if (hooks != null) {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
@@ -680,6 +664,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return dataflowPipelineJob;
}
+ /** Returns true if the specified experiment is enabled, handling null experiments. */
+ public static boolean hasExperiment(DataflowPipelineDebugOptions options, String experiment) {
+ List<String> experiments =
+ firstNonNull(options.getExperiments(), Collections.<String>emptyList());
+ return experiments.contains(experiment);
+ }
+
+ /** Helper to configure the Dataflow Job Environment based on the user's job options. */
+ private static Map<String, Object> getEnvironmentVersion(DataflowPipelineOptions options) {
+ DataflowRunnerInfo runnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+ String majorVersion;
+ String jobType;
+ if (hasExperiment(options, "beam_fn_api")) {
+ majorVersion = runnerInfo.getFnApiEnvironmentMajorVersion();
+ jobType = options.isStreaming() ? "FNAPI_STREAMING" : "FNAPI_BATCH";
+ } else {
+ majorVersion = runnerInfo.getLegacyEnvironmentMajorVersion();
+ jobType = options.isStreaming() ? "STREAMING" : "JAVA_BATCH_AUTOSCALING";
+ }
+ return ImmutableMap.<String, Object>of(
+ PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, majorVersion,
+ PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType);
+ }
+
@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
index 59cb8a4..12b3f38 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
@@ -47,32 +47,34 @@ public final class DataflowRunnerInfo {
private Properties properties;
- private static final String ENVIRONMENT_MAJOR_VERSION_KEY = "environment.major.version";
- private static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY = "worker.image.batch";
- private static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY =
- "worker.image.streaming";
+ private static final String FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY =
+ "fnapi.environment.major.version";
+ private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY =
+ "legacy.environment.major.version";
+ private static final String CONTAINER_VERSION_KEY = "container.version";
- /** Provides the environment's major version number. */
- public String getEnvironmentMajorVersion() {
+ /** Provides the legacy environment's major version number. */
+ public String getLegacyEnvironmentMajorVersion() {
checkState(
- properties.containsKey(ENVIRONMENT_MAJOR_VERSION_KEY), "Unknown environment major version");
- return properties.getProperty(ENVIRONMENT_MAJOR_VERSION_KEY);
+ properties.containsKey(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY),
+ "Unknown legacy environment major version");
+ return properties.getProperty(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY);
}
- /** Provides the batch worker harness container image name. */
- public String getBatchWorkerHarnessContainerImage() {
+ /** Provides the FnAPI environment's major version number. */
+ public String getFnApiEnvironmentMajorVersion() {
checkState(
- properties.containsKey(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY),
- "Unknown batch worker harness container image");
- return properties.getProperty(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY);
+ properties.containsKey(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY),
+ "Unknown FnAPI environment major version");
+ return properties.getProperty(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY);
}
- /** Provides the streaming worker harness container image name. */
- public String getStreamingWorkerHarnessContainerImage() {
+ /** Provides the container version that will be used for constructing harness image paths. */
+ public String getContainerVersion() {
checkState(
- properties.containsKey(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY),
- "Unknown streaming worker harness container image");
- return properties.getProperty(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY);
+ properties.containsKey(CONTAINER_VERSION_KEY),
+ "Unknown container version");
+ return properties.getProperty(CONTAINER_VERSION_KEY);
}
private DataflowRunnerInfo(String resourcePath) {
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index cdfa3f5..729bca4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.api.services.dataflow.Dataflow;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.Stager;
@@ -53,6 +54,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
+ "be enabled with this flag. Please sync with the Dataflow team before enabling any "
+ "experiments.")
@Experimental
+ @Nullable
List<String> getExperiments();
void setExperiments(List<String> value);
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index 3c5d05a..e2c4bf4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.options;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.Default;
@@ -129,11 +130,14 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
@Override
public String create(PipelineOptions options) {
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- if (dataflowOptions.isStreaming()) {
- return DataflowRunnerInfo.getDataflowRunnerInfo().getStreamingWorkerHarnessContainerImage();
+ String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion();
+ String containerType;
+ if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) {
+ containerType = "java";
} else {
- return DataflowRunnerInfo.getDataflowRunnerInfo().getBatchWorkerHarnessContainerImage();
+ containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch";
}
+ return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 47e316c..ac68970 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -16,8 +16,6 @@
#
# Dataflow runtime properties
-environment.major.version=${dataflow.environment_major_version}
-
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:${dataflow.container_version}
-
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:${dataflow.container_version}
+legacy.environment.major.version=${dataflow.legacy_environment_major_version}
+fnapi.environment.major.version=${dataflow.fnapi_environment_major_version}
+container.version=${dataflow.container_version}
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
index 9b5b374..3502040 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -32,20 +33,22 @@ public class DataflowRunnerInfoTest {
public void getDataflowRunnerInfo() throws Exception {
DataflowRunnerInfo info = DataflowRunnerInfo.getDataflowRunnerInfo();
- String version = info.getEnvironmentMajorVersion();
+ String version = info.getLegacyEnvironmentMajorVersion();
// Validate major version is a number
assertTrue(
- String.format("Environment major version number %s is not a number", version),
+ String.format("Legacy environment major version number %s is not a number", version),
version.matches("\\d+"));
- // Validate container images contain gcr.io
- assertThat(
- "batch worker harness container image invalid",
- info.getBatchWorkerHarnessContainerImage(),
- containsString("gcr.io"));
+ version = info.getFnApiEnvironmentMajorVersion();
+ // Validate major version is a number
+ assertTrue(
+ String.format("FnAPI environment major version number %s is not a number", version),
+ version.matches("\\d+"));
+
+ // Validate container version does not contain a $ (indicating it was not filled in).
assertThat(
- "streaming worker harness container image invalid",
- info.getStreamingWorkerHarnessContainerImage(),
- containsString("gcr.io"));
+ "container version invalid",
+ info.getContainerVersion(),
+ not(containsString("$")));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/131c9f91/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a788077..246feb0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -1118,4 +1119,20 @@ public class DataflowRunnerTest {
thrown.expect(RuntimeException.class);
p.run();
}
+
+ @Test
+ public void testHasExperiment() {
+ DataflowPipelineDebugOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class);
+
+ options.setExperiments(null);
+ assertFalse(DataflowRunner.hasExperiment(options, "foo"));
+
+ options.setExperiments(ImmutableList.of("foo", "bar"));
+ assertTrue(DataflowRunner.hasExperiment(options, "foo"));
+ assertTrue(DataflowRunner.hasExperiment(options, "bar"));
+ assertFalse(DataflowRunner.hasExperiment(options, "baz"));
+ assertFalse(DataflowRunner.hasExperiment(options, "ba"));
+ assertFalse(DataflowRunner.hasExperiment(options, "BAR"));
+ }
}
[2/2] beam git commit: This closes #2140
Posted by dh...@apache.org.
This closes #2140
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a6211b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a6211b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a6211b5
Branch: refs/heads/master
Commit: 0a6211b56fe88309afd49a92eaea3eaf98ed4dbb
Parents: 8f5f19d 131c9f9
Author: Dan Halperin <dh...@google.com>
Authored: Thu Mar 2 18:42:21 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Mar 2 18:42:21 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 3 +-
.../dataflow/DataflowPipelineTranslator.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 46 ++++++++++++--------
.../runners/dataflow/DataflowRunnerInfo.java | 38 ++++++++--------
.../options/DataflowPipelineDebugOptions.java | 2 +
.../DataflowPipelineWorkerPoolOptions.java | 10 +++--
.../beam/runners/dataflow/dataflow.properties | 8 ++--
.../dataflow/DataflowRunnerInfoTest.java | 23 +++++-----
.../runners/dataflow/DataflowRunnerTest.java | 17 ++++++++
9 files changed, 92 insertions(+), 58 deletions(-)
----------------------------------------------------------------------