Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/23 03:13:42 UTC
[1/3] incubator-beam git commit: Closes #539
Repository: incubator-beam
Updated Branches:
refs/heads/master 122cd0466 -> cf1464465
Closes #539
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf146446
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf146446
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf146446
Branch: refs/heads/master
Commit: cf1464465e1e65be2e4896cc6f693f8bbe82b844
Parents: 122cd04 d3dc368
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 22 19:51:50 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 19:51:50 2016 -0700
----------------------------------------------------------------------
runners/spark/pom.xml | 11 -
.../beam/runners/spark/examples/WordCount.java | 137 +++++++++++++
.../beam/runners/spark/SimpleWordCountTest.java | 76 +------
.../apache/beam/runners/spark/TfIdfTest.java | 199 ++++++++++++++++++-
.../beam/runners/spark/io/NumShardsTest.java | 2 +-
.../translation/WindowedWordCountTest.java | 12 +-
.../streaming/SimpleStreamingWordCountTest.java | 6 +-
7 files changed, 353 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
[3/3] incubator-beam git commit: Remove dependency on beam-examples-java.
Posted by dh...@apache.org.
Remove dependency on beam-examples-java.
Duplicate WordCount into spark examples package.
Duplicate parts of TfIdf from beam examples.
Better reuse of WordCount and its parts.
Remove dependency on beam-examples-java
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12fb7781
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12fb7781
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12fb7781
Branch: refs/heads/master
Commit: 12fb77811d7bf8c72a3c0a48f391bab3c815fbef
Parents: 122cd04
Author: Sela <an...@paypal.com>
Authored: Mon Jun 27 20:59:07 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 19:51:50 2016 -0700
----------------------------------------------------------------------
runners/spark/pom.xml | 11 -
.../beam/runners/spark/examples/WordCount.java | 138 +++++++++++++
.../beam/runners/spark/SimpleWordCountTest.java | 76 +------
.../apache/beam/runners/spark/TfIdfTest.java | 200 ++++++++++++++++++-
.../beam/runners/spark/io/NumShardsTest.java | 2 +-
.../translation/WindowedWordCountTest.java | 12 +-
.../streaming/SimpleStreamingWordCountTest.java | 6 +-
7 files changed, 355 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 7f65e16..665f15d 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -222,17 +222,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-examples-java</artifactId>
- <exclusions>
- <!-- Use Hadoop/Spark's backend logger instead of jdk14 for tests -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>${avro.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
new file mode 100644
index 0000000..defd635
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.examples;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Duplicated from {@link org.apache.beam.examples.WordCount} to avoid dependency on
+ * beam-examples.
+ */
+public class WordCount {
+
+ /**
+ * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
+ * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
+ * pipeline.
+ */
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ /** A SimpleFunction that converts a Word and Count into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+ @Override
+ public String apply(KV<String, Long> input) {
+ return input.getKey() + ": " + input.getValue();
+ }
+ }
+
+ /**
+ * A PTransform that converts a PCollection containing lines of text into a PCollection of
+ * formatted word counts.
+ *
+ * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+ * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+ * modular testing, and an improved monitoring experience.
+ */
+ public static class CountWords extends PTransform<PCollection<String>,
+ PCollection<KV<String, Long>>> {
+ @Override
+ public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+ // Convert lines of text into individual words.
+ PCollection<String> words = lines.apply(
+ ParDo.of(new ExtractWordsFn()));
+
+ // Count the number of times each word occurs.
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ return wordCounts;
+ }
+ }
+
+ /**
+ * Options supported by {@link WordCount}.
+ *
+ * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
+ * to be processed by the command-line parser, and specify default values for them. You can then
+ * access the options values in your pipeline code.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ public static interface WordCountOptions extends PipelineOptions {
+ @Description("Path of the file to read from")
+ @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+ void setInputFile(String value);
+
+ @Description("Path of the file to write to")
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ public static void main(String[] args) {
+ WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+ .as(WordCountOptions.class);
+ Pipeline p = Pipeline.create(options);
+
+ // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
+ // static FormatAsTextFn() to the ParDo transform.
+ //TODO: remove withoutValidation once possible
+ p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation())
+ .apply(new CountWords())
+ .apply(MapElements.via(new FormatAsTextFn()))
+ .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+
+ p.run();
+ }
+}
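For context, a minimal sketch of exercising the duplicated example in-process (the file paths and the in-process call are illustrative assumptions, not part of this commit):

    // Hypothetical invocation; WordCountOptions defaults the input file,
    // and --output is required. Add --runner=SparkRunner to run on Spark.
    WordCount.main(new String[] {
        "--inputFile=/tmp/kinglear.txt",
        "--output=/tmp/counts"
    });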
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 6a3edd7..6f5ce5e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -21,19 +21,14 @@ package org.apache.beam.runners.spark;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
+import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.collect.ImmutableSet;
@@ -48,7 +43,6 @@ import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import java.util.regex.Pattern;
/**
* Simple word count test.
@@ -68,7 +62,8 @@ public class SimpleWordCountTest {
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
- PCollection<String> output = inputWords.apply(new CountWords());
+ PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
@@ -86,7 +81,8 @@ public class SimpleWordCountTest {
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
- PCollection<String> output = inputWords.apply(new CountWords());
+ PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
File outputFile = testFolder.newFile();
output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
@@ -97,64 +93,4 @@ public class SimpleWordCountTest {
assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
}
-
- /**
- * A DoFn that tokenizes lines of text into individual words.
- */
- static class ExtractWordsFn extends DoFn<String, String> {
- private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- // Split the line into words.
- String[] words = WORD_BOUNDARY.split(c.element());
-
- // Keep track of the number of lines without any words encountered while tokenizing.
- // This aggregator is visible in the monitoring UI when run using DataflowRunner.
- if (words.length == 0) {
- emptyLines.addValue(1L);
- }
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /**
- * A DoFn that converts a Word and Count into a printable string.
- */
- private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey() + ": " + c.element().getValue());
- }
- }
-
- /**
- * A {@link PTransform} counting words.
- */
- public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
- @Override
- public PCollection<String> apply(PCollection<String> lines) {
-
- // Convert lines of text into individual words.
- PCollection<String> words = lines.apply(
- ParDo.of(new ExtractWordsFn()));
-
- // Count the number of times each word occurs.
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- // Format each word and count into a printable string.
-
- return wordCounts.apply(ParDo.of(new FormatCountsFn()));
- }
-
- }
}
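With the local helpers removed, the tests compose the shared transforms from the new examples package. A minimal sketch of that reuse pattern (the input literals are illustrative):

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> formatted = p
        .apply(Create.of("hi there", "hi").withCoder(StringUtf8Coder.of()))
        .apply(new WordCount.CountWords())
        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
    // formatted contains "hi: 2" and "there: 1".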
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index df78338..4052e65 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -18,18 +18,30 @@
package org.apache.beam.runners.spark;
-import org.apache.beam.examples.complete.TfIdf;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
@@ -52,7 +64,7 @@ public class TfIdfTest {
KV.of(new URI("x"), "a b c d"),
KV.of(new URI("y"), "a b c"),
KV.of(new URI("z"), "a m n")))
- .apply(new TfIdf.ComputeTfIdf());
+ .apply(new ComputeTfIdf());
PCollection<String> words = wordToUriAndTfIdf
.apply(Keys.<String>create())
@@ -64,4 +76,188 @@ public class TfIdfTest {
res.close();
}
+ /**
+ * Duplicated from {@link org.apache.beam.examples.complete.TfIdf} to avoid dependency on
+ * beam-examples.
+ */
+ public static class ComputeTfIdf
+ extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
+ public ComputeTfIdf() { }
+
+ @Override
+ public PCollection<KV<String, KV<URI, Double>>> apply(
+ PCollection<KV<URI, String>> uriToContent) {
+
+ // Compute the total number of documents, and
+ // prepare this singleton PCollectionView for
+ // use as a side input.
+ final PCollectionView<Long> totalDocuments =
+ uriToContent
+ .apply("GetURIs", Keys.<URI>create())
+ .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+ .apply(Count.<URI>globally())
+ .apply(View.<Long>asSingleton());
+
+ // Create a collection of pairs mapping a URI to each
+ // of the words in the document associated with that URI.
+ PCollection<KV<URI, String>> uriToWords = uriToContent
+ .apply("SplitWords", ParDo.of(
+ new DoFn<KV<URI, String>, KV<URI, String>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ String line = c.element().getValue();
+ for (String word : line.split("\\W+")) {
+ // Log INFO messages when the word "love" is found.
+ if (word.toLowerCase().equals("love")) {
+ LOG.info("Found {}", word.toLowerCase());
+ }
+
+ if (!word.isEmpty()) {
+ c.output(KV.of(uri, word.toLowerCase()));
+ }
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to the total
+ // number of documents in which it appears.
+ PCollection<KV<String, Long>> wordToDocCount = uriToWords
+ .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+ .apply(Values.<String>create())
+ .apply("CountDocs", Count.<String>perElement());
+
+ // Compute a mapping from each URI to the total
+ // number of words in the document associated with that URI.
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+ .apply("GetURIs2", Keys.<URI>create())
+ .apply("CountWords", Count.<URI>perElement());
+
+ // Count, for each (URI, word) pair, the number of
+ // occurrences of that word in the document associated
+ // with the URI.
+ PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+ .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+ // Adjust the above collection to a mapping from
+ // (URI, word) pairs to counts into an isomorphic mapping
+ // from URI to (word, count) pairs, to prepare for a join
+ // by the URI key.
+ PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
+ .apply("ShiftKeys", ParDo.of(
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey().getKey();
+ String word = c.element().getKey().getValue();
+ Long occurrences = c.element().getValue();
+ c.output(KV.of(uri, KV.of(word, occurrences)));
+ }
+ }));
+
+ // Prepare to join the mapping of URI to (word, count) pairs with
+ // the mapping of URI to total word counts, by associating
+ // each of the input PCollection<KV<URI, ...>> with
+ // a tuple tag. Each input must have the same key type, URI
+ // in this case. The type parameter of the tuple tag matches
+ // the types of the values for each collection.
+ final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
+ final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
+ KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+ .of(wordTotalsTag, uriToWordTotal)
+ .and(wordCountsTag, uriToWordAndCount);
+
+ // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+ // inputs. This yields a mapping from URI to a CoGbkResult
+ // (CoGroupByKey Result). The CoGbkResult is a mapping
+ // from the above tuple tags to the values in each input
+ // associated with a particular URI. In this case, each
+ // KV<URI, CoGbkResult> groups a URI with the total number of
+ // words in that document as well as all the (word, count)
+ // pairs for particular words.
+ PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+ .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+ // Compute a mapping from each word to a (URI, term frequency)
+ // pair for each URI. A word's term frequency for a document
+ // is simply the number of times that word occurs in the document
+ // divided by the total number of words in the document.
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+ .apply("ComputeTermFrequencies", ParDo.of(
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+ for (KV<String, Long> wordAndCount
+ : c.element().getValue().getAll(wordCountsTag)) {
+ String word = wordAndCount.getKey();
+ Long wordCount = wordAndCount.getValue();
+ Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+ c.output(KV.of(word, KV.of(uri, termFrequency)));
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to its document frequency.
+ // A word's document frequency in a corpus is the number of
+ // documents in which the word appears divided by the total
+ // number of documents in the corpus. Note how the total number of
+ // documents is passed as a side input; the same value is
+ // presented to each invocation of the DoFn.
+ PCollection<KV<String, Double>> wordToDf = wordToDocCount
+ .apply("ComputeDocFrequencies", ParDo
+ .withSideInputs(totalDocuments)
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Long documentCount = c.element().getValue();
+ Long documentTotal = c.sideInput(totalDocuments);
+ Double documentFrequency = documentCount.doubleValue()
+ / documentTotal.doubleValue();
+
+ c.output(KV.of(word, documentFrequency));
+ }
+ }));
+
+ // Join the term frequency and document frequency
+ // collections, each keyed on the word.
+ final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
+ final TupleTag<Double> dfTag = new TupleTag<Double>();
+ PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+ .of(tfTag, wordToUriAndTf)
+ .and(dfTag, wordToDf)
+ .apply(CoGroupByKey.<String>create());
+
+ // Compute a mapping from each word to a (URI, TF-IDF) score
+ // for each URI. There are a variety of definitions of TF-IDF
+ // ("term frequency - inverse document frequency") score;
+ // here we use a basic version that is the term frequency
+ // divided by the log of the document frequency.
+ return wordToUriAndTfAndDf
+ .apply("ComputeTfIdf", ParDo.of(
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Double df = c.element().getValue().getOnly(dfTag);
+
+ for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+ URI uri = uriAndTf.getKey();
+ Double tf = uriAndTf.getValue();
+ Double tfIdf = tf * Math.log(1 / df);
+ c.output(KV.of(word, KV.of(uri, tfIdf)));
+ }
+ }
+ }));
+ }
+
+ // Instantiate Logger.
+ // It is suggested that the user specify the class name of the containing class
+ // (in this case ComputeTfIdf).
+ private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+ }
+
}
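As a sanity check on the duplicated scoring logic, a short worked example against the three-document corpus used in the test above (illustrative only):

    // The word "m" occurs once among the 3 words of document z ("a m n"),
    // so tf = 1/3; it appears in 1 of the 3 documents, so df = 1/3.
    double tf = 1.0 / 3.0;
    double df = 1.0 / 3.0;
    double tfIdf = tf * Math.log(1 / df);  // (1/3) * ln(3), about 0.366, as ComputeTfIdf emits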
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index b4268d6..36d8b67 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -21,10 +21,10 @@ package org.apache.beam.runners.spark.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.examples.WordCount;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 043d506..b70e090 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -19,14 +19,15 @@
package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SimpleWordCountTest;
import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -64,7 +65,8 @@ public class WindowedWordCountTest {
PCollection<String> windowedWords =
inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
- PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+ PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
@@ -85,7 +87,8 @@ public class WindowedWordCountTest {
PCollection<String> windowedWords = inputWords
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
- PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+ PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
@@ -108,7 +111,8 @@ public class WindowedWordCountTest {
.apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
.every(Duration.standardMinutes(1))));
- PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+ PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 75a702b..75ab274 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -19,14 +19,15 @@ package org.apache.beam.runners.spark.translation.streaming;
import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SimpleWordCountTest;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
+import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
@@ -65,7 +66,8 @@ public class SimpleStreamingWordCountTest implements Serializable {
PCollection<String> windowedWords = inputWords
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
- PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+ PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
EvaluationResult res = SparkRunner.create(options).run(p);
[2/3] incubator-beam git commit: Remove JavaDoc links to beam-examples.
Posted by dh...@apache.org.
Remove JavaDoc links to beam-examples.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3dc3686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3dc3686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3dc3686
Branch: refs/heads/master
Commit: d3dc368666d5b700c5026fe08455e1210692ce3e
Parents: 12fb778
Author: Sela <an...@paypal.com>
Authored: Mon Jul 4 15:33:57 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 19:51:50 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/spark/examples/WordCount.java | 3 +--
.../src/test/java/org/apache/beam/runners/spark/TfIdfTest.java | 3 +--
2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3dc3686/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index defd635..4951043 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -35,8 +35,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
- * Duplicated from {@link org.apache.beam.examples.WordCount} to avoid dependency on
- * beam-examples.
+ * Duplicated to avoid dependency on beam-examples.
*/
public class WordCount {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3dc3686/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 4052e65..d1f8d12 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -77,8 +77,7 @@ public class TfIdfTest {
}
/**
- * Duplicated from {@link org.apache.beam.examples.complete.TfIdf} to avoid dependency on
- * beam-examples.
+ * Duplicated to avoid dependency on beam-examples.
*/
public static class ComputeTfIdf
extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {