You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:31 UTC

[13/51] [abbrv] incubator-beam git commit: Port WindowedWordCount example from OldDoFn to DoFn

Port WindowedWordCount example from OldDoFn to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca9e3372
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca9e3372
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca9e3372

Branch: refs/heads/python-sdk
Commit: ca9e337203208c7c5876f0710fb3a45430a5b3a8
Parents: 4ceec0e
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 14:29:01 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WindowedWordCount.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca9e3372/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 17f7da3..842cb54 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -103,14 +103,14 @@ public class WindowedWordCount {
     static final int WINDOW_SIZE = 1;  // Default window duration in minutes
 
   /**
-   * Concept #2: A OldDoFn that sets the data element timestamp. This is a silly method, just for
+   * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
    * this example, for the bounded data case.
    *
    * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
    * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
    * 2-hour period.
    */
-  static class AddTimestampFn extends OldDoFn<String, String> {
+  static class AddTimestampFn extends DoFn<String, String> {
     private static final Duration RAND_RANGE = Duration.standardHours(2);
     private final Instant minTimestamp;
 
@@ -118,7 +118,7 @@ public class WindowedWordCount {
       this.minTimestamp = new Instant(System.currentTimeMillis());
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       // Generate a timestamp that falls somewhere in the past two hours.
       long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
@@ -130,9 +130,9 @@ public class WindowedWordCount {
     }
   }
 
-  /** A OldDoFn that converts a Word and Count into a BigQuery table row. */
-  static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> {
-    @Override
+  /** A DoFn that converts a Word and Count into a BigQuery table row. */
+  static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = new TableRow()
           .set("word", c.element().getKey())