You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:29 UTC
[11/51] [abbrv] incubator-beam git commit: Port AutoComplete example from OldDoFn to DoFn
Port AutoComplete example from OldDoFn to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3236eec2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3236eec2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3236eec2
Branch: refs/heads/python-sdk
Commit: 3236eec22a8902393e6becefb771b9a4768ccc50
Parents: 49d2f17
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 14:29:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:53 2016 -0700
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 30 ++++++++++----------
1 file changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3236eec2/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 7b44af8..1ab39c9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -36,9 +36,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
@@ -130,8 +130,8 @@ public class AutoComplete {
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
- new OldDoFn<KV<String, Long>, CompletionCandidate>() {
- @Override
+ new DoFn<KV<String, Long>, CompletionCandidate>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue()));
}
@@ -209,8 +209,8 @@ public class AutoComplete {
}
private static class FlattenTops
- extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
- @Override
+ extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (CompletionCandidate cc : c.element().getValue()) {
c.output(cc);
@@ -260,10 +260,10 @@ public class AutoComplete {
}
/**
- * A OldDoFn that keys each candidate by all its prefixes.
+ * A DoFn that keys each candidate by all its prefixes.
*/
private static class AllPrefixes
- extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
private final int minPrefix;
private final int maxPrefix;
public AllPrefixes(int minPrefix) {
@@ -273,8 +273,8 @@ public class AutoComplete {
this.minPrefix = minPrefix;
this.maxPrefix = maxPrefix;
}
- @Override
- public void processElement(ProcessContext c) {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
String word = c.element().value;
for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
c.output(KV.of(word.substring(0, i), c.element()));
@@ -341,8 +341,8 @@ public class AutoComplete {
/**
* Takes as input a set of strings, and emits each #hashtag found therein.
*/
- static class ExtractHashtags extends OldDoFn<String, String> {
- @Override
+ static class ExtractHashtags extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
Matcher m = Pattern.compile("#\\S+").matcher(c.element());
while (m.find()) {
@@ -351,8 +351,8 @@ public class AutoComplete {
}
}
- static class FormatForBigquery extends OldDoFn<KV<String, List<CompletionCandidate>>, TableRow> {
- @Override
+ static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
+ @ProcessElement
public void processElement(ProcessContext c) {
List<TableRow> completions = new ArrayList<>();
for (CompletionCandidate cc : c.element().getValue()) {
@@ -385,14 +385,14 @@ public class AutoComplete {
* Takes as input a the top candidates per prefix, and emits an entity
* suitable for writing to Datastore.
*/
- static class FormatForDatastore extends OldDoFn<KV<String, List<CompletionCandidate>>, Entity> {
+ static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> {
private String kind;
public FormatForDatastore(String kind) {
this.kind = kind;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build();