You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/09 22:13:18 UTC

[1/2] beam git commit: Stage the portable pipeline; put URL in pipeline options

Repository: beam
Updated Branches:
  refs/heads/master e43edcdb9 -> ae077cce0


Stage the portable pipeline; put URL in pipeline options


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c1ebdbe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c1ebdbe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c1ebdbe

Branch: refs/heads/master
Commit: 6c1ebdbe142f073df42447339eade1d98641a21a
Parents: 3dfcb44
Author: Kenneth Knowles <ke...@apache.org>
Authored: Wed Nov 1 14:01:11 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 6 21:37:50 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/DataflowRunner.java | 11 ++++-------
 .../dataflow/options/DataflowPipelineOptions.java        |  7 +++++++
 .../apache/beam/runners/dataflow/DataflowRunnerTest.java | 10 +++++-----
 3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6c1ebdbe/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 334c8e5..0a20a0f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -192,9 +192,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   static final String PIPELINE_FILE_NAME = "pipeline.pb";
 
-  @VisibleForTesting
-  static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
-
   private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
 
   /**
@@ -544,6 +541,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     maybeRegisterDebuggee(dataflowOptions, requestId);
 
+    // Set the location of the staged pipeline; this must happen before
+    // translation, because that is where the JSON pipeline options are set up
+    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+
     JobSpecification jobSpecification =
         translator.translate(pipeline, this, packages);
     Job newJob = jobSpecification.getJob();
@@ -571,10 +572,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     String workerHarnessContainerImage = getContainerImageForJob(options);
     for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
       workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
-
-      // https://issues.apache.org/jira/browse/BEAM-3116
-      // workerPool.setMetadata(
-      //    ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, stagedPipeline.getLocation()));
     }
 
     newJob.getEnvironment().setVersion(getEnvironmentVersion(options));

http://git-wip-us.apache.org/repos/asf/beam/blob/6c1ebdbe/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 091f89b..ddb98f1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -124,6 +124,13 @@ public interface DataflowPipelineOptions
   void setLabels(Map<String, String> labels);
 
   /**
+   * The URL of the staged portable pipeline.
+   */
+  @Description("The URL of the staged portable pipeline")
+  String getPipelineUrl();
+  void setPipelineUrl(String urlString);
+
+  /**
    * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
    */
   class StagingLocationFactory implements DefaultValueFactory<String> {

http://git-wip-us.apache.org/repos/asf/beam/blob/6c1ebdbe/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 66cf11d..3467d53 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
@@ -63,6 +64,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -164,11 +166,9 @@ public class DataflowRunnerTest implements Serializable {
     assertNull(job.getCurrentState());
     assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
 
-    // https://issues.apache.org/jira/browse/BEAM-3116
-    // for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) {
-    //   assertThat(workerPool.getMetadata(),
-    //       hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY));
-    // }
+    assertThat(
+        (Map<String, Object>) job.getEnvironment().getSdkPipelineOptions().get("options"),
+        hasKey("pipelineUrl"));
   }
 
   @Before


[2/2] beam git commit: This closes #4089: [BEAM-2963] Stage the portable pipeline; put URL in pipeline options

Posted by ke...@apache.org.
This closes #4089: [BEAM-2963] Stage the portable pipeline; put URL in pipeline options


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ae077cce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ae077cce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ae077cce

Branch: refs/heads/master
Commit: ae077cce0585e30763e01344005b21ba1410d17d
Parents: e43edcd 6c1ebdb
Author: Kenneth Knowles <ke...@apache.org>
Authored: Thu Nov 9 13:59:58 2017 -0800
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Thu Nov 9 13:59:58 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/DataflowRunner.java | 11 ++++-------
 .../dataflow/options/DataflowPipelineOptions.java        |  7 +++++++
 .../apache/beam/runners/dataflow/DataflowRunnerTest.java | 10 +++++-----
 3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------