You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/06/30 12:20:10 UTC

git commit: Minor changes in Java POJO WordCount example

Repository: incubator-flink
Updated Branches:
  refs/heads/master 0902829e4 -> 504228011


Minor changes in Java POJO WordCount example


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/50422801
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/50422801
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/50422801

Branch: refs/heads/master
Commit: 504228011f9e7cf339bffa64f0e18b7bd537787b
Parents: 0902829
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Jun 30 12:19:27 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 30 12:19:27 2014 +0200

----------------------------------------------------------------------
 .../example/java/wordcount/WordCountPOJO.java   | 79 +++++++++-----------
 1 file changed, 35 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/50422801/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
index 27142e9..90f3dee 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountPOJO.java
@@ -18,7 +18,6 @@ import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.ExecutionEnvironment;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.functions.ReduceFunction;
-import eu.stratosphere.api.java.tuple.Tuple2;
 import eu.stratosphere.example.java.wordcount.util.WordCountData;
 import eu.stratosphere.util.Collector;
 
@@ -49,24 +48,6 @@ public class WordCountPOJO {
 	//     PROGRAM
 	// *************************************************************************
 
-	public static class WC {
-		public String word;
-		public int count;
-
-		public WC() {
-		}
-
-		public WC(String word, int count) {
-			this.word = word;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return word + " " + count;
-		}
-	}
-
 	public static void main(String[] args) throws Exception {
 
 		parseParameters(args);
@@ -76,26 +57,14 @@ public class WordCountPOJO {
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
 
-		DataSet<WC> tokenized = text.flatMap(new FlatMapFunction<String, WC>() {
-			@Override
-			public void flatMap(String value, Collector<WC> out) {
-				String[] tokens = value.toLowerCase().split("\\W+");
-				for (String token : tokens) {
-					if (token.length() > 0) {
-						out.collect(new WC(token, 1));
-					}
-				}
-			}
-		});
-
-
-		DataSet<WC> counts = tokenized
-				.groupBy("word")
-				.reduce(new ReduceFunction<WC>() {
-					public WC reduce(WC value1, WC value2) {
-						return new WC(value1.word, value1.count + value2.count);
-					}
-				});
+		DataSet<WC> counts = text 
+					.flatMap(new Tokenizer())
+					.groupBy("word")
+					.reduce(new ReduceFunction<WC>() {
+							public WC reduce(WC value1, WC value2) {
+								return new WC(value1.word, value1.count + value2.count);
+							}
+						});
 
 		// emit result
 		if(fileOutput) {
@@ -104,29 +73,51 @@ public class WordCountPOJO {
 			counts.print();
 		}
 
-		env.execute("WordCount with custom data types Example");
+		env.execute("WordCount with custom data types example");
 	}
 
 	// *************************************************************************
+	//     USER DATA TYPES (POJOs)
+	// *************************************************************************
+	
+	public static class WC {
+		public String word;
+		public int count;
+
+		public WC() {
+		}
+
+		public WC(String word, int count) {
+			this.word = word;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return word + " " + count;
+		}
+	}
+	
+	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
 
 	/**
 	 * Implements the string tokenizer that splits sentences into words as a user-defined
 	 * FlatMapFunction. The function takes a line (String) and splits it into
-	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 * multiple WC POJOs as "(word, 1)".
 	 */
-	public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+	public static final class Tokenizer extends FlatMapFunction<String, WC> {
 
 		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+		public void flatMap(String value, Collector<WC> out) {
 			// normalize and split the line
 			String[] tokens = value.toLowerCase().split("\\W+");
 
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
+					out.collect(new WC(token, 1));
 				}
 			}
 		}