Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 23:15:51 UTC

[1/2] incubator-beam git commit: [BEAM-781] Remove Spark's batch unit tests and rely on ROS tests instead.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2d1df8bc7 -> 2efc27829


[BEAM-781] Remove Spark's batch unit tests and rely on ROS tests instead.

The runner no longer supports Spark versions prior to 1.6.

Remove tests whose coverage can be entrusted to the RunnableOnService (ROS) test suite.
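
For context, a minimal sketch of what a RunnableOnService (ROS) test looks like in the Beam SDK of this era (hedged: the class and data below are illustrative and not part of this commit; only the RunnableOnService category, TestPipeline, and PAssert are the SDK testing conventions the deleted runner-local tests are being replaced by):

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.RunnableOnService;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    /** Illustrative ROS-style test; hypothetical class, not from this commit. */
    public class ExampleRunnableOnServiceTest {
      @Test
      @Category(RunnableOnService.class)
      public void testCreate() {
        // TestPipeline reads the runner from the test pipeline options, so the
        // same test validates every registered runner, including Spark.
        TestPipeline p = TestPipeline.create();
        PCollection<String> output = p.apply(Create.of("hello", "world"));
        PAssert.that(output).containsInAnyOrder("hello", "world");
        p.run();
      }
    }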


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b3c95026
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b3c95026
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b3c95026

Branch: refs/heads/master
Commit: b3c9502636d48a10000e7cf6f2078c5130d7921f
Parents: 3078193
Author: Sela <an...@paypal.com>
Authored: Thu Oct 20 16:34:06 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu Oct 20 16:42:52 2016 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/spark/DeDupTest.java    |  58 -----
 .../beam/runners/spark/EmptyInputTest.java      |  74 ------
 .../beam/runners/spark/SimpleWordCountTest.java | 107 --------
 .../apache/beam/runners/spark/TfIdfTest.java    | 259 -------------------
 .../spark/translation/CombinePerKeyTest.java    |  78 ------
 .../spark/translation/DoFnOutputTest.java       |  65 -----
 .../translation/MultiOutputWordCountTest.java   | 174 -------------
 .../spark/translation/SerializationTest.java    | 199 --------------
 .../spark/translation/SideEffectsTest.java      |  20 +-
 .../translation/SparkPipelineOptionsTest.java   |  42 ---
 .../translation/TransformTranslatorTest.java    | 104 --------
 .../translation/WindowedWordCountTest.java      | 116 ---------
 12 files changed, 1 insertion(+), 1295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
deleted file mode 100644
index 6c26e76..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * A test based on {@code DeDupExample} from the SDK.
- */
-public class DeDupTest {
-
-  private static final String[] LINES_ARRAY = {
-      "hi there", "hello", "hi there",
-      "hi", "hello"};
-  private static final List<String> LINES = Arrays.asList(LINES_ARRAY);
-  private static final Set<String> EXPECTED_SET =
-      ImmutableSet.of("hi there", "hi", "hello");
-
-  @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of()));
-    PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
deleted file mode 100644
index 765dd66..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.Iterables;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Empty input test.
- */
-public class EmptyInputTest {
-
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-    List<String> empty = Collections.emptyList();
-    PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
-    PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
-
-    EvaluationResult res = (EvaluationResult) p.run();
-    assertEquals("", Iterables.getOnlyElement(res.get(output)));
-  }
-
-  /**
-   * A serializable function that concatenates words, used in the test.
-   */
-  public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
-    @Override
-    public String apply(Iterable<String> input) {
-      StringBuilder all = new StringBuilder();
-      for (String item : input) {
-        if (!item.isEmpty()) {
-          if (all.length() == 0) {
-            all.append(item);
-          } else {
-            all.append(",");
-            all.append(item);
-          }
-        }
-      }
-      return all.toString();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
deleted file mode 100644
index 2d13fdd..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Simple word count test.
- */
-public class SimpleWordCountTest {
-
-  @Rule
-  public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
-
-  @Rule
-  public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
-
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-  private static final Set<String> EXPECTED_COUNT_SET =
-      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-
-  @Test
-  public void testInMem() throws Exception {
-    assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
-
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
-        .of()));
-    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
-    p.run();
-
-    assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
-  }
-
-  @Rule
-  public TemporaryFolder testFolder = new TemporaryFolder();
-
-  @Test
-  public void testOutputFile() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
-        .of()));
-    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    File outputFile = testFolder.newFile();
-    output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
-
-    p.run();
-
-    assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
-        containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
deleted file mode 100644
index ac9310d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.net.URI;
-import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A test based on {@code TfIdf} from the SDK.
- */
-public class TfIdfTest {
-
-  @Test
-  public void testTfIdf() throws Exception {
-    SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    opts.setRunner(SparkRunner.class);
-    Pipeline pipeline = Pipeline.create(opts);
-
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
-    PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
-        .apply(Create.of(
-            KV.of(new URI("x"), "a b c d"),
-            KV.of(new URI("y"), "a b c"),
-            KV.of(new URI("z"), "a m n")))
-        .apply(new ComputeTfIdf());
-
-    PCollection<String> words = wordToUriAndTfIdf
-        .apply(Keys.<String>create())
-        .apply(RemoveDuplicates.<String>create());
-
-    PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
-
-    pipeline.run();
-  }
-
-  /**
-   * Duplicated to avoid a dependency on beam-examples.
-   */
-  public static class ComputeTfIdf
-      extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
-    public ComputeTfIdf() { }
-
-    @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
-      PCollection<KV<URI, String>> uriToContent) {
-
-      // Compute the total number of documents, and
-      // prepare this singleton PCollectionView for
-      // use as a side input.
-      final PCollectionView<Long> totalDocuments =
-          uriToContent
-          .apply("GetURIs", Keys.<URI>create())
-          .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
-          .apply(Count.<URI>globally())
-          .apply(View.<Long>asSingleton());
-
-      // Create a collection of pairs mapping a URI to each
-      // of the words in the document associated with that URI.
-      PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply("SplitWords", ParDo.of(
-              new DoFn<KV<URI, String>, KV<URI, String>>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey();
-                  String line = c.element().getValue();
-                  for (String word : line.split("\\W+")) {
-                    // Log INFO messages when the word "love" is found.
-                    if (word.toLowerCase().equals("love")) {
-                      LOG.info("Found {}", word.toLowerCase());
-                    }
-
-                    if (!word.isEmpty()) {
-                      c.output(KV.of(uri, word.toLowerCase()));
-                    }
-                  }
-                }
-              }));
-
-      // Compute a mapping from each word to the total
-      // number of documents in which it appears.
-      PCollection<KV<String, Long>> wordToDocCount = uriToWords
-          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
-          .apply(Values.<String>create())
-          .apply("CountDocs", Count.<String>perElement());
-
-      // Compute a mapping from each URI to the total
-      // number of words in the document associated with that URI.
-      PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
-          .apply("GetURIs2", Keys.<URI>create())
-          .apply("CountWords", Count.<URI>perElement());
-
-      // Count, for each (URI, word) pair, the number of
-      // occurrences of that word in the document associated
-      // with the URI.
-      PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
-          .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
-      // Adjust the above collection to a mapping from
-      // (URI, word) pairs to counts into an isomorphic mapping
-      // from URI to (word, count) pairs, to prepare for a join
-      // by the URI key.
-      PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
-          .apply("ShiftKeys", ParDo.of(
-              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey().getKey();
-                  String word = c.element().getKey().getValue();
-                  Long occurrences = c.element().getValue();
-                  c.output(KV.of(uri, KV.of(word, occurrences)));
-                }
-              }));
-
-      // Prepare to join the mapping of URI to (word, count) pairs with
-      // the mapping of URI to total word counts, by associating
-      // each of the input PCollection<KV<URI, ...>> with
-      // a tuple tag. Each input must have the same key type, URI
-      // in this case. The type parameter of the tuple tag matches
-      // the types of the values for each collection.
-      final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
-      final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
-      KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
-          .of(wordTotalsTag, uriToWordTotal)
-          .and(wordCountsTag, uriToWordAndCount);
-
-      // Perform a CoGroupByKey (a sort of pre-join) on the prepared
-      // inputs. This yields a mapping from URI to a CoGbkResult
-      // (CoGroupByKey Result). The CoGbkResult is a mapping
-      // from the above tuple tags to the values in each input
-      // associated with a particular URI. In this case, each
-      // KV<URI, CoGbkResult> groups a URI with the total number of
-      // words in that document as well as all the (word, count)
-      // pairs for particular words.
-      PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
-          .apply("CoGroupByUri", CoGroupByKey.<URI>create());
-
-      // Compute a mapping from each word to a (URI, term frequency)
-      // pair for each URI. A word's term frequency for a document
-      // is simply the number of times that word occurs in the document
-      // divided by the total number of words in the document.
-      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
-          .apply("ComputeTermFrequencies", ParDo.of(
-              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey();
-                  Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
-                  for (KV<String, Long> wordAndCount
-                           : c.element().getValue().getAll(wordCountsTag)) {
-                    String word = wordAndCount.getKey();
-                    Long wordCount = wordAndCount.getValue();
-                    Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
-                    c.output(KV.of(word, KV.of(uri, termFrequency)));
-                  }
-                }
-              }));
-
-      // Compute a mapping from each word to its document frequency.
-      // A word's document frequency in a corpus is the number of
-      // documents in which the word appears divided by the total
-      // number of documents in the corpus. Note how the total number of
-      // documents is passed as a side input; the same value is
-      // presented to each invocation of the DoFn.
-      PCollection<KV<String, Double>> wordToDf = wordToDocCount
-          .apply("ComputeDocFrequencies", ParDo
-              .withSideInputs(totalDocuments)
-              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  String word = c.element().getKey();
-                  Long documentCount = c.element().getValue();
-                  Long documentTotal = c.sideInput(totalDocuments);
-                  Double documentFrequency = documentCount.doubleValue()
-                      / documentTotal.doubleValue();
-
-                  c.output(KV.of(word, documentFrequency));
-                }
-              }));
-
-      // Join the term frequency and document frequency
-      // collections, each keyed on the word.
-      final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
-      final TupleTag<Double> dfTag = new TupleTag<Double>();
-      PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
-          .of(tfTag, wordToUriAndTf)
-          .and(dfTag, wordToDf)
-          .apply(CoGroupByKey.<String>create());
-
-      // Compute a mapping from each word to a (URI, TF-IDF) score
-      // for each URI. There are a variety of definitions of TF-IDF
-      // ("term frequency - inverse document frequency") score;
-      // here we use a basic version: the term frequency multiplied
-      // by the log of the inverse document frequency.
-      return wordToUriAndTfAndDf
-          .apply("ComputeTfIdf", ParDo.of(
-              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  String word = c.element().getKey();
-                  Double df = c.element().getValue().getOnly(dfTag);
-
-                  for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
-                    URI uri = uriAndTf.getKey();
-                    Double tf = uriAndTf.getValue();
-                    Double tfIdf = tf * Math.log(1 / df);
-                    c.output(KV.of(word, KV.of(uri, tfIdf)));
-                  }
-                }
-              }));
-    }
-
-    // Instantiate Logger.
-    // It is suggested that the user specify the class name of the containing class
-    // (in this case ComputeTfIdf).
-    private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
-  }
-
-}
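
For reference, the TF-IDF variant computed by the deleted test above, worked on the test's own three documents (a sketch; the arithmetic follows directly from the ComputeTfIdf code, nothing else is assumed):

    \mathrm{tfidf}(w,d) = \mathrm{tf}(w,d)\cdot\log\frac{1}{\mathrm{df}(w)},
    \quad
    \mathrm{tf}(w,d) = \frac{\#\,w\ \text{in}\ d}{\#\,\text{words in}\ d},
    \quad
    \mathrm{df}(w) = \frac{\#\,\text{docs containing}\ w}{\#\,\text{docs}}

With documents x = "a b c d", y = "a b c", z = "a m n": the word "d" occurs once among the four words of x and appears in 1 of 3 documents, so tfidf("d", x) = (1/4) * log(3) ~= 0.275, while "a" appears in all three documents, so log(3/3) = 0 and its TF-IDF is zero for every document.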

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
deleted file mode 100644
index b1012a3..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.collect.ImmutableList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Combine per key function test.
- */
-public class CombinePerKeyTest {
-
-    private static final List<String> WORDS =
-        ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
-    @Test
-    public void testRun() {
-        PipelineOptions options = PipelineOptionsFactory.create();
-        options.setRunner(SparkRunner.class);
-        Pipeline p = Pipeline.create(options);
-        PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
-        PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
-        EvaluationResult res = (EvaluationResult) p.run();
-        Map<String, Long> actualCnts = new HashMap<>();
-        for (KV<String, Long> kv : res.get(cnts)) {
-            actualCnts.put(kv.getKey(), kv.getValue());
-        }
-        Assert.assertEquals(8, actualCnts.size());
-        Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the"));
-    }
-
-    private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
-      @Override
-      public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
-          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
-              @ProcessElement
-              public void processElement(ProcessContext processContext) throws Exception {
-                  processContext.output(KV.of(processContext.element(), 1L));
-              }
-          })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of()));
-          return withLongs.apply(Sum.<T>longsPerKey());
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
deleted file mode 100644
index b94ca4d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * DoFn output test.
- */
-public class DoFnOutputTest implements Serializable {
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    PCollection<String> strings = p.apply(Create.of("a"));
-    // Test that values written from startBundle() and finishBundle() are written to
-    // the output
-    PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
-      @Override
-      public void startBundle(Context c) throws Exception {
-        c.output("start");
-      }
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element());
-      }
-      @Override
-      public void finishBundle(Context c) throws Exception {
-        c.output("finish");
-      }
-    }));
-
-    PAssert.that(output).containsInAnyOrder("start", "a", "finish");
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
deleted file mode 100644
index f308f2f..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import java.util.Set;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.ApproximateUnique;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Multi-output word count test.
- */
-public class MultiOutputWordCountTest {
-
-  private static final TupleTag<String> upper = new TupleTag<>();
-  private static final TupleTag<String> lower = new TupleTag<>();
-  private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
-  private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
-
-  private static final Set<String> EXPECTED_LOWER_COUNTS =
-      ImmutableSet.of("are: 2", "some: 3", "words: 3", "more: 2", "to: 1", "count: 1", "and: 2",
-      "even: 1", "others: 1");
-
-  @Test
-  public void testRun() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
-    PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
-    PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
-    PCollectionList<String> list = PCollectionList.of(w1).and(w2);
-
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-    PCollectionView<String> regexView = regex.apply(View.<String>asSingleton());
-    CountWords countWords = new CountWords(regexView);
-    PCollectionTuple luc = union.apply(countWords);
-    PCollection<Long> unique = luc.get(lowerCnts).apply(
-        ApproximateUnique.<KV<String, Long>>globally(16));
-
-    EvaluationResult res = (EvaluationResult) p.run();
-    PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
-        .containsInAnyOrder(EXPECTED_LOWER_COUNTS);
-    Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
-    Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
-    Iterable<Long> actualUniqCount = res.get(unique);
-    Assert.assertEquals(9, (long) actualUniqCount.iterator().next());
-    int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class);
-    Assert.assertEquals(18, actualTotalWords);
-    int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class);
-    Assert.assertEquals(6, actualMaxWordLength);
-    AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords
-        .getTotalWordsAggregator());
-    Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
-  }
-
-  /**
-   * A {@link DoFn} that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-
-    private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
-        new Sum.SumIntegerFn());
-    private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
-        new Max.MaxIntegerFn());
-    private final PCollectionView<String> regex;
-
-    ExtractWordsFn(PCollectionView<String> regex) {
-      this.regex = regex;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      String[] words = c.element().split(c.sideInput(regex));
-      for (String word : words) {
-        totalWords.addValue(1);
-        if (!word.isEmpty()) {
-          maxWordLength.addValue(word.length());
-          if (Character.isLowerCase(word.charAt(0))) {
-            c.output(word);
-          } else {
-            c.sideOutput(upper, word);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Count words {@link PTransform} used in the test.
-   */
-  public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
-
-    private final PCollectionView<String> regex;
-    private final ExtractWordsFn extractWordsFn;
-
-    public CountWords(PCollectionView<String> regex) {
-      this.regex = regex;
-      this.extractWordsFn = new ExtractWordsFn(regex);
-    }
-
-    @Override
-    public PCollectionTuple apply(PCollection<String> lines) {
-      // Convert lines of text into individual words.
-      PCollectionTuple lowerUpper = lines
-          .apply(ParDo.of(extractWordsFn)
-              .withSideInputs(regex)
-              .withOutputTags(lower, TupleTagList.of(upper)));
-      lowerUpper.get(lower).setCoder(StringUtf8Coder.of());
-      lowerUpper.get(upper).setCoder(StringUtf8Coder.of());
-      PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count
-          .<String>perElement());
-      PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count
-          .<String>perElement());
-      return PCollectionTuple
-          .of(lowerCnts, lowerCounts)
-          .and(upperCnts, upperCounts);
-    }
-
-    Aggregator<Integer, Integer> getTotalWordsAggregator() {
-      return extractWordsFn.totalWords;
-    }
-  }
-
-  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getKey() + ": " + c.element().getValue());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
deleted file mode 100644
index d8b4a20..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Serialization test.
- */
-public class SerializationTest {
-
-  /**
-   * Simple String holder.
-   */
-  public static class StringHolder { // not serializable
-    private final String string;
-
-    public StringHolder(String string) {
-      this.string = string;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      StringHolder that = (StringHolder) o;
-      return string.equals(that.string);
-    }
-
-    @Override
-    public int hashCode() {
-      return string.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return string;
-    }
-  }
-
-  /**
-   * Simple String holder with UTF-8 encoding.
-   */
-  public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
-
-    private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
-
-    @Override
-    public void encode(StringHolder value,
-                       OutputStream outStream,
-                       Context context) throws IOException {
-      stringUtf8Coder.encode(value.toString(), outStream, context);
-    }
-
-    @Override
-    public StringHolder decode(InputStream inStream, Context context) throws IOException {
-      return new StringHolder(stringUtf8Coder.decode(inStream, context));
-    }
-
-    public static Coder<StringHolder> of() {
-      return new StringHolderUtf8Coder();
-    }
-  }
-
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-  private static final List<StringHolder> WORDS = Lists.transform(
-      Arrays.asList(WORDS_ARRAY), new Function<String, StringHolder>() {
-        @Override public StringHolder apply(String s) {
-          return new StringHolder(s);
-        }
-      });
-  private static final Set<StringHolder> EXPECTED_COUNT_SET =
-      ImmutableSet.copyOf(Lists.transform(
-          Arrays.asList("hi: 5", "there: 1", "sue: 2", "bob: 2"),
-          new Function<String, StringHolder>() {
-            @Override
-            public StringHolder apply(String s) {
-              return new StringHolder(s);
-            }
-          }));
-
-  @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<StringHolder> inputWords =
-        p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
-    PCollection<StringHolder> output = inputWords.apply(new CountWords());
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
-    p.run();
-  }
-
-  /**
-   * A {@link DoFn} that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
-    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      // Split the line into words.
-      String[] words = WORD_BOUNDARY.split(c.element().toString());
-
-      // Keep track of the number of lines without any words encountered while tokenizing.
-      // This aggregator is visible in the monitoring UI when run using DataflowRunner.
-      if (words.length == 0) {
-        emptyLines.addValue(1L);
-      }
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(new StringHolder(word));
-        }
-      }
-    }
-  }
-
-  /**
-   * A {@link DoFn} that converts a Word and Count into a printable string.
-   */
-  private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
-    }
-  }
-
-  private static class CountWords
-      extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
-    @Override
-    public PCollection<StringHolder> apply(PCollection<StringHolder> lines) {
-
-      // Convert lines of text into individual words.
-      PCollection<StringHolder> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<StringHolder, Long>> wordCounts =
-          words.apply(Count.<StringHolder>perElement());
-
-      // Format each word and count into a printable string.
-
-      return wordCounts.apply(ParDo.of(new FormatCountsFn()));
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index f85baab..1304e12 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -32,8 +32,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -63,24 +61,8 @@ public class SideEffectsTest implements Serializable {
       p.run();
       fail("Run should thrown an exception");
     } catch (RuntimeException e) {
+      assertTrue(e.getCause() instanceof UserException);
       assertNotNull(e.getCause());
-
-      // TODO: remove the version check (and the setup and teardown methods) when we no
-      // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options).version();
-      if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
-        assertTrue(e.getCause() instanceof UserException);
-      }
     }
   }
-
-  @Before
-  public void setup() {
-    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
-  }
-
-  @After
-  public void teardown() {
-    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
deleted file mode 100644
index d294eca..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Simple test on the Spark runner pipeline options.
- */
-public class SparkPipelineOptionsTest {
-  @Test
-  public void testDefaultCreateMethod() {
-    SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    Assert.assertEquals("local[4]", actualOptions.getSparkMaster());
-  }
-
-  @Test
-  public void testSettingCustomOptions() {
-    SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    actualOptions.setSparkMaster("spark://207.184.161.138:7077");
-    Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
deleted file mode 100644
index f72eba7..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A test for the transforms registered in TransformTranslator.
- * Builds a regular Beam pipeline with each of the mapped
- * transforms, and makes sure that they work when the pipeline is
- * executed in Spark.
- */
-public class TransformTranslatorTest {
-  private static final Logger LOG = LoggerFactory.getLogger(TransformTranslatorTest.class);
-  @Rule public TemporaryFolder tmp = new TemporaryFolder();
-
-  /**
-   * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs it on the
-   * SparkRunner with the mapped Beam-to-Spark transforms, and verifies that the
-   * output written by Spark matches the contents of the input file.
-   */
-  @Test
-  public void testTextIOReadAndWriteTransforms() throws IOException {
-    String sparkOut = runPipeline(SparkRunner.class);
-
-    List<String> lines =
-        Files.readLines(
-            Paths.get("src/test/resources/test_text.txt").toFile(), StandardCharsets.UTF_8);
-
-    File sparkOutFile = new File(sparkOut);
-    List<String> sparkOutput =
-            readFromOutputFiles(sparkOutFile.getParentFile(), sparkOutFile.getName());
-
-    // sort output to get a stable result (PCollections are not ordered)
-    assertThat(sparkOutput, containsInAnyOrder(lines.toArray()));
-  }
-
-  private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(runner);
-    Pipeline p = Pipeline.create(options);
-    File outFile = tmp.newFile();
-    PCollection<String> lines =  p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
-    lines.apply(TextIO.Write.to(outFile.getAbsolutePath()));
-    p.run();
-    return outFile.getAbsolutePath();
-  }
-
-  private List<String> readFromOutputFiles(File parent, String outPattern) throws IOException {
-    // example pattern: outprefix-00000-of-00001
-    Pattern pattern = Pattern.compile(String.format("%s-[0-9]{5}-of-[0-9]{5}", outPattern));
-    List<String> lines = new ArrayList<>();
-    if (parent.exists() && parent.isDirectory()) {
-      //noinspection ConstantConditions
-      for (File f : parent.listFiles()) {
-        if (pattern.matcher(f.getName()).matches()) {
-          LOG.info("For " + outPattern + " reading file " + f.getName());
-          lines.addAll(FileUtils.readLines(f, Charsets.UTF_8));
-        }
-      }
-    }
-    return lines;
-  }
-}
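
A note on the filename pattern matched above: at this point in Beam's history an unsharded output name had to be requested explicitly (withoutSharding(), as SimpleWordCountTest did); by default TextIO.Write splits its output into shards named <prefix>-SSSSS-of-NNNNN with zero-padded shard numbers, e.g. out-00000-of-00001, which is exactly what the regex "%s-[0-9]{5}-of-[0-9]{5}" in readFromOutputFiles captures.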

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
deleted file mode 100644
index e727d2f..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Test;
-
-/**
- * Windowed word count test.
- */
-public class WindowedWordCountTest {
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
-  private static final Long[] TIMESTAMPS_ARRAY = {
-      60000L, 60000L, 60000L, 179000L, 179000L, 179000L};
-  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-  private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
-
-  private static final List<String> EXPECTED_FIXED_SEPARATE_COUNT_SET =
-      ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 2", "sue: 1", "bob: 1");
-
-  @Test
-  public void testFixed() throws Exception {
-    PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(opts);
-    PCollection<String> inputWords =
-        p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedWords =
-        inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
-
-    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
-
-    p.run();
-  }
-
-  private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET =
-      ImmutableList.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-
-  @Test
-  public void testFixed2() throws Exception {
-    PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(opts);
-    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
-        .withCoder(StringUtf8Coder.of()));
-    PCollection<String> windowedWords = inputWords
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
-
-    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
-
-    p.run();
-  }
-
-  private static final List<String> EXPECTED_SLIDING_COUNT_SET =
-      ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 5", "there: 1", "sue: 2",
-      "bob: 2", "hi: 2", "sue: 1", "bob: 1");
-
-  @Test
-  public void testSliding() throws Exception {
-    PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(opts);
-    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
-        .withCoder(StringUtf8Coder.of()));
-    PCollection<String> windowedWords = inputWords
-        .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
-        .every(Duration.standardMinutes(1))));
-
-    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
-
-    p.run();
-  }
-
-}
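
A quick sanity check on the expected counts above (a sketch; the window-start arithmetic assumes FixedWindows with its default zero offset):

    // An element with timestamp ts (millis) falls into the fixed window
    // starting at ts - (ts % size).
    long size = 60_000L;                     // 1 minute, as in testFixed()
    long w1 = 60_000L - (60_000L % size);    // 60000  -> window [60s, 120s)
    long w2 = 179_000L - (179_000L % size);  // 120000 -> window [120s, 180s)

So with 1-minute windows the first three elements and the last three are counted separately (testFixed), the 5-minute windows of testFixed2 hold all six elements together, and the 2-minutes-every-1-minute sliding windows of testSliding place each element in two windows, which is why EXPECTED_SLIDING_COUNT_SET contains both the per-minute counts and the combined counts.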


[2/2] incubator-beam git commit: This closes #1144

Posted by ke...@apache.org.
This closes #1144


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2efc2782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2efc2782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2efc2782

Branch: refs/heads/master
Commit: 2efc27829ac0a1e35df414fea8be26550f5cfbf2
Parents: 2d1df8b b3c9502
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 16:14:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 16:14:31 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/DeDupTest.java    |  58 -----
 .../beam/runners/spark/EmptyInputTest.java      |  74 ------
 .../beam/runners/spark/SimpleWordCountTest.java | 107 --------
 .../apache/beam/runners/spark/TfIdfTest.java    | 259 -------------------
 .../spark/translation/CombinePerKeyTest.java    |  78 ------
 .../spark/translation/DoFnOutputTest.java       |  65 -----
 .../translation/MultiOutputWordCountTest.java   | 174 -------------
 .../spark/translation/SerializationTest.java    | 199 --------------
 .../spark/translation/SideEffectsTest.java      |  20 +-
 .../translation/SparkPipelineOptionsTest.java   |  42 ---
 .../translation/TransformTranslatorTest.java    | 104 --------
 .../translation/WindowedWordCountTest.java      | 116 ---------
 12 files changed, 1 insertion(+), 1295 deletions(-)
----------------------------------------------------------------------