Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/23 03:13:42 UTC

[1/3] incubator-beam git commit: Closes #539

Repository: incubator-beam
Updated Branches:
  refs/heads/master 122cd0466 -> cf1464465


Closes #539


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf146446
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf146446
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf146446

Branch: refs/heads/master
Commit: cf1464465e1e65be2e4896cc6f693f8bbe82b844
Parents: 122cd04 d3dc368
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 22 19:51:50 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 19:51:50 2016 -0700

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  11 -
 .../beam/runners/spark/examples/WordCount.java  | 137 +++++++++++++
 .../beam/runners/spark/SimpleWordCountTest.java |  76 +------
 .../apache/beam/runners/spark/TfIdfTest.java    | 199 ++++++++++++++++++-
 .../beam/runners/spark/io/NumShardsTest.java    |   2 +-
 .../translation/WindowedWordCountTest.java      |  12 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 7 files changed, 353 insertions(+), 90 deletions(-)
----------------------------------------------------------------------



[3/3] incubator-beam git commit: Remove dependency on beam-examples-java.

Posted by dh...@apache.org.
Remove dependency on beam-examples-java.

Duplicate WordCount into the Spark examples package.

Duplicate parts of TfIdf from beam examples.

Better reuse of WordCount and its parts.

Remove dependency on beam-examples-java
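
The practical effect, visible throughout the test diffs below, is that counting and formatting become two separate, reusable steps: CountWords emits raw KV<String, Long> counts, and FormatAsTextFn renders them via MapElements. A minimal sketch of the new composition (the input strings and class name are illustrative, not part of this patch):

import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;

public class CountWordsCompositionSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Counting and formatting are now two explicit, composable steps.
    PCollection<String> formattedCounts = p
        .apply(Create.of("hi there", "hi").withCoder(StringUtf8Coder.of()))
        .apply(new WordCount.CountWords())
        .apply(MapElements.via(new WordCount.FormatAsTextFn()));

    PAssert.that(formattedCounts).containsInAnyOrder("hi: 2", "there: 1");
    p.run();
  }
}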


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12fb7781
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12fb7781
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12fb7781

Branch: refs/heads/master
Commit: 12fb77811d7bf8c72a3c0a48f391bab3c815fbef
Parents: 122cd04
Author: Sela <an...@paypal.com>
Authored: Mon Jun 27 20:59:07 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 19:51:50 2016 -0700

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  11 -
 .../beam/runners/spark/examples/WordCount.java  | 138 +++++++++++++
 .../beam/runners/spark/SimpleWordCountTest.java |  76 +------
 .../apache/beam/runners/spark/TfIdfTest.java    | 200 ++++++++++++++++++-
 .../beam/runners/spark/io/NumShardsTest.java    |   2 +-
 .../translation/WindowedWordCountTest.java      |  12 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 7 files changed, 355 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 7f65e16..665f15d 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -222,17 +222,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-examples-java</artifactId>
-      <exclusions>
-        <!-- Use Hadoop/Spark's backend logger instead of jdk14 for tests -->
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-mapred</artifactId>
       <version>${avro.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
new file mode 100644
index 0000000..defd635
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.examples;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Duplicated from {@link org.apache.beam.examples.WordCount} to avoid dependency on
+ * beam-examples.
+ */
+public class WordCount {
+
+  /**
+   * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically
+   * out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo
+   * in the pipeline.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /** A SimpleFunction that converts a Word and Count into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
+    }
+  }
+
+  /**
+   * A PTransform that converts a PCollection containing lines of text into a PCollection of
+   * formatted word counts.
+   *
+   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+   * modular testing, and an improved monitoring experience.
+   */
+  public static class CountWords extends PTransform<PCollection<String>,
+      PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(
+          ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      PCollection<KV<String, Long>> wordCounts =
+          words.apply(Count.<String>perElement());
+
+      return wordCounts;
+    }
+  }
+
+  /**
+   * Options supported by {@link WordCount}.
+   *
+   * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
+   * to be processed by the command-line parser, and specify default values for them. You can then
+   * access the options values in your pipeline code.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  public static interface WordCountOptions extends PipelineOptions {
+    @Description("Path of the file to read from")
+    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+    String getInputFile();
+    void setInputFile(String value);
+
+    @Description("Path of the file to write to")
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+      .as(WordCountOptions.class);
+    Pipeline p = Pipeline.create(options);
+
+    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
+    // static FormatAsTextFn() to the ParDo transform.
+    //TODO: remove withoutValidation once possible
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation())
+     .apply(new CountWords())
+     .apply(MapElements.via(new FormatAsTextFn()))
+     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+
+    p.run();
+  }
+}

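For orientation, a minimal sketch of running this new example on the Spark runner (the paths are placeholders; SparkPipelineOptions, SparkRunner, and EvaluationResult are used the same way in the tests below):

import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;

public class SparkWordCountSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // Read, count, format, and write, mirroring WordCount.main above.
    p.apply("ReadLines", TextIO.Read.from("/tmp/kinglear.txt").withoutValidation())
     .apply(new WordCount.CountWords())
     .apply(MapElements.via(new WordCount.FormatAsTextFn()))
     .apply("WriteCounts", TextIO.Write.to("/tmp/counts"));

    EvaluationResult res = SparkRunner.create(options).run(p);
    res.close();
  }
}
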
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 6a3edd7..6f5ce5e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -21,19 +21,14 @@ package org.apache.beam.runners.spark;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.ImmutableSet;
@@ -48,7 +43,6 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.regex.Pattern;
 
 /**
  * Simple word count test.
@@ -68,7 +62,8 @@ public class SimpleWordCountTest {
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
         .of()));
-    PCollection<String> output = inputWords.apply(new CountWords());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
@@ -86,7 +81,8 @@ public class SimpleWordCountTest {
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
         .of()));
-    PCollection<String> output = inputWords.apply(new CountWords());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     File outputFile = testFolder.newFile();
     output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
@@ -97,64 +93,4 @@ public class SimpleWordCountTest {
     assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
         containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
   }
-
-  /**
-   * A DoFn that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      // Split the line into words.
-      String[] words = WORD_BOUNDARY.split(c.element());
-
-      // Keep track of the number of lines without any words encountered while tokenizing.
-      // This aggregator is visible in the monitoring UI when run using DataflowRunner.
-      if (words.length == 0) {
-        emptyLines.addValue(1L);
-      }
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /**
-   * A DoFn that converts a Word and Count into a printable string.
-   */
-  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getKey() + ": " + c.element().getValue());
-    }
-  }
-
-  /**
-   * A {@link PTransform} counting words.
-   */
-  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
-    @Override
-    public PCollection<String> apply(PCollection<String> lines) {
-
-      // Convert lines of text into individual words.
-      PCollection<String> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<String, Long>> wordCounts =
-          words.apply(Count.<String>perElement());
-
-      // Format each word and count into a printable string.
-
-      return wordCounts.apply(ParDo.of(new FormatCountsFn()));
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index df78338..4052e65 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -18,18 +18,30 @@
 
 package org.apache.beam.runners.spark;
 
-import org.apache.beam.examples.complete.TfIdf;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Arrays;
@@ -52,7 +64,7 @@ public class TfIdfTest {
             KV.of(new URI("x"), "a b c d"),
             KV.of(new URI("y"), "a b c"),
             KV.of(new URI("z"), "a m n")))
-        .apply(new TfIdf.ComputeTfIdf());
+        .apply(new ComputeTfIdf());
 
     PCollection<String> words = wordToUriAndTfIdf
         .apply(Keys.<String>create())
@@ -64,4 +76,188 @@ public class TfIdfTest {
     res.close();
   }
 
+  /**
+   * Duplicated from {@link org.apache.beam.examples.complete.TfIdf} to avoid dependency on
+   * beam-examlpes.
+   */
+  public static class ComputeTfIdf
+      extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
+    public ComputeTfIdf() { }
+
+    @Override
+    public PCollection<KV<String, KV<URI, Double>>> apply(
+      PCollection<KV<URI, String>> uriToContent) {
+
+      // Compute the total number of documents, and
+      // prepare this singleton PCollectionView for
+      // use as a side input.
+      final PCollectionView<Long> totalDocuments =
+          uriToContent
+          .apply("GetURIs", Keys.<URI>create())
+          .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+          .apply(Count.<URI>globally())
+          .apply(View.<Long>asSingleton());
+
+      // Create a collection of pairs mapping a URI to each
+      // of the words in the document associated with that URI.
+      PCollection<KV<URI, String>> uriToWords = uriToContent
+          .apply("SplitWords", ParDo.of(
+              new DoFn<KV<URI, String>, KV<URI, String>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  String line = c.element().getValue();
+                  for (String word : line.split("\\W+")) {
+                    // Log INFO messages when the word "love" is found.
+                    if (word.toLowerCase().equals("love")) {
+                      LOG.info("Found {}", word.toLowerCase());
+                    }
+
+                    if (!word.isEmpty()) {
+                      c.output(KV.of(uri, word.toLowerCase()));
+                    }
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to the total
+      // number of documents in which it appears.
+      PCollection<KV<String, Long>> wordToDocCount = uriToWords
+          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+          .apply(Values.<String>create())
+          .apply("CountDocs", Count.<String>perElement());
+
+      // Compute a mapping from each URI to the total
+      // number of words in the document associated with that URI.
+      PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+          .apply("GetURIs2", Keys.<URI>create())
+          .apply("CountWords", Count.<URI>perElement());
+
+      // Count, for each (URI, word) pair, the number of
+      // occurrences of that word in the document associated
+      // with the URI.
+      PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+          .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+      // Adjust the above collection to a mapping from
+      // (URI, word) pairs to counts into an isomorphic mapping
+      // from URI to (word, count) pairs, to prepare for a join
+      // by the URI key.
+      PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
+          .apply("ShiftKeys", ParDo.of(
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey().getKey();
+                  String word = c.element().getKey().getValue();
+                  Long occurrences = c.element().getValue();
+                  c.output(KV.of(uri, KV.of(word, occurrences)));
+                }
+              }));
+
+      // Prepare to join the mapping of URI to (word, count) pairs with
+      // the mapping of URI to total word counts, by associating
+      // each of the input PCollection<KV<URI, ...>> with
+      // a tuple tag. Each input must have the same key type, URI
+      // in this case. The type parameter of the tuple tag matches
+      // the types of the values for each collection.
+      final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
+      final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
+      KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+          .of(wordTotalsTag, uriToWordTotal)
+          .and(wordCountsTag, uriToWordAndCount);
+
+      // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+      // inputs. This yields a mapping from URI to a CoGbkResult
+      // (CoGroupByKey Result). The CoGbkResult is a mapping
+      // from the above tuple tags to the values in each input
+      // associated with a particular URI. In this case, each
+      // KV<URI, CoGbkResult> groups a URI with the total number of
+      // words in that document as well as all the (word, count)
+      // pairs for particular words.
+      PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+          .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+      // Compute a mapping from each word to a (URI, term frequency)
+      // pair for each URI. A word's term frequency for a document
+      // is simply the number of times that word occurs in the document
+      // divided by the total number of words in the document.
+      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+          .apply("ComputeTermFrequencies", ParDo.of(
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+                  for (KV<String, Long> wordAndCount
+                           : c.element().getValue().getAll(wordCountsTag)) {
+                    String word = wordAndCount.getKey();
+                    Long wordCount = wordAndCount.getValue();
+                    Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+                    c.output(KV.of(word, KV.of(uri, termFrequency)));
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to its document frequency.
+      // A word's document frequency in a corpus is the number of
+      // documents in which the word appears divided by the total
+      // number of documents in the corpus. Note how the total number of
+      // documents is passed as a side input; the same value is
+      // presented to each invocation of the DoFn.
+      PCollection<KV<String, Double>> wordToDf = wordToDocCount
+          .apply("ComputeDocFrequencies", ParDo
+              .withSideInputs(totalDocuments)
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Long documentCount = c.element().getValue();
+                  Long documentTotal = c.sideInput(totalDocuments);
+                  Double documentFrequency = documentCount.doubleValue()
+                      / documentTotal.doubleValue();
+
+                  c.output(KV.of(word, documentFrequency));
+                }
+              }));
+
+      // Join the term frequency and document frequency
+      // collections, each keyed on the word.
+      final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
+      final TupleTag<Double> dfTag = new TupleTag<Double>();
+      PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+          .of(tfTag, wordToUriAndTf)
+          .and(dfTag, wordToDf)
+          .apply(CoGroupByKey.<String>create());
+
+      // Compute a mapping from each word to a (URI, TF-IDF) score
+      // for each URI. There are a variety of definitions of TF-IDF
+      // ("term frequency - inverse document frequency") score;
+      // here we use a basic version that multiplies the term
+      // frequency by the log of the inverse document frequency.
+      return wordToUriAndTfAndDf
+          .apply("ComputeTfIdf", ParDo.of(
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Double df = c.element().getValue().getOnly(dfTag);
+
+                  for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+                    URI uri = uriAndTf.getKey();
+                    Double tf = uriAndTf.getValue();
+                    Double tfIdf = tf * Math.log(1 / df);
+                    c.output(KV.of(word, KV.of(uri, tfIdf)));
+                  }
+                }
+              }));
+    }
+
+    // Instantiate the Logger.
+    // It is suggested that the Logger be named after the containing class
+    // (in this case, ComputeTfIdf).
+    private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+  }
+
 }
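
As a sanity check on ComputeTfIdf above, a worked example using this test's three-document corpus (a sketch; the arithmetic follows the transform's own definitions):

public class TfIdfWorkedExample {
  public static void main(String[] args) {
    // Corpus from the test: x = "a b c d", y = "a b c", z = "a m n".
    // For the word "d" in document x:
    //   term frequency     tf = 1 occurrence / 4 words in x        = 0.25
    //   document frequency df = 1 document containing "d" / 3 docs = 1/3
    double tf = 1.0 / 4.0;
    double df = 1.0 / 3.0;
    double tfIdf = tf * Math.log(1 / df);
    System.out.println(tfIdf); // ~0.2747; a word in every document, like "a", scores 0
  }
}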

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index b4268d6..36d8b67 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -21,10 +21,10 @@ package org.apache.beam.runners.spark.io;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.examples.WordCount;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 043d506..b70e090 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -19,14 +19,15 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -64,7 +65,8 @@ public class WindowedWordCountTest {
     PCollection<String> windowedWords =
         inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
 
-    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
 
@@ -85,7 +87,8 @@ public class WindowedWordCountTest {
     PCollection<String> windowedWords = inputWords
         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
 
-    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
 
@@ -108,7 +111,8 @@ public class WindowedWordCountTest {
         .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
         .every(Duration.standardMinutes(1))));
 
-    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12fb7781/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 75a702b..75ab274 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -19,14 +19,15 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
+import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,7 +66,8 @@ public class SimpleStreamingWordCountTest implements Serializable {
     PCollection<String> windowedWords = inputWords
         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
 
-    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
     PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
     EvaluationResult res = SparkRunner.create(options).run(p);


[2/3] incubator-beam git commit: Remove JavaDoc links to beam-examples.

Posted by dh...@apache.org.
Remove JavaDoc links to beam-examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3dc3686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3dc3686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3dc3686

Branch: refs/heads/master
Commit: d3dc368666d5b700c5026fe08455e1210692ce3e
Parents: 12fb778
Author: Sela <an...@paypal.com>
Authored: Mon Jul 4 15:33:57 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 19:51:50 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/spark/examples/WordCount.java    | 3 +--
 .../src/test/java/org/apache/beam/runners/spark/TfIdfTest.java    | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3dc3686/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index defd635..4951043 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -35,8 +35,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Duplicated from {@link org.apache.beam.examples.WordCount} to avoid dependency on
- * beam-examples.
+ * Duplicated to avoid dependency on beam-examples.
  */
 public class WordCount {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3dc3686/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 4052e65..d1f8d12 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -77,8 +77,7 @@ public class TfIdfTest {
   }
 
   /**
-   * Duplicated from {@link org.apache.beam.examples.complete.TfIdf} to avoid dependency on
-   * beam-examples.
+   * Duplicated to avoid dependency on beam-examples.
    */
   public static class ComputeTfIdf
       extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {