You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:23 UTC
[15/19] incubator-beam git commit: Port WindowedWordCount example
from OldDoFn to DoFn
Port WindowedWordCount example from OldDoFn to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca9e3372
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca9e3372
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca9e3372
Branch: refs/heads/master
Commit: ca9e337203208c7c5876f0710fb3a45430a5b3a8
Parents: 4ceec0e
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 14:29:01 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:53 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WindowedWordCount.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca9e3372/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 17f7da3..842cb54 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -103,14 +103,14 @@ public class WindowedWordCount {
static final int WINDOW_SIZE = 1; // Default window duration in minutes
/**
- * Concept #2: A OldDoFn that sets the data element timestamp. This is a silly method, just for
+ * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
* this example, for the bounded data case.
*
* <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
* his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
* 2-hour period.
*/
- static class AddTimestampFn extends OldDoFn<String, String> {
+ static class AddTimestampFn extends DoFn<String, String> {
private static final Duration RAND_RANGE = Duration.standardHours(2);
private final Instant minTimestamp;
@@ -118,7 +118,7 @@ public class WindowedWordCount {
this.minTimestamp = new Instant(System.currentTimeMillis());
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
// Generate a timestamp that falls somewhere in the past two hours.
long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
@@ -130,9 +130,9 @@ public class WindowedWordCount {
}
}
- /** A OldDoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> {
- @Override
+ /** A DoFn that converts a Word and Count into a BigQuery table row. */
+ static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+ @ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("word", c.element().getKey())