You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/04/19 21:40:31 UTC

[4/5] incubator-beam git commit: Add unit test for TextIO output to support the mvn exec:exec example we provide in README

Add unit test for TextIO output to support the mvn exec:exec example we provide in README

Satisfy checkstyle


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/932e5b49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/932e5b49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/932e5b49

Branch: refs/heads/master
Commit: 932e5b4976e2c5bf388768f7d3cdeb4da4e84e71
Parents: 1ca3b30
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 23:18:24 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:23:17 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |  3 +-
 .../beam/runners/spark/SimpleWordCountTest.java | 35 ++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 531a6ce..6d49bd3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -262,7 +262,8 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    @SuppressWarnings("unchecked")
+    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
     Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
     return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index faa4dbf..4e9c0b8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -31,11 +32,18 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.junit.rules.TemporaryFolder;
+import org.junit.Rule;
+import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
-import org.junit.Test;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -50,7 +58,7 @@ public class SimpleWordCountTest {
       ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
   @Test
-  public void testRun() throws Exception {
+  public void testInMem() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -64,6 +72,29 @@ public class SimpleWordCountTest {
     res.close();
   }
 
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  /** Checks that the unsharded TextIO output file holds the expected word counts. */
+  @Test
+  public void testOutputFile() throws Exception {
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> output = inputWords.apply(new CountWords());
+
+    File outputFile = testFolder.newFile();
+    output.apply(
+        TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+    // Read with an explicit charset so the check does not depend on the platform default.
+    assertThat(Sets.newHashSet(FileUtils.readLines(outputFile, "UTF-8")),
+        containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
+  }
+
   /**
    * A DoFn that tokenizes lines of text into individual words.
    */