You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/06/30 12:20:10 UTC
git commit: Minor changes in Java POJO WordCount example
Repository: incubator-flink
Updated Branches:
refs/heads/master 0902829e4 -> 504228011
Minor changes in Java POJO WordCount example
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/50422801
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/50422801
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/50422801
Branch: refs/heads/master
Commit: 504228011f9e7cf339bffa64f0e18b7bd537787b
Parents: 0902829
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Jun 30 12:19:27 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 30 12:19:27 2014 +0200
----------------------------------------------------------------------
.../example/java/wordcount/WordCountPOJO.java | 79 +++++++++-----------
1 file changed, 35 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/50422801/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
index 27142e9..90f3dee 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
@@ -18,7 +18,6 @@ import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
-import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.wordcount.util.WordCountData;
import eu.stratosphere.util.Collector;
@@ -49,24 +48,6 @@ public class WordCountPOJO {
// PROGRAM
// *************************************************************************
- public static class WC {
- public String word;
- public int count;
-
- public WC() {
- }
-
- public WC(String word, int count) {
- this.word = word;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return word + " " + count;
- }
- }
-
public static void main(String[] args) throws Exception {
parseParameters(args);
@@ -76,26 +57,14 @@ public class WordCountPOJO {
// get input data
DataSet<String> text = getTextDataSet(env);
- DataSet<WC> tokenized = text.flatMap(new FlatMapFunction<String, WC>() {
- @Override
- public void flatMap(String value, Collector<WC> out) {
- String[] tokens = value.toLowerCase().split("\\W+");
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new WC(token, 1));
- }
- }
- }
- });
-
-
- DataSet<WC> counts = tokenized
- .groupBy("word")
- .reduce(new ReduceFunction<WC>() {
- public WC reduce(WC value1, WC value2) {
- return new WC(value1.word, value1.count + value2.count);
- }
- });
+ DataSet<WC> counts = text
+ .flatMap(new Tokenizer())
+ .groupBy("word")
+ .reduce(new ReduceFunction<WC>() {
+ public WC reduce(WC value1, WC value2) {
+ return new WC(value1.word, value1.count + value2.count);
+ }
+ });
// emit result
if(fileOutput) {
@@ -104,29 +73,51 @@ public class WordCountPOJO {
counts.print();
}
- env.execute("WordCount with custom data types Example");
+ env.execute("WordCount with custom data types example");
}
// *************************************************************************
+ // USER DATA TYPES (POJOs)
+ // *************************************************************************
+
+ public static class WC {
+ public String word;
+ public int count;
+
+ public WC() {
+ }
+
+ public WC(String word, int count) {
+ this.word = word;
+ this.count = count;
+ }
+
+ @Override
+ public String toString() {
+ return word + " " + count;
+ }
+ }
+
+ // *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
- * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ * multiple WC POJOs as "(word, 1)".
*/
- public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+ public static final class Tokenizer extends FlatMapFunction<String, WC> {
@Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ public void flatMap(String value, Collector<WC> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
+ out.collect(new WC(token, 1));
}
}
}