You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/03 22:58:23 UTC

[1/2] beam git commit: DataflowRunner: automatically determine container image type

Repository: beam
Updated Branches:
  refs/heads/master 3711c0caf -> 844e53e34


DataflowRunner: automatically determine container image type

The image type is now determined automatically even if the user supplies a base container image policy.

Of course, preserve the ability of the user to fully override the image.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b7593b0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b7593b0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b7593b0

Branch: refs/heads/master
Commit: 6b7593b0a3cd0514145bb6002fff0958a6630303
Parents: 3711c0c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 3 09:25:57 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 3 15:58:07 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 18 ++++++++++----
 .../options/DataflowPipelineDebugOptions.java   |  2 +-
 .../DataflowPipelineWorkerPoolOptions.java      | 10 +-------
 .../runners/dataflow/DataflowRunnerTest.java    | 25 ++++++++++++++++++++
 4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ea96ae8..6eec8f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -68,7 +68,6 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecificat
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -543,9 +542,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Set the Docker container image that executes Dataflow worker harness, residing in Google
     // Container Registry. Translator is guaranteed to create a worker pool prior to this point.
-    String workerHarnessContainerImage =
-        options.as(DataflowPipelineWorkerPoolOptions.class)
-        .getWorkerHarnessContainerImage();
+    String workerHarnessContainerImage = getContainerImageForJob(options);
     for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
       workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
     }
@@ -1341,4 +1338,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  @VisibleForTesting
+  static String getContainerImageForJob(DataflowPipelineOptions options) {
+    String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
+    if (!workerHarnessContainerImage.contains("IMAGE")) {
+      return workerHarnessContainerImage;
+    } else if (hasExperiment(options, "beam_fn_api")) {
+      return workerHarnessContainerImage.replace("IMAGE", "java");
+    } else if (options.isStreaming()) {
+      return workerHarnessContainerImage.replace("IMAGE", "beam-java-streaming");
+    } else {
+      return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 729bca4..d0ea722 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -56,7 +56,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   @Experimental
   @Nullable
   List<String> getExperiments();
-  void setExperiments(List<String> value);
+  void setExperiments(@Nullable List<String> value);
 
   /**
    * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value

http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index e2c4bf4..00d2194 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.options;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.Default;
@@ -129,15 +128,8 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
       implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
       String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion();
-      String containerType;
-      if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) {
-        containerType = "java";
-      } else {
-        containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch";
-      }
-      return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion);
+      return String.format("dataflow.gcr.io/v1beta3/IMAGE:%s", containerVersion);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6b7593b0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 0735b5c..79a96e7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow;
 
+import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -1103,4 +1104,28 @@ public class DataflowRunnerTest {
     assertFalse(DataflowRunner.hasExperiment(options, "ba"));
     assertFalse(DataflowRunner.hasExperiment(options, "BAR"));
   }
+
+  @Test
+  public void testWorkerHarnessContainerImage() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+
+    // fully overridden image (no IMAGE placeholder) is returned unchanged
+    options.setWorkerHarnessContainerImage("some-container");
+    assertThat(getContainerImageForJob(options), equalTo("some-container"));
+
+    // batch, legacy
+    options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");
+    options.setExperiments(null);
+    options.setStreaming(false);
+    assertThat(
+        getContainerImageForJob(options), equalTo("gcr.io/beam-java-batch/foo"));
+    // streaming, legacy
+    options.setStreaming(true);
+    assertThat(
+        getContainerImageForJob(options), equalTo("gcr.io/beam-java-streaming/foo"));
+    // streaming, fnapi
+    options.setExperiments(ImmutableList.of("experiment1", "beam_fn_api"));
+    assertThat(
+        getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
+  }
 }


[2/2] beam git commit: This closes #2407

Posted by dh...@apache.org.
This closes #2407


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/844e53e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/844e53e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/844e53e3

Branch: refs/heads/master
Commit: 844e53e341bf6d465e4185903a3a5047a3e68282
Parents: 3711c0c 6b7593b
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 3 15:58:11 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 3 15:58:11 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 18 ++++++++++----
 .../options/DataflowPipelineDebugOptions.java   |  2 +-
 .../DataflowPipelineWorkerPoolOptions.java      | 10 +-------
 .../runners/dataflow/DataflowRunnerTest.java    | 25 ++++++++++++++++++++
 4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------