Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/15 19:21:15 UTC

[1/2] incubator-beam git commit: Remove the DataflowPipeline Class

Repository: incubator-beam
Updated Branches:
  refs/heads/master 1eec6863f -> bcefff6a3


Remove the DataflowPipeline Class

Pipelines that run with the DataflowPipelineRunner should instead be
created from an appropriately constructed PipelineOptions (i.e. options
with the runner set to DataflowPipelineRunner.class).
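
For example, a Dataflow pipeline is now built roughly as follows (a
minimal sketch using only calls that appear in this diff; the project id
and temp location are placeholder values):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.runners.DataflowPipelineRunner;

    // Construct options with the runner set explicitly, then create a
    // plain Pipeline; there is no longer a dedicated DataflowPipeline class.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setProject("my-project");              // placeholder project id
    options.setTempLocation("gs://my-bucket/tmp"); // placeholder GCS path

    Pipeline p = Pipeline.create(options);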


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1eee4bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1eee4bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1eee4bc

Branch: refs/heads/master
Commit: b1eee4bcca632eacf4a6dd724ed3eeb27ace0d77
Parents: 1eec686
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 15 09:23:08 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 10:07:57 2016 -0700

----------------------------------------------------------------------
 .../sdk/options/DataflowPipelineOptions.java    |  14 +-
 .../beam/sdk/runners/DataflowPipeline.java      |  60 -------
 .../sdk/runners/DataflowPipelineRegistrar.java  |   4 +-
 .../sdk/runners/DataflowPipelineRunnerTest.java |  37 +++--
 .../beam/sdk/runners/DataflowPipelineTest.java  |  45 ------
 .../runners/DataflowPipelineTranslatorTest.java | 159 +++++++++++++------
 6 files changed, 136 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
index 4eae85a..50fc956 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.options;
 
-import org.apache.beam.sdk.runners.DataflowPipeline;
+import org.apache.beam.sdk.runners.DataflowPipelineRunner;
 
 import com.google.common.base.MoreObjects;
 
@@ -27,14 +27,14 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 /**
- * Options that can be used to configure the {@link DataflowPipeline}.
+ * Options that can be used to configure the {@link DataflowPipelineRunner}.
  */
 @Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
-    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
-    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
-    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
-    DataflowProfilingOptions, PubsubOptions {
+public interface DataflowPipelineOptions
+    extends PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+        DataflowPipelineWorkerPoolOptions, BigQueryOptions, GcsOptions, StreamingOptions,
+        CloudDebuggerOptions, DataflowWorkerLoggingOptions, DataflowProfilingOptions,
+        PubsubOptions {
 
   @Description("Project id. Required when running a Dataflow in the cloud. "
       + "See https://cloud.google.com/storage/docs/projects for further details.")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
deleted file mode 100644
index 4d91a38..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
- * {@link DataflowPipelineJob} when it is
- * {@link org.apache.beam.sdk.Pipeline#run()}.
- *
- * <p>This is not intended for use by users of Cloud Dataflow.
- * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
- * {@link Pipeline}.
- */
-public class DataflowPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new {@link DataflowPipeline} instance for tests.
-   */
-  public static DataflowPipeline create(DataflowPipelineOptions options) {
-    return new DataflowPipeline(options);
-  }
-
-  private DataflowPipeline(DataflowPipelineOptions options) {
-    super(DataflowPipelineRunner.fromOptions(options), options);
-  }
-
-  @Override
-  public DataflowPipelineJob run() {
-    return (DataflowPipelineJob) super.run();
-  }
-
-  @Override
-  public DataflowPipelineRunner getRunner() {
-    return (DataflowPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
index 9333d7d..b0f72ed 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
@@ -26,8 +26,8 @@ import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 
 /**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DataflowPipeline}.
+ * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the
+ * {@link DataflowPipelineRunner}.
  */
 public class DataflowPipelineRegistrar {
   private DataflowPipelineRegistrar() { }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
index 303acda..8b024fb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
@@ -125,7 +125,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Tests for DataflowPipelineRunner.
+ * Tests for the {@link DataflowPipelineRunner}.
  */
 @RunWith(JUnit4.class)
 public class DataflowPipelineRunnerTest {
@@ -143,9 +143,10 @@ public class DataflowPipelineRunnerTest {
     assertNull(job.getCurrentState());
   }
 
-  private DataflowPipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+  private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
     options.setStableUniqueNames(CheckEnabled.ERROR);
-    DataflowPipeline p = DataflowPipeline.create(options);
+    options.setRunner(DataflowPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
 
     p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
         .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
@@ -212,6 +213,7 @@ public class DataflowPipelineRunnerTest {
   private DataflowPipelineOptions buildPipelineOptions(
       ArgumentCaptor<Job> jobCaptor) throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
     options.setProject(PROJECT_ID);
     options.setTempLocation("gs://somebucket/some/path");
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
@@ -227,8 +229,8 @@ public class DataflowPipelineRunnerTest {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    DataflowPipeline p = buildDataflowPipeline(options);
-    DataflowPipelineJob job = p.run();
+    Pipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
     assertValidJob(jobCaptor.getValue());
   }
@@ -246,7 +248,7 @@ public class DataflowPipelineRunnerTest {
     resultJob.setClientRequestId("different_request_id");
     when(mockRequest.execute()).thenReturn(resultJob);
 
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
     try {
       p.run();
       fail("Expected DataflowJobAlreadyExistsException");
@@ -265,8 +267,8 @@ public class DataflowPipelineRunnerTest {
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
     options.setUpdate(true);
     options.setJobName("oldJobName");
-    DataflowPipeline p = buildDataflowPipeline(options);
-    DataflowPipelineJob job = p.run();
+    Pipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
     assertValidJob(jobCaptor.getValue());
   }
@@ -279,7 +281,7 @@ public class DataflowPipelineRunnerTest {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setUpdate(true);
     options.setJobName("badJobName");
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
     p.run();
   }
 
@@ -298,7 +300,7 @@ public class DataflowPipelineRunnerTest {
     resultJob.setClientRequestId("different_request_id");
     when(mockRequest.execute()).thenReturn(resultJob);
 
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
 
     thrown.expect(DataflowJobAlreadyUpdatedException.class);
     thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
@@ -348,9 +350,9 @@ public class DataflowPipelineRunnerTest {
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
-    DataflowPipeline p = buildDataflowPipeline(options);
+    Pipeline p = buildDataflowPipeline(options);
 
-    DataflowPipelineJob job = p.run();
+    DataflowPipelineJob job = (DataflowPipelineJob) p.run();
     assertEquals("newid", job.getJobId());
 
     Job workflowJob = jobCaptor.getValue();
@@ -750,7 +752,7 @@ public class DataflowPipelineRunnerTest {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
-    DataflowPipeline p = DataflowPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
 
     p.apply(Create.of(Arrays.asList(1, 2, 3)))
      .apply(new TestTransform());
@@ -758,7 +760,8 @@ public class DataflowPipelineRunnerTest {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(Matchers.containsString("no translator registered"));
     DataflowPipelineTranslator.fromOptions(options)
-        .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+        .translate(
+            p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -766,7 +769,7 @@ public class DataflowPipelineRunnerTest {
   public void testTransformTranslator() throws IOException {
     // Test that we can provide a custom translation
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipeline p = DataflowPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
     TestTransform transform = new TestTransform();
 
     p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
@@ -793,7 +796,7 @@ public class DataflowPipelineRunnerTest {
         });
 
     translator.translate(
-        p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
     assertTrue(transform.translated);
   }
 
@@ -828,7 +831,7 @@ public class DataflowPipelineRunnerTest {
   @Test
   public void testApplyIsScopedToExactClass() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipeline p = DataflowPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
 
     Create.TimestampedValues<String> transform =
         Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
deleted file mode 100644
index 947b599..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipeline}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineTest {
-  @Test
-  public void testToString() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    options.setProject("project-id");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    assertEquals("DataflowPipeline#TestJobName",
-        DataflowPipeline.create(options).toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index 1429e5a..7a3caa6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -123,8 +123,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     }
   }
 
-  private DataflowPipeline buildPipeline(DataflowPipelineOptions options) {
-    DataflowPipeline p = DataflowPipeline.create(options);
+  private Pipeline buildPipeline(DataflowPipelineOptions options) {
+    options.setRunner(DataflowPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
 
     p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
         .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
@@ -163,6 +164,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
 
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setJobName("some-job-name");
     options.setProject("some-project");
@@ -178,11 +180,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setRunner(DataflowPipelineRunner.class);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     // Note that the contents of this materialized map may be changed by the act of reading an
@@ -212,11 +215,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setNetwork(testNetwork);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -228,11 +232,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   public void testNetworkConfigMissing() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -246,11 +251,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setSubnetwork(testSubnetwork);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -262,11 +268,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   public void testSubnetworkConfigMissing() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -277,11 +284,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   public void testScalingAlgorithmMissing() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -312,11 +320,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setAutoscalingAlgorithm(noScaling);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -346,11 +355,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     options.setMaxNumWorkers(42);
     options.setAutoscalingAlgorithm(noScaling);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -379,11 +389,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setZone(testZone);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -398,11 +409,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setWorkerMachineType(testMachineType);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -418,11 +430,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setDiskSizeGb(diskSizeGb);
 
-    DataflowPipeline p = buildPipeline(options);
+    Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
-            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -442,13 +455,18 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Step predefinedStep = createPredefinedStep();
 
     // Create a pipeline that the predefined step will be embedded into
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()))
         .apply(new EmbeddedTransform(predefinedStep.clone()))
         .apply(ParDo.of(new NoOpFn()));
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     List<Step> steps = job.getSteps();
     assertEquals(4, steps.size());
@@ -491,13 +509,18 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   private static Step createPredefinedStep() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     String stepName = "DoFn1";
     pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()).named(stepName))
         .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     assertEquals(13, job.getSteps().size());
     Step step = job.getSteps().get(1);
@@ -617,7 +640,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
   @Test
   public void testMultiGraphPipelineSerialization() throws IOException {
-    DataflowPipeline p = DataflowPipeline.create(buildPipelineOptions());
+    Pipeline p = Pipeline.create(buildPipelineOptions());
 
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));
@@ -629,12 +652,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         PipelineOptionsFactory.as(DataflowPipelineOptions.class));
 
     // Check that translation doesn't fail.
-    t.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+    t.translate(
+        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
   }
 
   @Test
   public void testPartiallyBoundFailure() throws IOException {
-    Pipeline p = DataflowPipeline.create(buildPipelineOptions());
+    Pipeline p = Pipeline.create(buildPipelineOptions());
 
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));
@@ -651,7 +675,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   @Test
   public void testGoodWildcards() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
 
     applyRead(pipeline, "gs://bucket/foo");
@@ -670,7 +694,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
 
     // Check that translation doesn't fail.
-    t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+    t.translate(
+        pipeline,
+        (DataflowPipelineRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList());
   }
 
   private void applyRead(Pipeline pipeline, String path) {
@@ -684,7 +711,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   @Test
   public void testBadWildcardRecursive() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
 
     pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
@@ -693,7 +720,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
         ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
-    t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+    t.translate(
+        pipeline,
+        (DataflowPipelineRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList());
   }
 
   @Test
@@ -706,11 +736,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     options.setExperiments(ImmutableList.of("disable_ism_side_input"));
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
 
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     pipeline.apply(Create.of(1))
         .apply(View.<Integer>asSingleton());
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     List<Step> steps = job.getSteps();
     assertEquals(2, steps.size());
@@ -733,11 +768,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     options.setExperiments(ImmutableList.of("disable_ism_side_input"));
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
 
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     pipeline.apply(Create.of(1, 2, 3))
         .apply(View.<Integer>asIterable());
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     List<Step> steps = job.getSteps();
     assertEquals(2, steps.size());
@@ -758,11 +798,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
 
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     pipeline.apply(Create.of(1))
         .apply(View.<Integer>asSingleton());
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     List<Step> steps = job.getSteps();
     assertEquals(5, steps.size());
@@ -786,11 +831,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
 
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
     pipeline.apply(Create.of(1, 2, 3))
         .apply(View.<Integer>asIterable());
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     List<Step> steps = job.getSteps();
     assertEquals(3, steps.size());
@@ -810,7 +860,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   public void testStepDisplayData() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-    DataflowPipeline pipeline = DataflowPipeline.create(options);
+    Pipeline pipeline = Pipeline.create(options);
 
     DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
       @Override
@@ -845,8 +895,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       .apply(ParDo.of(fn1))
       .apply(ParDo.of(fn2));
 
-    Job job = translator.translate(
-        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
 
     List<Step> steps = job.getSteps();
     assertEquals(3, steps.size());


[2/2] incubator-beam git commit: This closes #186

Posted by lc...@apache.org.
This closes #186


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bcefff6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bcefff6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bcefff6a

Branch: refs/heads/master
Commit: bcefff6a3bedb0f114d614c27da12a4cc2b34f48
Parents: 1eec686 b1eee4b
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 15 10:08:18 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 10:08:18 2016 -0700

----------------------------------------------------------------------
 .../sdk/options/DataflowPipelineOptions.java    |  14 +-
 .../beam/sdk/runners/DataflowPipeline.java      |  60 -------
 .../sdk/runners/DataflowPipelineRegistrar.java  |   4 +-
 .../sdk/runners/DataflowPipelineRunnerTest.java |  37 +++--
 .../beam/sdk/runners/DataflowPipelineTest.java  |  45 ------
 .../runners/DataflowPipelineTranslatorTest.java | 159 +++++++++++++------
 6 files changed, 136 insertions(+), 183 deletions(-)
----------------------------------------------------------------------