You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/15 19:00:55 UTC
[1/2] incubator-beam git commit: Remove the DirectPipeline class
Repository: incubator-beam
Updated Branches:
refs/heads/master 8dc9032ce -> 96d324e39
Remove the DirectPipeline class
Users who wish to use the DirectPipelineRunner should do so by creating
a new Pipeline with the runner set in the PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a44d126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a44d126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a44d126
Branch: refs/heads/master
Commit: 5a44d126982d30feec0ba7dea0e4d934494af235
Parents: 8dc9032
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 14 16:12:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 09:39:33 2016 -0700
----------------------------------------------------------------------
.../BlockingDataflowPipelineRunnerTest.java | 20 +++----
.../translation/TransformTranslatorTest.java | 51 ++++++------------
.../beam/sdk/options/DirectPipelineOptions.java | 9 ++--
.../apache/beam/sdk/runners/DirectPipeline.java | 56 --------------------
.../beam/sdk/runners/DirectPipelineRunner.java | 15 ------
.../beam/sdk/io/AvroIOGeneratedClassTest.java | 13 +++--
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 9 ++--
.../sdk/runners/DirectPipelineRunnerTest.java | 29 ++++++----
.../beam/sdk/runners/DirectPipelineTest.java | 35 ------------
.../beam/sdk/runners/TransformTreeTest.java | 7 +--
.../transforms/ApproximateQuantilesTest.java | 5 +-
11 files changed, 66 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
index 67ecdbe..ae504ed 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.MonitoringUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.TestCredential;
@@ -209,7 +210,7 @@ public class BlockingDataflowPipelineRunnerTest {
@Test
public void testJobDoneComplete() throws Exception {
createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
- .run(DirectPipeline.createForTest());
+ .run(TestPipeline.create());
expectedLogs.verifyInfo("Job finished with status DONE");
}
@@ -223,7 +224,7 @@ public class BlockingDataflowPipelineRunnerTest {
expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
JobIdMatcher.expectJobId("testFailedJob-jobId")));
createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
- .run(DirectPipeline.createForTest());
+ .run(TestPipeline.create());
}
/**
@@ -236,8 +237,8 @@ public class BlockingDataflowPipelineRunnerTest {
expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
JobIdMatcher.expectJobId("testCancelledJob-jobId")));
createMockRunner(
- createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
- .run(DirectPipeline.createForTest());
+ createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
+ .run(TestPipeline.create());
}
/**
@@ -256,7 +257,7 @@ public class BlockingDataflowPipelineRunnerTest {
DataflowPipelineJob replacedByJob =
createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
when(job.getReplacedByJob()).thenReturn(replacedByJob);
- createMockRunner(job).run(DirectPipeline.createForTest());
+ createMockRunner(job).run(TestPipeline.create());
}
/**
@@ -269,8 +270,8 @@ public class BlockingDataflowPipelineRunnerTest {
public void testUnknownJobThrowsException() throws Exception {
expectedThrown.expect(IllegalStateException.class);
createMockRunner(
- createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
- .run(DirectPipeline.createForTest());
+ createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
+ .run(TestPipeline.create());
}
/**
@@ -283,9 +284,8 @@ public class BlockingDataflowPipelineRunnerTest {
expectedThrown.expect(DataflowServiceException.class);
expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
JobIdMatcher.expectJobId("testNullJob-jobId")));
- createMockRunner(
- createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
- .run(DirectPipeline.createForTest());
+ createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
+ .run(TestPipeline.create());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index de4a5d2..4ef26d3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -18,29 +18,28 @@
package org.apache.beam.runners.spark.translation;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.values.PCollection;
-import com.google.api.client.repackaged.com.google.common.base.Joiner;
import com.google.common.base.Charsets;
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.Collections;
import java.util.List;
/**
@@ -50,22 +49,7 @@ import java.util.List;
* executed in Spark.
*/
public class TransformTranslatorTest {
-
- @Rule
- public TestName name = new TestName();
-
- private DirectPipelineRunner directRunner;
- private SparkPipelineRunner sparkRunner;
- private String testDataDirName;
-
- @Before public void init() throws IOException {
- sparkRunner = SparkPipelineRunner.create();
- directRunner = DirectPipelineRunner.createForTest();
- testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
- + File.separator;
- FileUtils.deleteDirectory(new File(testDataDirName));
- new File(testDataDirName).mkdirs();
- }
+ @Rule public TemporaryFolder tmp = new TemporaryFolder();
/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
@@ -74,8 +58,8 @@ public class TransformTranslatorTest {
*/
@Test
public void testTextIOReadAndWriteTransforms() throws IOException {
- String directOut = runPipeline("direct", directRunner);
- String sparkOut = runPipeline("spark", sparkRunner);
+ String directOut = runPipeline(DirectPipelineRunner.class);
+ String sparkOut = runPipeline(SparkPipelineRunner.class);
List<String> directOutput =
Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
@@ -84,18 +68,17 @@ public class TransformTranslatorTest {
Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
// sort output to get a stable result (PCollections are not ordered)
- Collections.sort(directOutput);
- Collections.sort(sparkOutput);
-
- Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
+ assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
}
- private String runPipeline(String name, PipelineRunner<?> runner) {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name);
+ private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(runner);
+ Pipeline p = Pipeline.create(options);
+ File outFile = tmp.newFile();
PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
- lines.apply(TextIO.Write.to(outFile));
- runner.run(p);
- return outFile;
+ lines.apply(TextIO.Write.to(outFile.getAbsolutePath()));
+ p.run();
+ return outFile.getAbsolutePath();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
index 718948e..4cdc0ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
@@ -18,18 +18,17 @@
package org.apache.beam.sdk.options;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.values.PCollection;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
- * Options that can be used to configure the {@link DirectPipeline}.
+ * Options that can be used to configure the {@link DirectPipelineRunner}.
*/
-public interface DirectPipelineOptions extends
- ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions,
- PipelineOptions, StreamingOptions {
+public interface DirectPipelineOptions
+ extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions,
+ StreamingOptions {
/**
* The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
deleted file mode 100644
index 45f7647..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipeline.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-
-/**
- * A {@link DirectPipeline} is a {@link Pipeline} that returns
- * {@link DirectPipelineRunner.EvaluationResults} when it is
- * {@link org.apache.beam.sdk.Pipeline#run()}.
- */
-public class DirectPipeline extends Pipeline {
-
- /**
- * Creates and returns a new DirectPipeline instance for tests.
- */
- public static DirectPipeline createForTest() {
- DirectPipelineRunner runner = DirectPipelineRunner.createForTest();
- return new DirectPipeline(runner, runner.getPipelineOptions());
- }
-
- private DirectPipeline(DirectPipelineRunner runner, DirectPipelineOptions options) {
- super(runner, options);
- }
-
- @Override
- public DirectPipelineRunner.EvaluationResults run() {
- return (DirectPipelineRunner.EvaluationResults) super.run();
- }
-
- @Override
- public DirectPipelineRunner getRunner() {
- return (DirectPipelineRunner) super.getRunner();
- }
-
- @Override
- public String toString() {
- return "DirectPipeline#" + hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
index 198d04e..3cb9703 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
@@ -34,8 +34,6 @@ import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.DirectPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -59,7 +57,6 @@ import org.apache.beam.sdk.util.MapAggregatorValues;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.common.Counter;
@@ -195,18 +192,6 @@ public class DirectPipelineRunner
}
/**
- * Constructs a runner with default properties for testing.
- *
- * @return The newly created runner.
- */
- public static DirectPipelineRunner createForTest() {
- DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
- options.setStableUniqueNames(CheckEnabled.ERROR);
- options.setGcpCredential(new TestCredential());
- return new DirectPipelineRunner(options);
- }
-
- /**
* Enable runtime testing to verify that all functions and {@link Coder}
* instances can be serialized.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index f32a420..f757b4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
-import org.apache.beam.sdk.runners.DirectPipelineRunner.EvaluationResults;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
@@ -137,12 +137,11 @@ public class AvroIOGeneratedClassTest {
throws Exception {
generateAvroFile(generateAvroObjects());
- DirectPipeline p = DirectPipeline.createForTest();
+ TestPipeline p = TestPipeline.create();
PCollection<T> output = p.apply(read);
- EvaluationResults results = p.run();
+ PAssert.that(output).containsInAnyOrder(expectedOutput);
+ p.run();
assertEquals(expectedName, output.getName());
- assertThat(results.getPCollection(output),
- containsInAnyOrder(expectedOutput));
}
@Test
@@ -257,7 +256,7 @@ public class AvroIOGeneratedClassTest {
throws Exception {
AvroGeneratedUser[] users = generateAvroObjects();
- DirectPipeline p = DirectPipeline.createForTest();
+ TestPipeline p = TestPipeline.create();
@SuppressWarnings("unchecked")
PCollection<T> input = p.apply(Create.of(Arrays.asList((T[]) users))
.withCoder((Coder<T>) AvroCoder.of(AvroGeneratedUser.class)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 08f146f..57312c0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO.Write.Bound;
-import org.apache.beam.sdk.runners.DirectPipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -116,7 +115,7 @@ public class AvroIOTest {
@Test
public void testAvroIOWriteAndReadASingleFile() throws Throwable {
- DirectPipeline p = DirectPipeline.createForTest();
+ TestPipeline p = TestPipeline.create();
List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -127,7 +126,7 @@ public class AvroIOTest {
.withSchema(GenericClass.class));
p.run();
- p = DirectPipeline.createForTest();
+ p = TestPipeline.create();
PCollection<GenericClass> input = p
.apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
@@ -179,7 +178,7 @@ public class AvroIOTest {
*/
@Test
public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
- DirectPipeline p = DirectPipeline.createForTest();
+ TestPipeline p = TestPipeline.create();
List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -192,7 +191,7 @@ public class AvroIOTest {
List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
new GenericClassV2(5, "bar", null));
- p = DirectPipeline.createForTest();
+ p = TestPipeline.create();
PCollection<GenericClassV2> input = p
.apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index 2f5272b..ae3b4e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -98,16 +98,17 @@ public class DirectPipelineRunnerTest implements Serializable {
@Test
public void testCoderException() {
- DirectPipeline pipeline = DirectPipeline.createForTest();
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
- pipeline
- .apply("CreateTestData", Create.of(42))
+ p.apply("CreateTestData", Create.of(42))
.apply("CrashDuringCoding", ParDo.of(new HelloDoFn()))
.setCoder(new CrashingCoder<String>());
- expectedException.expect(RuntimeException.class);
- expectedException.expectCause(isA(CoderException.class));
- pipeline.run();
+ expectedException.expect(RuntimeException.class);
+ expectedException.expectCause(isA(CoderException.class));
+ p.run();
}
@Test
@@ -119,7 +120,9 @@ public class DirectPipelineRunnerTest implements Serializable {
@Test
public void testTextIOWriteWithDefaultShardingStrategy() throws Exception {
String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output");
- Pipeline p = DirectPipeline.createForTest();
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
p.apply(Create.of(expectedElements))
.apply(TextIO.Write.to(prefix).withSuffix("txt"));
@@ -139,7 +142,9 @@ public class DirectPipelineRunnerTest implements Serializable {
public void testTextIOWriteWithLimitedNumberOfShards() throws Exception {
final int numShards = 3;
String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput");
- Pipeline p = DirectPipeline.createForTest();
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
p.apply(Create.of(expectedElements))
.apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt"));
@@ -164,7 +169,9 @@ public class DirectPipelineRunnerTest implements Serializable {
@Test
public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception {
String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output");
- Pipeline p = DirectPipeline.createForTest();
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
p.apply(Create.of(expectedElements))
.apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro"));
@@ -186,7 +193,9 @@ public class DirectPipelineRunnerTest implements Serializable {
public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception {
final int numShards = 3;
String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput");
- Pipeline p = DirectPipeline.createForTest();
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" };
p.apply(Create.of(expectedElements))
.apply(AvroIO.Write.withSchema(String.class).to(prefix)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
deleted file mode 100644
index 9829ebd..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DirectPipeline}. */
-@RunWith(JUnit4.class)
-public class DirectPipelineTest {
- @Test
- public void testToString() {
- DirectPipeline pipeline = DirectPipeline.createForTest();
- assertEquals("DirectPipeline#" + pipeline.hashCode(),
- pipeline.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index d926ac5..7690d2b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
@@ -116,7 +117,7 @@ public class TransformTreeTest {
File inputFile = tmpFolder.newFile();
File outputFile = tmpFolder.newFile();
- Pipeline p = DirectPipeline.createForTest();
+ Pipeline p = TestPipeline.create();
p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
.apply(Sample.<String>any(10))
@@ -173,7 +174,7 @@ public class TransformTreeTest {
@Test(expected = IllegalStateException.class)
public void testOutputChecking() throws Exception {
- Pipeline p = DirectPipeline.createForTest();
+ Pipeline p = TestPipeline.create();
p.apply(new InvalidCompositeTransform());
@@ -183,7 +184,7 @@ public class TransformTreeTest {
@Test
public void testMultiGraphSetup() {
- Pipeline p = DirectPipeline.createForTest();
+ Pipeline p = TestPipeline.create();
PCollection<Integer> input = p.begin()
.apply(Create.of(1, 2, 3));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a44d126/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 6d62e08..6bc5c1e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.DirectPipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn;
@@ -69,7 +68,7 @@ public class ApproximateQuantilesTest {
@Test
public void testQuantilesGlobally() {
- DirectPipeline p = DirectPipeline.createForTest();
+ TestPipeline p = TestPipeline.create();
PCollection<Integer> input = intRangeCollection(p, 101);
PCollection<List<Integer>> quantiles =
@@ -82,7 +81,7 @@ public class ApproximateQuantilesTest {
@Test
public void testQuantilesGobally_comparable() {
- DirectPipeline p = DirectPipeline.createForTest();
+ TestPipeline p = TestPipeline.create();
PCollection<Integer> input = intRangeCollection(p, 101);
PCollection<List<Integer>> quantiles =
[2/2] incubator-beam git commit: This closes #182
Posted by lc...@apache.org.
This closes #182
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96d324e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96d324e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96d324e3
Branch: refs/heads/master
Commit: 96d324e39a7ecc6ec60df309ec9f9b58d289186d
Parents: 8dc9032 5a44d12
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 15 09:40:07 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 09:40:07 2016 -0700
----------------------------------------------------------------------
.../BlockingDataflowPipelineRunnerTest.java | 20 +++----
.../translation/TransformTranslatorTest.java | 51 ++++++------------
.../beam/sdk/options/DirectPipelineOptions.java | 9 ++--
.../apache/beam/sdk/runners/DirectPipeline.java | 56 --------------------
.../beam/sdk/runners/DirectPipelineRunner.java | 15 ------
.../beam/sdk/io/AvroIOGeneratedClassTest.java | 13 +++--
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 9 ++--
.../sdk/runners/DirectPipelineRunnerTest.java | 29 ++++++----
.../beam/sdk/runners/DirectPipelineTest.java | 35 ------------
.../beam/sdk/runners/TransformTreeTest.java | 7 +--
.../transforms/ApproximateQuantilesTest.java | 5 +-
11 files changed, 66 insertions(+), 183 deletions(-)
----------------------------------------------------------------------