You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:16 UTC
[02/50] incubator-beam git commit: Port Flink fork of examples to new
DoFn
Port Flink fork of examples to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87313f1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87313f1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87313f1c
Branch: refs/heads/gearpump-runner
Commit: 87313f1c3d8cf874e04aaf528161478afa030f38
Parents: ae1f6d1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:24:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../beam/runners/flink/examples/TFIDF.java | 28 +++++++--------
.../beam/runners/flink/examples/WordCount.java | 5 +--
.../flink/examples/streaming/AutoComplete.java | 37 ++++++++++----------
.../flink/examples/streaming/JoinExamples.java | 14 ++++----
.../examples/streaming/KafkaIOExamples.java | 7 ++--
.../KafkaWindowedWordCountExample.java | 10 +++---
.../examples/streaming/WindowedWordCount.java | 10 +++---
7 files changed, 57 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 716c8ad..4deca12 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
@@ -230,10 +230,10 @@ public class TFIDF {
// Create a collection of pairs mapping a URI to each
// of the words in the document associated with that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() {
+ .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
String line = c.element().getValue();
@@ -275,10 +275,10 @@ public class TFIDF {
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
.apply("ShiftKeys", ParDo.of(
- new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey().getKey();
String word = c.element().getKey().getValue();
@@ -316,10 +316,10 @@ public class TFIDF {
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
.apply("ComputeTermFrequencies", ParDo.of(
- new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -339,14 +339,14 @@ public class TFIDF {
// documents in which the word appears divided by the total
// number of documents in the corpus. Note how the total number of
// documents is passed as a side input; the same value is
- // presented to each invocation of the OldDoFn.
+ // presented to each invocation of the DoFn.
PCollection<KV<String, Double>> wordToDf = wordToDocCount
.apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
- .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Long documentCount = c.element().getValue();
@@ -375,10 +375,10 @@ public class TFIDF {
return wordToUriAndTfAndDf
.apply("ComputeTfIdf", ParDo.of(
- new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Double df = c.element().getValue().getOnly(dfTag);
@@ -416,10 +416,10 @@ public class TFIDF {
@Override
public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
- .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(String.format("%s,\t%s,\t%f",
c.element().getKey(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 080cdc9..fdffd39 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -38,11 +39,11 @@ import org.apache.beam.sdk.values.PCollection;
public class WordCount {
- public static class ExtractWordsFn extends OldDoFn<String, String> {
+ public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 068404a..aff1a35 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -92,10 +93,10 @@ public class AutoComplete {
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
- new OldDoFn<KV<String, Long>, CompletionCandidate>() {
+ new DoFn<KV<String, Long>, CompletionCandidate>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
c.output(cand);
@@ -182,10 +183,10 @@ public class AutoComplete {
}
private static class FlattenTops
- extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for (CompletionCandidate cc : c.element().getValue()) {
c.output(cc);
@@ -236,10 +237,10 @@ public class AutoComplete {
}
/**
- * A OldDoFn that keys each candidate by all its prefixes.
+ * A DoFn that keys each candidate by all its prefixes.
*/
private static class AllPrefixes
- extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
private static final long serialVersionUID = 0;
private final int minPrefix;
@@ -251,7 +252,7 @@ public class AutoComplete {
this.minPrefix = minPrefix;
this.maxPrefix = maxPrefix;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().value;
for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
@@ -314,11 +315,11 @@ public class AutoComplete {
}
}
- static class ExtractWordsFn extends OldDoFn<String, String> {
+ static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
@@ -337,21 +338,21 @@ public class AutoComplete {
}
/**
- * Takes as input a the top candidates per prefix, and emits an entity
- * suitable for writing to Datastore.
+ * Takes as input the top candidates per prefix, and emits an entity suitable for writing to
+ * Datastore.
*/
- static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String>
- implements OldDoFn.RequiresWindowAccess{
+ static class FormatForPerTaskLocalFile
+ extends DoFn<KV<String, List<CompletionCandidate>>, String> {
private static final long serialVersionUID = 0;
- @Override
- public void processElement(ProcessContext c) {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
StringBuilder str = new StringBuilder();
KV<String, List<CompletionCandidate>> elem = c.element();
- str.append(elem.getKey() +" @ "+ c.window() +" -> ");
- for(CompletionCandidate cand: elem.getValue()) {
+ str.append(elem.getKey() +" @ "+ window +" -> ");
+ for (CompletionCandidate cand: elem.getValue()) {
str.append(cand.toString() + " ");
}
System.out.println(str.toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 7d7c0c7..458a263 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -76,10 +76,10 @@ public class JoinExamples {
// country code 'key' -> string of <event info>, <country name>
PCollection<KV<String, String>> finalResultCollection =
kvpCollection.apply("Process", ParDo.of(
- new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String key = e.getKey();
@@ -98,10 +98,10 @@ public class JoinExamples {
}));
return finalResultCollection
- .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String result = c.element().getKey() + " -> " + c.element().getValue();
System.out.println(result);
@@ -110,10 +110,10 @@ public class JoinExamples {
}));
}
- static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> {
+ static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String line = c.element().toLowerCase();
String key = line.split("\\s")[0];
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 395b409..68a9edc 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -30,9 +30,10 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
@@ -326,9 +327,9 @@ public class KafkaIOExamples {
* Print contents to stdout
* @param <T> type of the input
*/
- private static class PrintFn<T> extends OldDoFn<T, T> {
+ private static class PrintFn<T> extends DoFn<T, T> {
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
System.out.println(c.element().toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 8c31783..39ce225 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -49,11 +49,11 @@ public class KafkaWindowedWordCountExample {
static final String GROUP_ID = "myGroup"; // Default groupId
static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
- public static class ExtractWordsFn extends OldDoFn<String, String> {
+ public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
@@ -71,8 +71,8 @@ public class KafkaWindowedWordCountExample {
}
}
- public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
- @Override
+ public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
System.out.println(row);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index d149e4e..fe8e627 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -59,19 +59,19 @@ public class WindowedWordCount {
static final long WINDOW_SIZE = 10; // Default window duration in seconds
static final long SLIDE_SIZE = 5; // Default window slide in seconds
- static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
- @Override
+ static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
c.output(row);
}
}
- static class ExtractWordsFn extends OldDoFn<String, String> {
+ static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);