You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:24 UTC

[16/19] incubator-beam git commit: Port AutoComplete example from OldDoFn to DoFn

Port AutoComplete example from OldDoFn to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3236eec2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3236eec2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3236eec2

Branch: refs/heads/master
Commit: 3236eec22a8902393e6becefb771b9a4768ccc50
Parents: 49d2f17
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 14:29:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:53 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    | 30 ++++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3236eec2/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 7b44af8..1ab39c9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -36,9 +36,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Partition;
@@ -130,8 +130,8 @@ public class AutoComplete {
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(
-            new OldDoFn<KV<String, Long>, CompletionCandidate>() {
-              @Override
+            new DoFn<KV<String, Long>, CompletionCandidate>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue()));
               }
@@ -209,8 +209,8 @@ public class AutoComplete {
     }
 
     private static class FlattenTops
-        extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
-      @Override
+        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         for (CompletionCandidate cc : c.element().getValue()) {
           c.output(cc);
@@ -260,10 +260,10 @@ public class AutoComplete {
   }
 
   /**
-   * A OldDoFn that keys each candidate by all its prefixes.
+   * A DoFn that keys each candidate by all its prefixes.
    */
   private static class AllPrefixes
-      extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
     private final int minPrefix;
     private final int maxPrefix;
     public AllPrefixes(int minPrefix) {
@@ -273,8 +273,8 @@ public class AutoComplete {
       this.minPrefix = minPrefix;
       this.maxPrefix = maxPrefix;
     }
-    @Override
-      public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
       String word = c.element().value;
       for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
         c.output(KV.of(word.substring(0, i), c.element()));
@@ -341,8 +341,8 @@ public class AutoComplete {
   /**
    * Takes as input a set of strings, and emits each #hashtag found therein.
    */
-  static class ExtractHashtags extends OldDoFn<String, String> {
-    @Override
+  static class ExtractHashtags extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Matcher m = Pattern.compile("#\\S+").matcher(c.element());
       while (m.find()) {
@@ -351,8 +351,8 @@ public class AutoComplete {
     }
   }
 
-  static class FormatForBigquery extends OldDoFn<KV<String, List<CompletionCandidate>>, TableRow> {
-    @Override
+  static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       List<TableRow> completions = new ArrayList<>();
       for (CompletionCandidate cc : c.element().getValue()) {
@@ -385,14 +385,14 @@ public class AutoComplete {
    * Takes as input a the top candidates per prefix, and emits an entity
    * suitable for writing to Datastore.
    */
-  static class FormatForDatastore extends OldDoFn<KV<String, List<CompletionCandidate>>, Entity> {
+  static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> {
     private String kind;
 
     public FormatForDatastore(String kind) {
       this.kind = kind;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Entity.Builder entityBuilder = Entity.newBuilder();
       Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build();