You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 23:15:51 UTC
[1/2] incubator-beam git commit: [BEAM-781] Remove Spark's batch unit
tests and rely on ROS tests instead.
Repository: incubator-beam
Updated Branches:
refs/heads/master 2d1df8bc7 -> 2efc27829
[BEAM-781] Remove Spark's batch unit tests and rely on ROS tests instead.
The runner does not support Spark before 1.6 anymore.
Remove tests that can be trusted to ROS.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b3c95026
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b3c95026
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b3c95026
Branch: refs/heads/master
Commit: b3c9502636d48a10000e7cf6f2078c5130d7921f
Parents: 3078193
Author: Sela <an...@paypal.com>
Authored: Thu Oct 20 16:34:06 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu Oct 20 16:42:52 2016 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/DeDupTest.java | 58 -----
.../beam/runners/spark/EmptyInputTest.java | 74 ------
.../beam/runners/spark/SimpleWordCountTest.java | 107 --------
.../apache/beam/runners/spark/TfIdfTest.java | 259 -------------------
.../spark/translation/CombinePerKeyTest.java | 78 ------
.../spark/translation/DoFnOutputTest.java | 65 -----
.../translation/MultiOutputWordCountTest.java | 174 -------------
.../spark/translation/SerializationTest.java | 199 --------------
.../spark/translation/SideEffectsTest.java | 20 +-
.../translation/SparkPipelineOptionsTest.java | 42 ---
.../translation/TransformTranslatorTest.java | 104 --------
.../translation/WindowedWordCountTest.java | 116 ---------
12 files changed, 1 insertion(+), 1295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
deleted file mode 100644
index 6c26e76..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * A test based on {@code DeDupExample} from the SDK.
- */
-public class DeDupTest {
-
- private static final String[] LINES_ARRAY = {
- "hi there", "hello", "hi there",
- "hi", "hello"};
- private static final List<String> LINES = Arrays.asList(LINES_ARRAY);
- private static final Set<String> EXPECTED_SET =
- ImmutableSet.of("hi there", "hi", "hello");
-
- @Test
- public void testRun() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of()));
- PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
-
- PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
deleted file mode 100644
index 765dd66..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.Iterables;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Empty input test.
- */
-public class EmptyInputTest {
-
- @Test
- public void test() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- List<String> empty = Collections.emptyList();
- PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
- PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
-
- EvaluationResult res = (EvaluationResult) p.run();
- assertEquals("", Iterables.getOnlyElement(res.get(output)));
- }
-
- /**
- * Concat words serializable function used in test.
- */
- public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
- @Override
- public String apply(Iterable<String> input) {
- StringBuilder all = new StringBuilder();
- for (String item : input) {
- if (!item.isEmpty()) {
- if (all.length() == 0) {
- all.append(item);
- } else {
- all.append(",");
- all.append(item);
- }
- }
- }
- return all.toString();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
deleted file mode 100644
index 2d13fdd..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Simple word count test.
- */
-public class SimpleWordCountTest {
-
- @Rule
- public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
-
- @Rule
- public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
-
- private static final String[] WORDS_ARRAY = {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
- private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
- private static final Set<String> EXPECTED_COUNT_SET =
- ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-
- @Test
- public void testInMem() throws Exception {
- assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
-
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
- .of()));
- PCollection<String> output = inputWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
- PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
- p.run();
-
- assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
- }
-
- @Rule
- public TemporaryFolder testFolder = new TemporaryFolder();
-
- @Test
- public void testOutputFile() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
- .of()));
- PCollection<String> output = inputWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
- File outputFile = testFolder.newFile();
- output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
-
- p.run();
-
- assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
- containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
deleted file mode 100644
index ac9310d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import java.net.URI;
-import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A test based on {@code TfIdf} from the SDK.
- */
-public class TfIdfTest {
-
- @Test
- public void testTfIdf() throws Exception {
- SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- opts.setRunner(SparkRunner.class);
- Pipeline pipeline = Pipeline.create(opts);
-
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
- .apply(Create.of(
- KV.of(new URI("x"), "a b c d"),
- KV.of(new URI("y"), "a b c"),
- KV.of(new URI("z"), "a m n")))
- .apply(new ComputeTfIdf());
-
- PCollection<String> words = wordToUriAndTfIdf
- .apply(Keys.<String>create())
- .apply(RemoveDuplicates.<String>create());
-
- PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
-
- pipeline.run();
- }
-
- /**
- * Duplicated to avoid dependency on beam-examples.
- */
- public static class ComputeTfIdf
- extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
- public ComputeTfIdf() { }
-
- @Override
- public PCollection<KV<String, KV<URI, Double>>> apply(
- PCollection<KV<URI, String>> uriToContent) {
-
- // Compute the total number of documents, and
- // prepare this singleton PCollectionView for
- // use as a side input.
- final PCollectionView<Long> totalDocuments =
- uriToContent
- .apply("GetURIs", Keys.<URI>create())
- .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
- .apply(Count.<URI>globally())
- .apply(View.<Long>asSingleton());
-
- // Create a collection of pairs mapping a URI to each
- // of the words in the document associated with that URI.
- PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply("SplitWords", ParDo.of(
- new DoFn<KV<URI, String>, KV<URI, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- String line = c.element().getValue();
- for (String word : line.split("\\W+")) {
- // Log INFO messages when the word “love” is found.
- if (word.toLowerCase().equals("love")) {
- LOG.info("Found {}", word.toLowerCase());
- }
-
- if (!word.isEmpty()) {
- c.output(KV.of(uri, word.toLowerCase()));
- }
- }
- }
- }));
-
- // Compute a mapping from each word to the total
- // number of documents in which it appears.
- PCollection<KV<String, Long>> wordToDocCount = uriToWords
- .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
- .apply(Values.<String>create())
- .apply("CountDocs", Count.<String>perElement());
-
- // Compute a mapping from each URI to the total
- // number of words in the document associated with that URI.
- PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
- .apply("GetURIs2", Keys.<URI>create())
- .apply("CountWords", Count.<URI>perElement());
-
- // Count, for each (URI, word) pair, the number of
- // occurrences of that word in the document associated
- // with the URI.
- PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
- .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
- // Adjust the above collection to a mapping from
- // (URI, word) pairs to counts into an isomorphic mapping
- // from URI to (word, count) pairs, to prepare for a join
- // by the URI key.
- PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
- .apply("ShiftKeys", ParDo.of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey().getKey();
- String word = c.element().getKey().getValue();
- Long occurrences = c.element().getValue();
- c.output(KV.of(uri, KV.of(word, occurrences)));
- }
- }));
-
- // Prepare to join the mapping of URI to (word, count) pairs with
- // the mapping of URI to total word counts, by associating
- // each of the input PCollection<KV<URI, ...>> with
- // a tuple tag. Each input must have the same key type, URI
- // in this case. The type parameter of the tuple tag matches
- // the types of the values for each collection.
- final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
- final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
- KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
- .of(wordTotalsTag, uriToWordTotal)
- .and(wordCountsTag, uriToWordAndCount);
-
- // Perform a CoGroupByKey (a sort of pre-join) on the prepared
- // inputs. This yields a mapping from URI to a CoGbkResult
- // (CoGroupByKey Result). The CoGbkResult is a mapping
- // from the above tuple tags to the values in each input
- // associated with a particular URI. In this case, each
- // KV<URI, CoGbkResult> group a URI with the total number of
- // words in that document as well as all the (word, count)
- // pairs for particular words.
- PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
- .apply("CoGroupByUri", CoGroupByKey.<URI>create());
-
- // Compute a mapping from each word to a (URI, term frequency)
- // pair for each URI. A word's term frequency for a document
- // is simply the number of times that word occurs in the document
- // divided by the total number of words in the document.
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
- .apply("ComputeTermFrequencies", ParDo.of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
- for (KV<String, Long> wordAndCount
- : c.element().getValue().getAll(wordCountsTag)) {
- String word = wordAndCount.getKey();
- Long wordCount = wordAndCount.getValue();
- Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
- c.output(KV.of(word, KV.of(uri, termFrequency)));
- }
- }
- }));
-
- // Compute a mapping from each word to its document frequency.
- // A word's document frequency in a corpus is the number of
- // documents in which the word appears divided by the total
- // number of documents in the corpus. Note how the total number of
- // documents is passed as a side input; the same value is
- // presented to each invocation of the DoFn.
- PCollection<KV<String, Double>> wordToDf = wordToDocCount
- .apply("ComputeDocFrequencies", ParDo
- .withSideInputs(totalDocuments)
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Long documentCount = c.element().getValue();
- Long documentTotal = c.sideInput(totalDocuments);
- Double documentFrequency = documentCount.doubleValue()
- / documentTotal.doubleValue();
-
- c.output(KV.of(word, documentFrequency));
- }
- }));
-
- // Join the term frequency and document frequency
- // collections, each keyed on the word.
- final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
- final TupleTag<Double> dfTag = new TupleTag<Double>();
- PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
- .of(tfTag, wordToUriAndTf)
- .and(dfTag, wordToDf)
- .apply(CoGroupByKey.<String>create());
-
- // Compute a mapping from each word to a (URI, TF-IDF) score
- // for each URI. There are a variety of definitions of TF-IDF
- // ("term frequency - inverse document frequency") score;
- // here we use a basic version that is the term frequency
- // divided by the log of the document frequency.
- return wordToUriAndTfAndDf
- .apply("ComputeTfIdf", ParDo.of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Double df = c.element().getValue().getOnly(dfTag);
-
- for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
- URI uri = uriAndTf.getKey();
- Double tf = uriAndTf.getValue();
- Double tfIdf = tf * Math.log(1 / df);
- c.output(KV.of(word, KV.of(uri, tfIdf)));
- }
- }
- }));
- }
-
- // Instantiate Logger.
- // It is suggested that the user specify the class name of the containing class
- // (in this case ComputeTfIdf).
- private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
deleted file mode 100644
index b1012a3..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.collect.ImmutableList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Combine per key function test.
- */
-public class CombinePerKeyTest {
-
- private static final List<String> WORDS =
- ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
- @Test
- public void testRun() {
- PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
- PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
- EvaluationResult res = (EvaluationResult) p.run();
- Map<String, Long> actualCnts = new HashMap<>();
- for (KV<String, Long> kv : res.get(cnts)) {
- actualCnts.put(kv.getKey(), kv.getValue());
- }
- Assert.assertEquals(8, actualCnts.size());
- Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the"));
- }
-
- private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
- @Override
- public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
- PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
- @ProcessElement
- public void processElement(ProcessContext processContext) throws Exception {
- processContext.output(KV.of(processContext.element(), 1L));
- }
- })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of()));
- return withLongs.apply(Sum.<T>longsPerKey());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
deleted file mode 100644
index b94ca4d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * DoFn output test.
- */
-public class DoFnOutputTest implements Serializable {
- @Test
- public void test() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
-
- PCollection<String> strings = p.apply(Create.of("a"));
- // Test that values written from startBundle() and finishBundle() are written to
- // the output
- PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
- @Override
- public void startBundle(Context c) throws Exception {
- c.output("start");
- }
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- @Override
- public void finishBundle(Context c) throws Exception {
- c.output("finish");
- }
- }));
-
- PAssert.that(output).containsInAnyOrder("start", "a", "finish");
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
deleted file mode 100644
index f308f2f..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import java.util.Set;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.ApproximateUnique;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Multi-output word count test.
- */
-public class MultiOutputWordCountTest {
-
- private static final TupleTag<String> upper = new TupleTag<>();
- private static final TupleTag<String> lower = new TupleTag<>();
- private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
- private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
-
- private static final Set<String> EXPECTED_LOWER_COUNTS =
- ImmutableSet.of("are: 2", "some: 3", "words: 3", "more: 2", "to: 1", "count: 1", "and: 2",
- "even: 1", "others: 1");
-
- @Test
- public void testRun() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
- PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
- PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
- PCollectionList<String> list = PCollectionList.of(w1).and(w2);
-
- PCollection<String> union = list.apply(Flatten.<String>pCollections());
- PCollectionView<String> regexView = regex.apply(View.<String>asSingleton());
- CountWords countWords = new CountWords(regexView);
- PCollectionTuple luc = union.apply(countWords);
- PCollection<Long> unique = luc.get(lowerCnts).apply(
- ApproximateUnique.<KV<String, Long>>globally(16));
-
- EvaluationResult res = (EvaluationResult) p.run();
- PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
- .containsInAnyOrder(EXPECTED_LOWER_COUNTS);
- Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
- Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
- Iterable<Long> actualUniqCount = res.get(unique);
- Assert.assertEquals(9, (long) actualUniqCount.iterator().next());
- int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class);
- Assert.assertEquals(18, actualTotalWords);
- int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class);
- Assert.assertEquals(6, actualMaxWordLength);
- AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords
- .getTotalWordsAggregator());
- Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
- }
-
- /**
- * A {@link DoFn} that tokenizes lines of text into individual words.
- */
- static class ExtractWordsFn extends DoFn<String, String> {
-
- private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
- new Sum.SumIntegerFn());
- private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
- new Max.MaxIntegerFn());
- private final PCollectionView<String> regex;
-
- ExtractWordsFn(PCollectionView<String> regex) {
- this.regex = regex;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- String[] words = c.element().split(c.sideInput(regex));
- for (String word : words) {
- totalWords.addValue(1);
- if (!word.isEmpty()) {
- maxWordLength.addValue(word.length());
- if (Character.isLowerCase(word.charAt(0))) {
- c.output(word);
- } else {
- c.sideOutput(upper, word);
- }
- }
- }
- }
- }
-
- /**
- * Count words {@link PTransform} used in the test.
- */
- public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
-
- private final PCollectionView<String> regex;
- private final ExtractWordsFn extractWordsFn;
-
- public CountWords(PCollectionView<String> regex) {
- this.regex = regex;
- this.extractWordsFn = new ExtractWordsFn(regex);
- }
-
- @Override
- public PCollectionTuple apply(PCollection<String> lines) {
- // Convert lines of text into individual words.
- PCollectionTuple lowerUpper = lines
- .apply(ParDo.of(extractWordsFn)
- .withSideInputs(regex)
- .withOutputTags(lower, TupleTagList.of(upper)));
- lowerUpper.get(lower).setCoder(StringUtf8Coder.of());
- lowerUpper.get(upper).setCoder(StringUtf8Coder.of());
- PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count
- .<String>perElement());
- PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count
- .<String>perElement());
- return PCollectionTuple
- .of(lowerCnts, lowerCounts)
- .and(upperCnts, upperCounts);
- }
-
- Aggregator<Integer, Integer> getTotalWordsAggregator() {
- return extractWordsFn.totalWords;
- }
- }
-
- private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey() + ": " + c.element().getValue());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
deleted file mode 100644
index d8b4a20..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Test;
-
-/**
- * Serialization test.
- */
-public class SerializationTest {
-
- /**
- * Simple String holder.
- */
- public static class StringHolder { // not serializable
- private final String string;
-
- public StringHolder(String string) {
- this.string = string;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- StringHolder that = (StringHolder) o;
- return string.equals(that.string);
- }
-
- @Override
- public int hashCode() {
- return string.hashCode();
- }
-
- @Override
- public String toString() {
- return string;
- }
- }
-
- /**
- * Simple String holder with UTF-8 encoding.
- */
- public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
-
- private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
-
- @Override
- public void encode(StringHolder value,
- OutputStream outStream,
- Context context) throws IOException {
- stringUtf8Coder.encode(value.toString(), outStream, context);
- }
-
- @Override
- public StringHolder decode(InputStream inStream, Context context) throws IOException {
- return new StringHolder(stringUtf8Coder.decode(inStream, context));
- }
-
- public static Coder<StringHolder> of() {
- return new StringHolderUtf8Coder();
- }
- }
-
- private static final String[] WORDS_ARRAY = {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
- private static final List<StringHolder> WORDS = Lists.transform(
- Arrays.asList(WORDS_ARRAY), new Function<String, StringHolder>() {
- @Override public StringHolder apply(String s) {
- return new StringHolder(s);
- }
- });
- private static final Set<StringHolder> EXPECTED_COUNT_SET =
- ImmutableSet.copyOf(Lists.transform(
- Arrays.asList("hi: 5", "there: 1", "sue: 2", "bob: 2"),
- new Function<String, StringHolder>() {
- @Override
- public StringHolder apply(String s) {
- return new StringHolder(s);
- }
- }));
-
- @Test
- public void testRun() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<StringHolder> inputWords =
- p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
- PCollection<StringHolder> output = inputWords.apply(new CountWords());
-
- PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
- p.run();
- }
-
- /**
- * A {@link DoFn} that tokenizes lines of text into individual words.
- */
- static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
- private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- // Split the line into words.
- String[] words = WORD_BOUNDARY.split(c.element().toString());
-
- // Keep track of the number of lines without any words encountered while tokenizing.
- // This aggregator is visible in the monitoring UI when run using DataflowRunner.
- if (words.length == 0) {
- emptyLines.addValue(1L);
- }
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(new StringHolder(word));
- }
- }
- }
- }
-
- /**
- * A {@link DoFn} that converts a Word and Count into a printable string.
- */
- private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
- }
- }
-
- private static class CountWords
- extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
- @Override
- public PCollection<StringHolder> apply(PCollection<StringHolder> lines) {
-
- // Convert lines of text into individual words.
- PCollection<StringHolder> words = lines.apply(
- ParDo.of(new ExtractWordsFn()));
-
- // Count the number of times each word occurs.
- PCollection<KV<StringHolder, Long>> wordCounts =
- words.apply(Count.<StringHolder>perElement());
-
- // Format each word and count into a printable string.
-
- return wordCounts.apply(ParDo.of(new FormatCountsFn()));
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index f85baab..1304e12 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -32,8 +32,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
/**
@@ -63,24 +61,8 @@ public class SideEffectsTest implements Serializable {
p.run();
fail("Run should thrown an exception");
} catch (RuntimeException e) {
+ assertTrue(e.getCause() instanceof UserException);
assertNotNull(e.getCause());
-
- // TODO: remove the version check (and the setup and teardown methods) when we no
- // longer support Spark 1.3 or 1.4
- String version = SparkContextFactory.getSparkContext(options).version();
- if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
- assertTrue(e.getCause() instanceof UserException);
- }
}
}
-
- @Before
- public void setup() {
- System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
- }
-
- @After
- public void teardown() {
- System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
deleted file mode 100644
index d294eca..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Simple test on the Spark runner pipeline options.
- */
-public class SparkPipelineOptionsTest {
- @Test
- public void testDefaultCreateMethod() {
- SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- Assert.assertEquals("local[4]", actualOptions.getSparkMaster());
- }
-
- @Test
- public void testSettingCustomOptions() {
- SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- actualOptions.setSparkMaster("spark://207.184.161.138:7077");
- Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
deleted file mode 100644
index f72eba7..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A test for the transforms registered in TransformTranslator.
- * Builds a regular Beam pipeline with each of the mapped
- * transforms, and makes sure that they work when the pipeline is
- * executed in Spark.
- */
-public class TransformTranslatorTest {
- private static final Logger LOG = LoggerFactory.getLogger(TransformTranslatorTest.class);
- @Rule public TemporaryFolder tmp = new TemporaryFolder();
-
- /**
- * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
- * in DirectRunner and on SparkRunner, with the mapped beam-to-spark
- * transforms. Finally it makes sure that the results are the same for both runs.
- */
- @Test
- public void testTextIOReadAndWriteTransforms() throws IOException {
- String sparkOut = runPipeline(SparkRunner.class);
-
- List<String> lines =
- Files.readLines(
- Paths.get("src/test/resources/test_text.txt").toFile(), StandardCharsets.UTF_8);
-
- File sparkOutFile = new File(sparkOut);
- List<String> sparkOutput =
- readFromOutputFiles(sparkOutFile.getParentFile(), sparkOutFile.getName());
-
- // sort output to get a stable result (PCollections are not ordered)
- assertThat(sparkOutput, containsInAnyOrder(lines.toArray()));
- }
-
- private String runPipeline(Class<? extends PipelineRunner<?>> runner) throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(runner);
- Pipeline p = Pipeline.create(options);
- File outFile = tmp.newFile();
- PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
- lines.apply(TextIO.Write.to(outFile.getAbsolutePath()));
- p.run();
- return outFile.getAbsolutePath();
- }
-
- private List<String> readFromOutputFiles(File parent, String outPattern) throws IOException {
- // example pattern: outprefix-00000-of-00001
- Pattern pattern = Pattern.compile(String.format("%s-[0-9]{5}-of-[0-9]{5}", outPattern));
- List<String> lines = new ArrayList<>();
- if (parent.exists() && parent.isDirectory()) {
- //noinspection ConstantConditions
- for (File f : parent.listFiles()) {
- if (pattern.matcher(f.getName()).matches()) {
- LOG.info("For " + outPattern + " reading file " + f.getName());
- lines.addAll(FileUtils.readLines(f, Charsets.UTF_8));
- }
- }
- }
- return lines;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3c95026/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
deleted file mode 100644
index e727d2f..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Test;
-
-/**
- * Windowed word count test.
- */
-public class WindowedWordCountTest {
- private static final String[] WORDS_ARRAY = {
- "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
- private static final Long[] TIMESTAMPS_ARRAY = {
- 60000L, 60000L, 60000L, 179000L, 179000L, 179000L};
- private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
- private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
-
- private static final List<String> EXPECTED_FIXED_SEPARATE_COUNT_SET =
- ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 2", "sue: 1", "bob: 1");
-
- @Test
- public void testFixed() throws Exception {
- PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(opts);
- PCollection<String> inputWords =
- p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
- PCollection<String> windowedWords =
- inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
-
- PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
- PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
-
- p.run();
- }
-
- private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET =
- ImmutableList.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-
- @Test
- public void testFixed2() throws Exception {
- PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(opts);
- PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
- .withCoder(StringUtf8Coder.of()));
- PCollection<String> windowedWords = inputWords
- .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
-
- PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
- PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
-
- p.run();
- }
-
- private static final List<String> EXPECTED_SLIDING_COUNT_SET =
- ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 5", "there: 1", "sue: 2",
- "bob: 2", "hi: 2", "sue: 1", "bob: 1");
-
- @Test
- public void testSliding() throws Exception {
- PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(opts);
- PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
- .withCoder(StringUtf8Coder.of()));
- PCollection<String> windowedWords = inputWords
- .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
- .every(Duration.standardMinutes(1))));
-
- PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
- PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
-
- p.run();
- }
-
-}
[2/2] incubator-beam git commit: This closes #1144
Posted by ke...@apache.org.
This closes #1144
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2efc2782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2efc2782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2efc2782
Branch: refs/heads/master
Commit: 2efc27829ac0a1e35df414fea8be26550f5cfbf2
Parents: 2d1df8b b3c9502
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 16:14:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 16:14:31 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/spark/DeDupTest.java | 58 -----
.../beam/runners/spark/EmptyInputTest.java | 74 ------
.../beam/runners/spark/SimpleWordCountTest.java | 107 --------
.../apache/beam/runners/spark/TfIdfTest.java | 259 -------------------
.../spark/translation/CombinePerKeyTest.java | 78 ------
.../spark/translation/DoFnOutputTest.java | 65 -----
.../translation/MultiOutputWordCountTest.java | 174 -------------
.../spark/translation/SerializationTest.java | 199 --------------
.../spark/translation/SideEffectsTest.java | 20 +-
.../translation/SparkPipelineOptionsTest.java | 42 ---
.../translation/TransformTranslatorTest.java | 104 --------
.../translation/WindowedWordCountTest.java | 116 ---------
12 files changed, 1 insertion(+), 1295 deletions(-)
----------------------------------------------------------------------