You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/09 22:13:18 UTC
[1/2] beam git commit: Stage the portable pipeline; put URL in pipeline options
Repository: beam
Updated Branches:
refs/heads/master e43edcdb9 -> ae077cce0
Stage the portable pipeline; put URL in pipeline options
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c1ebdbe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c1ebdbe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c1ebdbe
Branch: refs/heads/master
Commit: 6c1ebdbe142f073df42447339eade1d98641a21a
Parents: 3dfcb44
Author: Kenneth Knowles <ke...@apache.org>
Authored: Wed Nov 1 14:01:11 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 6 21:37:50 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/runners/dataflow/DataflowRunner.java | 11 ++++-------
.../dataflow/options/DataflowPipelineOptions.java | 7 +++++++
.../apache/beam/runners/dataflow/DataflowRunnerTest.java | 10 +++++-----
3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6c1ebdbe/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 334c8e5..0a20a0f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -192,9 +192,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
static final String PIPELINE_FILE_NAME = "pipeline.pb";
- @VisibleForTesting
- static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
-
private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
/**
@@ -544,6 +541,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
maybeRegisterDebuggee(dataflowOptions, requestId);
+ // Set the location of the staged pipeline; this must happen before
+ // translation, because that is where the JSON pipeline options are set up
+ dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+
JobSpecification jobSpecification =
translator.translate(pipeline, this, packages);
Job newJob = jobSpecification.getJob();
@@ -571,10 +572,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
String workerHarnessContainerImage = getContainerImageForJob(options);
for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
-
- // https://issues.apache.org/jira/browse/BEAM-3116
- // workerPool.setMetadata(
- // ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, stagedPipeline.getLocation()));
}
newJob.getEnvironment().setVersion(getEnvironmentVersion(options));
http://git-wip-us.apache.org/repos/asf/beam/blob/6c1ebdbe/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 091f89b..ddb98f1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -124,6 +124,13 @@ public interface DataflowPipelineOptions
void setLabels(Map<String, String> labels);
/**
+ * The URL of the staged portable pipeline.
+ */
+ @Description("The URL of the staged portable pipeline")
+ String getPipelineUrl();
+ void setPipelineUrl(String urlString);
+
+ /**
* Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
*/
class StagingLocationFactory implements DefaultValueFactory<String> {
http://git-wip-us.apache.org/repos/asf/beam/blob/6c1ebdbe/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 66cf11d..3467d53 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
@@ -63,6 +64,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@@ -164,11 +166,9 @@ public class DataflowRunnerTest implements Serializable {
assertNull(job.getCurrentState());
assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
- // https://issues.apache.org/jira/browse/BEAM-3116
- // for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) {
- // assertThat(workerPool.getMetadata(),
- // hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY));
- // }
+ assertThat(
+ (Map<String, Object>) job.getEnvironment().getSdkPipelineOptions().get("options"),
+ hasKey("pipelineUrl"));
}
@Before
[2/2] beam git commit: This closes #4089: [BEAM-2963] Stage the portable pipeline; put URL in pipeline options
Posted by ke...@apache.org.
This closes #4089: [BEAM-2963] Stage the portable pipeline; put URL in pipeline options
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ae077cce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ae077cce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ae077cce
Branch: refs/heads/master
Commit: ae077cce0585e30763e01344005b21ba1410d17d
Parents: e43edcd 6c1ebdb
Author: Kenneth Knowles <ke...@apache.org>
Authored: Thu Nov 9 13:59:58 2017 -0800
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Thu Nov 9 13:59:58 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/runners/dataflow/DataflowRunner.java | 11 ++++-------
.../dataflow/options/DataflowPipelineOptions.java | 7 +++++++
.../apache/beam/runners/dataflow/DataflowRunnerTest.java | 10 +++++-----
3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------