You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/26 17:52:36 UTC

[1/3] incubator-beam git commit: Spark tests: force spark runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8597a3cf4 -> 3c6e147d9


Spark tests: force spark runner

These tests were somehow using the DirectRunner before.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937f58e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937f58e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937f58e6

Branch: refs/heads/master
Commit: 937f58e67ed5010c86005a4e7e5613ea682a3a57
Parents: 5101158
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 25 21:45:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 26 10:52:28 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/spark/io/AvroPipelineTest.java | 5 ++++-
 .../runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java   | 5 ++++-
 .../beam/runners/spark/translation/CombinePerKeyTest.java       | 5 ++++-
 .../runners/spark/translation/MultiOutputWordCountTest.java     | 5 ++++-
 4 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 4cce03d..787292e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -74,7 +75,9 @@ public class AvroPipelineTest {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), schema);
 
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(SparkRunner.class);
+    Pipeline p = Pipeline.create(options);
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 4d1658f..6d09503 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,7 +70,9 @@ public class HadoopFileFormatPipelineTest {
   public void testSequenceFile() throws Exception {
     populateFile();
 
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(SparkRunner.class);
+    Pipeline p = Pipeline.create(options);
     @SuppressWarnings("unchecked")
     Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
         (Class<? extends FileInputFormat<IntWritable, Text>>)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 65c6870..600217d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -51,7 +52,9 @@ public class CombinePerKeyTest {
         ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
     @Test
     public void testRun() {
-        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+        PipelineOptions options = PipelineOptionsFactory.create();
+        options.setRunner(SparkRunner.class);
+        Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
         PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
         EvaluationResult res = SparkRunner.create().run(p);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937f58e6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 787691d..ded3eb2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.PAssert;
@@ -68,7 +69,9 @@ public class MultiOutputWordCountTest {
 
   @Test
   public void testRun() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(SparkRunner.class);
+    Pipeline p = Pipeline.create(options);
     PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
     PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
     PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));


[2/3] incubator-beam git commit: Remove spark test dependency on DirectRunner

Posted by dh...@apache.org.
Remove spark test dependency on DirectRunner

Rather than assert that SparkRunner matches DirectRunner, assert that SparkRunner is correct.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51011581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51011581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51011581

Branch: refs/heads/master
Commit: 51011581f70fa1f65de62adaba94c193af03fb61
Parents: 8597a3c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 25 18:37:57 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 26 10:52:28 2016 -0700

----------------------------------------------------------------------
 runners/spark/pom.xml                                  |  5 -----
 .../spark/translation/TransformTranslatorTest.java     | 13 +++++++------
 2 files changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51011581/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 665f15d..60d9ef3 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -246,11 +246,6 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <version>0.2.0-incubating-SNAPSHOT</version>
-    </dependency>
 
     <!-- Depend on test jar to scan for RunnableOnService tests -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51011581/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index a17e8f3..e1789f1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.spark.translation;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -31,6 +30,7 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -62,19 +64,18 @@ public class TransformTranslatorTest {
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline(DirectRunner.class);
     String sparkOut = runPipeline(SparkRunner.class);
 
-    File directOutFile = new File(directOut);
-    List<String> directOutput =
-            readFromOutputFiles(directOutFile.getParentFile(), directOutFile.getName());
+    List<String> lines =
+        Files.readLines(
+            Paths.get("src/test/resources/test_text.txt").toFile(), StandardCharsets.UTF_8);
 
     File sparkOutFile = new File(sparkOut);
     List<String> sparkOutput =
             readFromOutputFiles(sparkOutFile.getParentFile(), sparkOutFile.getName());
 
     // sort output to get a stable result (PCollections are not ordered)
-    assertThat(sparkOutput, containsInAnyOrder(directOutput.toArray()));
+    assertThat(sparkOutput, containsInAnyOrder(lines.toArray()));
   }
 
   private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {


[3/3] incubator-beam git commit: Closes #731

Posted by dh...@apache.org.
Closes #731


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c6e147d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c6e147d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c6e147d

Branch: refs/heads/master
Commit: 3c6e147d98151787d13ed551d75007b639cac638
Parents: 8597a3c 937f58e
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 26 10:52:29 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 26 10:52:29 2016 -0700

----------------------------------------------------------------------
 runners/spark/pom.xml                                  |  5 -----
 .../apache/beam/runners/spark/io/AvroPipelineTest.java |  5 ++++-
 .../spark/io/hadoop/HadoopFileFormatPipelineTest.java  |  5 ++++-
 .../runners/spark/translation/CombinePerKeyTest.java   |  5 ++++-
 .../spark/translation/MultiOutputWordCountTest.java    |  5 ++++-
 .../spark/translation/TransformTranslatorTest.java     | 13 +++++++------
 6 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------