You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/15 19:00:55 UTC

[1/2] incubator-beam git commit: Remove the DirectPipeline class

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8dc9032ce -> 96d324e39


Remove the DirectPipeline class

Users who wish to use the DirectPipelineRunner should do so by creating
a new Pipeline with the runner set in the PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a44d126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a44d126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a44d126

Branch: refs/heads/master
Commit: 5a44d126982d30feec0ba7dea0e4d934494af235
Parents: 8dc9032
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 14 16:12:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 09:39:33 2016 -0700

----------------------------------------------------------------------
 .../BlockingDataflowPipelineRunnerTest.java     | 20 +++----
 .../translation/TransformTranslatorTest.java    | 51 ++++++------------
 .../beam/sdk/options/DirectPipelineOptions.java |  9 ++--
 .../apache/beam/sdk/runners/DirectPipeline.java | 56 --------------------
 .../beam/sdk/runners/DirectPipelineRunner.java  | 15 ------
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   | 13 +++--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  9 ++--
 .../sdk/runners/DirectPipelineRunnerTest.java   | 29 ++++++----
 .../beam/sdk/runners/DirectPipelineTest.java    | 35 ------------
 .../beam/sdk/runners/TransformTreeTest.java     |  7 +--
 .../transforms/ApproximateQuantilesTest.java    |  5 +-
 11 files changed, 66 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
index 67ecdbe..ae504ed 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.MonitoringUtil;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.TestCredential;
@@ -209,7 +210,7 @@ public class BlockingDataflowPipelineRunnerTest {
   @Test
   public void testJobDoneComplete() throws Exception {
     createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
-        .run(DirectPipeline.createForTest());
+        .run(TestPipeline.create());
     expectedLogs.verifyInfo("Job finished with status DONE");
   }
 
@@ -223,7 +224,7 @@ public class BlockingDataflowPipelineRunnerTest {
     expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
         JobIdMatcher.expectJobId("testFailedJob-jobId")));
     createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
-        .run(DirectPipeline.createForTest());
+        .run(TestPipeline.create());
   }
 
   /**
@@ -236,8 +237,8 @@ public class BlockingDataflowPipelineRunnerTest {
     expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
         JobIdMatcher.expectJobId("testCancelledJob-jobId")));
     createMockRunner(
-        createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
-        .run(DirectPipeline.createForTest());
+            createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
+        .run(TestPipeline.create());
   }
 
   /**
@@ -256,7 +257,7 @@ public class BlockingDataflowPipelineRunnerTest {
     DataflowPipelineJob replacedByJob =
         createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
     when(job.getReplacedByJob()).thenReturn(replacedByJob);
-    createMockRunner(job).run(DirectPipeline.createForTest());
+    createMockRunner(job).run(TestPipeline.create());
   }
 
   /**
@@ -269,8 +270,8 @@ public class BlockingDataflowPipelineRunnerTest {
   public void testUnknownJobThrowsException() throws Exception {
     expectedThrown.expect(IllegalStateException.class);
     createMockRunner(
-        createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
-        .run(DirectPipeline.createForTest());
+            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
+        .run(TestPipeline.create());
   }
 
   /**
@@ -283,9 +284,8 @@ public class BlockingDataflowPipelineRunnerTest {
     expectedThrown.expect(DataflowServiceException.class);
     expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
         JobIdMatcher.expectJobId("testNullJob-jobId")));
-    createMockRunner(
-        createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
-        .run(DirectPipeline.createForTest());
+    createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
+        .run(TestPipeline.create());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index de4a5d2..4ef26d3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -18,29 +18,28 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
-import com.google.api.client.repackaged.com.google.common.base.Joiner;
 import com.google.common.base.Charsets;
 
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -50,22 +49,7 @@ import java.util.List;
  * executed in Spark.
  */
 public class TransformTranslatorTest {
-
-  @Rule
-  public TestName name = new TestName();
-
-  private DirectPipelineRunner directRunner;
-  private SparkPipelineRunner sparkRunner;
-  private String testDataDirName;
-
-  @Before public void init() throws IOException {
-    sparkRunner = SparkPipelineRunner.create();
-    directRunner = DirectPipelineRunner.createForTest();
-    testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
-        + File.separator;
-    FileUtils.deleteDirectory(new File(testDataDirName));
-    new File(testDataDirName).mkdirs();
-  }
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
 
   /**
    * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
@@ -74,8 +58,8 @@ public class TransformTranslatorTest {
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline("direct", directRunner);
-    String sparkOut = runPipeline("spark", sparkRunner);
+    String directOut = runPipeline(DirectPipelineRunner.class);
+    String sparkOut = runPipeline(SparkPipelineRunner.class);
 
     List<String> directOutput =
         Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
@@ -84,18 +68,17 @@ public class TransformTranslatorTest {
         Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
 
     // sort output to get a stable result (PCollections are not ordered)
-    Collections.sort(directOutput);
-    Collections.sort(sparkOutput);
-
-    Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
+    assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
   }
 
-  private String runPipeline(String name, PipelineRunner<?> runner) {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name);
+  private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(runner);
+    Pipeline p = Pipeline.create(options);
+    File outFile = tmp.newFile();
     PCollection<String> lines =  p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
-    lines.apply(TextIO.Write.to(outFile));
-    runner.run(p);
-    return outFile;
+    lines.apply(TextIO.Write.to(outFile.getAbsolutePath()));
+    p.run();
+    return outFile.getAbsolutePath();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
index 718948e..4cdc0ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
@@ -18,18 +18,17 @@
 package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
 /**
- * Options that can be used to configure the {@link DirectPipeline}.
+ * Options that can be used to configure the {@link DirectPipelineRunner}.
  */
-public interface DirectPipelineOptions extends
-    ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
+public interface DirectPipelineOptions
+    extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions,
+        StreamingOptions {
 
   /**
    * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
deleted file mode 100644
index 45f7647..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-
-/**
- * A {@link DirectPipeline} is a {@link Pipeline} that returns
- * {@link DirectPipelineRunner.EvaluationResults} when it is
- * {@link org.apache.beam.sdk.Pipeline#run()}.
- */
-public class DirectPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new DirectPipeline instance for tests.
-   */
-  public static DirectPipeline createForTest() {
-    DirectPipelineRunner runner = DirectPipelineRunner.createForTest();
-    return new DirectPipeline(runner, runner.getPipelineOptions());
-  }
-
-  private DirectPipeline(DirectPipelineRunner runner, DirectPipelineOptions options) {
-    super(runner, options);
-  }
-
-  @Override
-  public DirectPipelineRunner.EvaluationResults run() {
-    return (DirectPipelineRunner.EvaluationResults) super.run();
-  }
-
-  @Override
-  public DirectPipelineRunner getRunner() {
-    return (DirectPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DirectPipeline#" + hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
index 198d04e..3cb9703 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
@@ -34,8 +34,6 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.DirectPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -59,7 +57,6 @@ import org.apache.beam.sdk.util.MapAggregatorValues;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
 import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.common.Counter;
@@ -195,18 +192,6 @@ public class DirectPipelineRunner
   }
 
   /**
-   * Constructs a runner with default properties for testing.
-   *
-   * @return The newly created runner.
-   */
-  public static DirectPipelineRunner createForTest() {
-    DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
-    options.setStableUniqueNames(CheckEnabled.ERROR);
-    options.setGcpCredential(new TestCredential());
-    return new DirectPipelineRunner(options);
-  }
-
-  /**
    * Enable runtime testing to verify that all functions and {@link Coder}
    * instances can be serialized.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index f32a420..f757b4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
-import org.apache.beam.sdk.runners.DirectPipelineRunner.EvaluationResults;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -137,12 +137,11 @@ public class AvroIOGeneratedClassTest {
       throws Exception {
     generateAvroFile(generateAvroObjects());
 
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     PCollection<T> output = p.apply(read);
-    EvaluationResults results = p.run();
+    PAssert.that(output).containsInAnyOrder(expectedOutput);
+    p.run();
     assertEquals(expectedName, output.getName());
-    assertThat(results.getPCollection(output),
-               containsInAnyOrder(expectedOutput));
   }
 
   @Test
@@ -257,7 +256,7 @@ public class AvroIOGeneratedClassTest {
       throws Exception {
     AvroGeneratedUser[] users = generateAvroObjects();
 
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     @SuppressWarnings("unchecked")
     PCollection<T> input = p.apply(Create.of(Arrays.asList((T[]) users))
                             .withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 08f146f..57312c0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
-import org.apache.beam.sdk.runners.DirectPipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -116,7 +115,7 @@ public class AvroIOTest {
 
   @Test
   public void testAvroIOWriteAndReadASingleFile() throws Throwable {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -127,7 +126,7 @@ public class AvroIOTest {
           .withSchema(GenericClass.class));
     p.run();
 
-    p = DirectPipeline.createForTest();
+    p = TestPipeline.create();
     PCollection<GenericClass> input = p
         .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
 
@@ -179,7 +178,7 @@ public class AvroIOTest {
    */
   @Test
   public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -192,7 +191,7 @@ public class AvroIOTest {
 
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
         new GenericClassV2(5, "bar", null));
-    p = DirectPipeline.createForTest();
+    p = TestPipeline.create();
     PCollection<GenericClassV2> input = p
         .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index 2f5272b..ae3b4e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -98,16 +98,17 @@ public class DirectPipelineRunnerTest implements Serializable {
 
   @Test
   public void testCoderException() {
-    DirectPipeline pipeline = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
 
-    pipeline
-        .apply("CreateTestData", Create.of(42))
+    p.apply("CreateTestData", Create.of(42))
         .apply("CrashDuringCoding", ParDo.of(new HelloDoFn()))
         .setCoder(new CrashingCoder<String>());
 
-      expectedException.expect(RuntimeException.class);
-      expectedException.expectCause(isA(CoderException.class));
-      pipeline.run();
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectCause(isA(CoderException.class));
+    p.run();
   }
 
   @Test
@@ -119,7 +120,9 @@ public class DirectPipelineRunnerTest implements Serializable {
   @Test
   public void testTextIOWriteWithDefaultShardingStrategy() throws Exception {
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
      .apply(TextIO.Write.to(prefix).withSuffix("txt"));
@@ -139,7 +142,9 @@ public class DirectPipelineRunnerTest implements Serializable {
   public void testTextIOWriteWithLimitedNumberOfShards() throws Exception {
     final int numShards = 3;
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
      .apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt"));
@@ -164,7 +169,9 @@ public class DirectPipelineRunnerTest implements Serializable {
   @Test
   public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception {
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
      .apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro"));
@@ -186,7 +193,9 @@ public class DirectPipelineRunnerTest implements Serializable {
   public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception {
     final int numShards = 3;
     String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput");
-    Pipeline p = DirectPipeline.createForTest();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
     String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
     p.apply(Create.of(expectedElements))
      .apply(AvroIO.Write.withSchema(String.class).to(prefix)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
deleted file mode 100644
index 9829ebd..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DirectPipeline}. */
-@RunWith(JUnit4.class)
-public class DirectPipelineTest {
-  @Test
-  public void testToString() {
-    DirectPipeline pipeline = DirectPipeline.createForTest();
-    assertEquals("DirectPipeline#" + pipeline.hashCode(),
-        pipeline.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index d926ac5..7690d2b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -116,7 +117,7 @@ public class TransformTreeTest {
     File inputFile = tmpFolder.newFile();
     File outputFile = tmpFolder.newFile();
 
-    Pipeline p = DirectPipeline.createForTest();
+    Pipeline p = TestPipeline.create();
 
     p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
         .apply(Sample.<String>any(10))
@@ -173,7 +174,7 @@ public class TransformTreeTest {
 
   @Test(expected = IllegalStateException.class)
   public void testOutputChecking() throws Exception {
-    Pipeline p = DirectPipeline.createForTest();
+    Pipeline p = TestPipeline.create();
 
     p.apply(new InvalidCompositeTransform());
 
@@ -183,7 +184,7 @@ public class TransformTreeTest {
 
   @Test
   public void testMultiGraphSetup() {
-    Pipeline p = DirectPipeline.createForTest();
+    Pipeline p = TestPipeline.create();
 
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 6d62e08..6bc5c1e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn;
@@ -69,7 +68,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testQuantilesGlobally() {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> input = intRangeCollection(p, 101);
     PCollection<List<Integer>> quantiles =
@@ -82,7 +81,7 @@ public class ApproximateQuantilesTest {
 
   @Test
   public void testQuantilesGobally_comparable() {
-    DirectPipeline p = DirectPipeline.createForTest();
+    TestPipeline p = TestPipeline.create();
 
     PCollection<Integer> input = intRangeCollection(p, 101);
     PCollection<List<Integer>> quantiles =


[2/2] incubator-beam git commit: This closes #182

Posted by lc...@apache.org.
This closes #182


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96d324e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96d324e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96d324e3

Branch: refs/heads/master
Commit: 96d324e39a7ecc6ec60df309ec9f9b58d289186d
Parents: 8dc9032 5a44d12
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 15 09:40:07 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 09:40:07 2016 -0700

----------------------------------------------------------------------
 .../BlockingDataflowPipelineRunnerTest.java     | 20 +++----
 .../translation/TransformTranslatorTest.java    | 51 ++++++------------
 .../beam/sdk/options/DirectPipelineOptions.java |  9 ++--
 .../apache/beam/sdk/runners/DirectPipeline.java | 56 --------------------
 .../beam/sdk/runners/DirectPipelineRunner.java  | 15 ------
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   | 13 +++--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  9 ++--
 .../sdk/runners/DirectPipelineRunnerTest.java   | 29 ++++++----
 .../beam/sdk/runners/DirectPipelineTest.java    | 35 ------------
 .../beam/sdk/runners/TransformTreeTest.java     |  7 +--
 .../transforms/ApproximateQuantilesTest.java    |  5 +-
 11 files changed, 66 insertions(+), 183 deletions(-)
----------------------------------------------------------------------