You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/26 17:52:36 UTC
[1/3] incubator-beam git commit: Spark tests: force spark runner
Repository: incubator-beam
Updated Branches:
refs/heads/master 8597a3cf4 -> 3c6e147d9
Spark tests: force spark runner
Previously these tests were unintentionally running on the DirectRunner; they now set SparkRunner explicitly in their PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937f58e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937f58e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937f58e6
Branch: refs/heads/master
Commit: 937f58e67ed5010c86005a4e7e5613ea682a3a57
Parents: 5101158
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 25 21:45:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 26 10:52:28 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/spark/io/AvroPipelineTest.java | 5 ++++-
.../runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java | 5 ++++-
.../beam/runners/spark/translation/CombinePerKeyTest.java | 5 ++++-
.../runners/spark/translation/MultiOutputWordCountTest.java | 5 ++++-
4 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 4cce03d..787292e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
@@ -74,7 +75,9 @@ public class AvroPipelineTest {
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
populateGenericFile(Lists.newArrayList(savedRecord), schema);
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(SparkRunner.class);
+ Pipeline p = Pipeline.create(options);
PCollection<GenericRecord> input = p.apply(
AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 4d1658f..6d09503 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.coders.WritableCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -69,7 +70,9 @@ public class HadoopFileFormatPipelineTest {
public void testSequenceFile() throws Exception {
populateFile();
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(SparkRunner.class);
+ Pipeline p = Pipeline.create(options);
@SuppressWarnings("unchecked")
Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
(Class<? extends FileInputFormat<IntWritable, Text>>)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 65c6870..600217d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -51,7 +52,9 @@ public class CombinePerKeyTest {
ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
@Test
public void testRun() {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(SparkRunner.class);
+ Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
EvaluationResult res = SparkRunner.create().run(p);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 787691d..ded3eb2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.testing.PAssert;
@@ -68,7 +69,9 @@ public class MultiOutputWordCountTest {
@Test
public void testRun() throws Exception {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(SparkRunner.class);
+ Pipeline p = Pipeline.create(options);
PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
[2/3] incubator-beam git commit: Remove spark test dependency on DirectRunner
Posted by dh...@apache.org.
Remove spark test dependency on DirectRunner
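Rather than asserting that SparkRunner's output matches DirectRunner's output, assert that SparkRunner's output is correct by comparing it against the expected lines in src/test/resources/test_text.txt.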
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51011581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51011581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51011581
Branch: refs/heads/master
Commit: 51011581f70fa1f65de62adaba94c193af03fb61
Parents: 8597a3c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 25 18:37:57 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 26 10:52:28 2016 -0700
----------------------------------------------------------------------
runners/spark/pom.xml | 5 -----
.../spark/translation/TransformTranslatorTest.java | 13 +++++++------
2 files changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51011581/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 665f15d..60d9ef3 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -246,11 +246,6 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <version>0.2.0-incubating-SNAPSHOT</version>
- </dependency>
<!-- Depend on test jar to scan for RunnableOnService tests -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51011581/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index a17e8f3..e1789f1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.spark.translation;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
@@ -31,6 +30,7 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.Charsets;
+import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
@@ -62,19 +64,18 @@ public class TransformTranslatorTest {
*/
@Test
public void testTextIOReadAndWriteTransforms() throws IOException {
- String directOut = runPipeline(DirectRunner.class);
String sparkOut = runPipeline(SparkRunner.class);
- File directOutFile = new File(directOut);
- List<String> directOutput =
- readFromOutputFiles(directOutFile.getParentFile(), directOutFile.getName());
+ List<String> lines =
+ Files.readLines(
+ Paths.get("src/test/resources/test_text.txt").toFile(), StandardCharsets.UTF_8);
File sparkOutFile = new File(sparkOut);
List<String> sparkOutput =
readFromOutputFiles(sparkOutFile.getParentFile(), sparkOutFile.getName());
// sort output to get a stable result (PCollections are not ordered)
- assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
+ assertThat(sparkOutput, containsInAnyOrder(lines.toArray()));
}
private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
[3/3] incubator-beam git commit: Closes #731
Posted by dh...@apache.org.
Closes #731
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c6e147d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c6e147d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c6e147d
Branch: refs/heads/master
Commit: 3c6e147d98151787d13ed551d75007b639cac638
Parents: 8597a3c 937f58e
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 26 10:52:29 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 26 10:52:29 2016 -0700
----------------------------------------------------------------------
runners/spark/pom.xml | 5 -----
.../apache/beam/runners/spark/io/AvroPipelineTest.java | 5 ++++-
.../spark/io/hadoop/HadoopFileFormatPipelineTest.java | 5 ++++-
.../runners/spark/translation/CombinePerKeyTest.java | 5 ++++-
.../spark/translation/MultiOutputWordCountTest.java | 5 ++++-
.../spark/translation/TransformTranslatorTest.java | 13 +++++++------
6 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------