You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/18 20:09:16 UTC

[4/7] beam git commit: Add assertion that valid jobs must have staged pipeline

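Add assertion that valid jobs must have a staged pipeline.

DataflowRunnerTest now asserts that the metadata of every worker pool in the
job's environment has the DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY
("pipeline_url") key. To let the test reference it, that constant changes from
private to package-private and is annotated @VisibleForTesting.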


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cef997ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cef997ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cef997ff

Branch: refs/heads/master
Commit: cef997ff06629a2c77b5aeb4f9ad40d8c4b3b22c
Parents: 090c512
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 18 06:49:13 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Wed Oct 18 13:02:25 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 3 ++-
 .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java  | 7 +++++++
 2 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cef997ff/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ecef072..545321d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -192,7 +192,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   static final String PIPELINE_FILE_NAME = "pipeline.pb";
 
-  private static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
+  @VisibleForTesting
+  static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url";
 
   private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cef997ff/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 5bc798a..02abc34 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
@@ -45,6 +46,7 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -163,6 +165,11 @@ public class DataflowRunnerTest implements Serializable {
     assertNull(job.getId());
     assertNull(job.getCurrentState());
     assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName()));
+
+    for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) {
+      assertThat(workerPool.getMetadata(),
+          hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY));
+    }
   }
 
   @Before