You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:16 UTC

[02/50] incubator-beam git commit: Port Flink fork of examples to new DoFn

Port Flink fork of examples to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87313f1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87313f1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87313f1c

Branch: refs/heads/gearpump-runner
Commit: 87313f1c3d8cf874e04aaf528161478afa030f38
Parents: ae1f6d1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:24:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/examples/TFIDF.java      | 28 +++++++--------
 .../beam/runners/flink/examples/WordCount.java  |  5 +--
 .../flink/examples/streaming/AutoComplete.java  | 37 ++++++++++----------
 .../flink/examples/streaming/JoinExamples.java  | 14 ++++----
 .../examples/streaming/KafkaIOExamples.java     |  7 ++--
 .../KafkaWindowedWordCountExample.java          | 10 +++---
 .../examples/streaming/WindowedWordCount.java   | 10 +++---
 7 files changed, 57 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 716c8ad..4deca12 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -230,10 +230,10 @@ public class TFIDF {
       // Create a collection of pairs mapping a URI to each
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() {
+          .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
             private static final long serialVersionUID = 0;
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) {
               URI uri = c.element().getKey();
               String line = c.element().getValue();
@@ -275,10 +275,10 @@ public class TFIDF {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey().getKey();
                   String word = c.element().getKey().getValue();
@@ -316,10 +316,10 @@ public class TFIDF {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -339,14 +339,14 @@ public class TFIDF {
       // documents in which the word appears divided by the total
       // number of documents in the corpus. Note how the total number of
       // documents is passed as a side input; the same value is
-      // presented to each invocation of the OldDoFn.
+      // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Long documentCount = c.element().getValue();
@@ -375,10 +375,10 @@ public class TFIDF {
 
       return wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Double df = c.element().getValue().getOnly(dfTag);
@@ -416,10 +416,10 @@ public class TFIDF {
     @Override
     public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
-          .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
+          .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             private static final long serialVersionUID = 0;
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) {
               c.output(String.format("%s,\t%s,\t%f",
                   c.element().getKey(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 080cdc9..fdffd39 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,11 +39,11 @@ import org.apache.beam.sdk.values.PCollection;
 
 public class WordCount {
 
-  public static class ExtractWordsFn extends OldDoFn<String, String> {
+  public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 068404a..aff1a35 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -92,10 +93,10 @@ public class AutoComplete {
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(
-            new OldDoFn<KV<String, Long>, CompletionCandidate>() {
+            new DoFn<KV<String, Long>, CompletionCandidate>() {
               private static final long serialVersionUID = 0;
 
-              @Override
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
                 c.output(cand);
@@ -182,10 +183,10 @@ public class AutoComplete {
     }
 
     private static class FlattenTops
-        extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
       private static final long serialVersionUID = 0;
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) {
         for (CompletionCandidate cc : c.element().getValue()) {
           c.output(cc);
@@ -236,10 +237,10 @@ public class AutoComplete {
   }
 
   /**
-   * A OldDoFn that keys each candidate by all its prefixes.
+   * A DoFn that keys each candidate by all its prefixes.
    */
   private static class AllPrefixes
-      extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
     private static final long serialVersionUID = 0;
 
     private final int minPrefix;
@@ -251,7 +252,7 @@ public class AutoComplete {
       this.minPrefix = minPrefix;
       this.maxPrefix = maxPrefix;
     }
-    @Override
+    @ProcessElement
       public void processElement(ProcessContext c) {
       String word = c.element().value;
       for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
@@ -314,11 +315,11 @@ public class AutoComplete {
     }
   }
 
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
             createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);
@@ -337,21 +338,21 @@ public class AutoComplete {
   }
 
   /**
-   * Takes as input a the top candidates per prefix, and emits an entity
-   * suitable for writing to Datastore.
+   * Takes as input a the top candidates per prefix, and emits an entity suitable for writing to
+   * Datastore.
    */
-  static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String>
-          implements OldDoFn.RequiresWindowAccess{
+  static class FormatForPerTaskLocalFile
+      extends DoFn<KV<String, List<CompletionCandidate>>, String> {
 
     private static final long serialVersionUID = 0;
 
-    @Override
-    public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       StringBuilder str = new StringBuilder();
       KV<String, List<CompletionCandidate>> elem = c.element();
 
-      str.append(elem.getKey() +" @ "+ c.window() +" -> ");
-      for(CompletionCandidate cand: elem.getValue()) {
+      str.append(elem.getKey() +" @ "+ window +" -> ");
+      for (CompletionCandidate cand: elem.getValue()) {
         str.append(cand.toString() + " ");
       }
       System.out.println(str.toString());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 7d7c0c7..458a263 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -76,10 +76,10 @@ public class JoinExamples {
     // country code 'key' -> string of <event info>, <country name>
     PCollection<KV<String, String>> finalResultCollection =
         kvpCollection.apply("Process", ParDo.of(
-            new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
               private static final long serialVersionUID = 0;
 
-              @Override
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 KV<String, CoGbkResult> e = c.element();
                 String key = e.getKey();
@@ -98,10 +98,10 @@ public class JoinExamples {
             }));
 
     return finalResultCollection
-        .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
+        .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
           private static final long serialVersionUID = 0;
 
-          @Override
+          @ProcessElement
           public void processElement(ProcessContext c) {
             String result = c.element().getKey() + " -> " + c.element().getValue();
             System.out.println(result);
@@ -110,10 +110,10 @@ public class JoinExamples {
         }));
   }
 
-  static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> {
+  static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
     private static final long serialVersionUID = 0;
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String line = c.element().toLowerCase();
       String key = line.split("\\s")[0];

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 395b409..68a9edc 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -30,9 +30,10 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
@@ -326,9 +327,9 @@ public class KafkaIOExamples {
    * Print contents to stdout
    * @param <T> type of the input
    */
-  private static class PrintFn<T> extends OldDoFn<T, T> {
+  private static class PrintFn<T> extends DoFn<T, T> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       System.out.println(c.element().toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 8c31783..39ce225 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -49,11 +49,11 @@ public class KafkaWindowedWordCountExample {
   static final String GROUP_ID = "myGroup";  // Default groupId
   static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
 
-  public static class ExtractWordsFn extends OldDoFn<String, String> {
+  public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);
@@ -71,8 +71,8 @@ public class KafkaWindowedWordCountExample {
     }
   }
 
-  public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
-    @Override
+  public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
       System.out.println(row);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index d149e4e..fe8e627 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -59,19 +59,19 @@ public class WindowedWordCount {
   static final long WINDOW_SIZE = 10;  // Default window duration in seconds
   static final long SLIDE_SIZE = 5;  // Default window slide in seconds
 
-  static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
-    @Override
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
       c.output(row);
     }
   }
 
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);