You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/03 22:58:23 UTC
[1/2] beam git commit: DataflowRunner: automatically determine container image type
Repository: beam
Updated Branches:
refs/heads/master 3711c0caf -> 844e53e34
DataflowRunner: automatically determine container image type
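The container image type is now determined automatically, even if the user supplies a base container image policy (an image name containing the IMAGE placeholder).
The user can still fully override the image by supplying an image name without the placeholder.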
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b7593b0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b7593b0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b7593b0
Branch: refs/heads/master
Commit: 6b7593b0a3cd0514145bb6002fff0958a6630303
Parents: 3711c0c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 3 09:25:57 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 3 15:58:07 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 18 ++++++++++----
.../options/DataflowPipelineDebugOptions.java | 2 +-
.../DataflowPipelineWorkerPoolOptions.java | 10 +-------
.../runners/dataflow/DataflowRunnerTest.java | 25 ++++++++++++++++++++
4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ea96ae8..6eec8f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -68,7 +68,6 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecificat
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -543,9 +542,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Set the Docker container image that executes Dataflow worker harness, residing in Google
// Container Registry. Translator is guaranteed to create a worker pool prior to this point.
- String workerHarnessContainerImage =
- options.as(DataflowPipelineWorkerPoolOptions.class)
- .getWorkerHarnessContainerImage();
+ String workerHarnessContainerImage = getContainerImageForJob(options);
for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
}
@@ -1341,4 +1338,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
+ /**
+ * Returns the worker harness container image to use for the given job.
+ *
+ * <p>If the configured {@code workerHarnessContainerImage} does not contain the placeholder
+ * {@code IMAGE}, it is returned unchanged, so a user-supplied image fully overrides the
+ * automatic choice. Otherwise every occurrence of {@code IMAGE} is replaced with:
+ * {@code java} when the {@code beam_fn_api} experiment is enabled; else
+ * {@code beam-java-streaming} for streaming jobs; else {@code beam-java-batch}.
+ *
+ * @param options the pipeline options supplying the image template, experiments, and mode
+ * @return the concrete container image name
+ */
+ @VisibleForTesting
+ static String getContainerImageForJob(DataflowPipelineOptions options) {
+ String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
+ // NOTE(review): any image whose name merely happens to contain "IMAGE" is treated as a
+ // template, and String.replace rewrites every occurrence, not just the placeholder.
+ if (!workerHarnessContainerImage.contains("IMAGE")) {
+ return workerHarnessContainerImage;
+ } else if (hasExperiment(options, "beam_fn_api")) {
+ // The Fn API experiment takes precedence over streaming/batch selection.
+ return workerHarnessContainerImage.replace("IMAGE", "java");
+ } else if (options.isStreaming()) {
+ return workerHarnessContainerImage.replace("IMAGE", "beam-java-streaming");
+ } else {
+ return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 729bca4..d0ea722 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -56,7 +56,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
@Experimental
@Nullable
List<String> getExperiments();
- void setExperiments(List<String> value);
+ void setExperiments(@Nullable List<String> value);
/**
* The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index e2c4bf4..00d2194 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.options;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.Default;
@@ -129,15 +128,8 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion();
- String containerType;
- if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) {
- containerType = "java";
- } else {
- containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch";
- }
- return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion);
+ return String.format("dataflow.gcr.io/v1beta3/IMAGE:%s", containerVersion);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 0735b5c..79a96e7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow;
+import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -1103,4 +1104,28 @@ public class DataflowRunnerTest {
assertFalse(DataflowRunner.hasExperiment(options, "ba"));
assertFalse(DataflowRunner.hasExperiment(options, "BAR"));
}
+
+ /**
+ * Verifies how {@code getContainerImageForJob} resolves the worker harness image: verbatim
+ * when there is no {@code IMAGE} placeholder, otherwise by batch/streaming mode and the
+ * {@code beam_fn_api} experiment.
+ */
+ @Test
+ public void testWorkerHarnessContainerImage() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+
+ // explicit image without the IMAGE placeholder: returned unchanged
+ options.setWorkerHarnessContainerImage("some-container");
+ assertThat(getContainerImageForJob(options), equalTo("some-container"));
+
+ // IMAGE placeholder, batch, no experiments: legacy batch image
+ options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");
+ options.setExperiments(null);
+ options.setStreaming(false);
+ assertThat(
+ getContainerImageForJob(options), equalTo("gcr.io/beam-java-batch/foo"));
+ // IMAGE placeholder, streaming, no experiments: legacy streaming image
+ options.setStreaming(true);
+ assertThat(
+ getContainerImageForJob(options), equalTo("gcr.io/beam-java-streaming/foo"));
+ // IMAGE placeholder, streaming, beam_fn_api experiment: Fn API image wins over streaming
+ options.setExperiments(ImmutableList.of("experiment1", "beam_fn_api"));
+ assertThat(
+ getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
+ }
}
[2/2] beam git commit: This closes #2407
Posted by dh...@apache.org.
This closes #2407
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/844e53e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/844e53e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/844e53e3
Branch: refs/heads/master
Commit: 844e53e341bf6d465e4185903a3a5047a3e68282
Parents: 3711c0c 6b7593b
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 3 15:58:11 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 3 15:58:11 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 18 ++++++++++----
.../options/DataflowPipelineDebugOptions.java | 2 +-
.../DataflowPipelineWorkerPoolOptions.java | 10 +-------
.../runners/dataflow/DataflowRunnerTest.java | 25 ++++++++++++++++++++
4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------