You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/08 11:03:05 UTC

[2/2] flink git commit: [FLINK-1839] Reworked TwitterStreamITCase for testability

[FLINK-1839] Reworked TwitterStreamITCase for testability

The job now uses only commutative and associative operations, so it can be tested when run in parallel.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dd42c1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dd42c1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dd42c1a

Branch: refs/heads/master
Commit: 5dd42c1ab4dc4d1842ec360d077e1c7c3c6b1981
Parents: 0afed4d
Author: mbalassi <mb...@apache.org>
Authored: Wed Apr 8 10:34:35 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Apr 8 11:02:24 2015 +0200

----------------------------------------------------------------------
 .../examples/twitter/TwitterStream.java         |  68 ++----
 .../twitter/util/TwitterStreamData.java         | 215 ++-----------------
 2 files changed, 31 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5dd42c1a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 272381f..8305ce7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -17,12 +17,11 @@
 
 package org.apache.flink.streaming.examples.twitter;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.twitter.TwitterSource;
 import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
@@ -63,31 +62,18 @@ public class TwitterStream {
 		// set up the execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		env.setBufferTimeout(1000);
-
 		// get input data
 		DataStream<String> streamSource = getTextDataStream(env);
 
 		DataStream<Tuple2<String, Integer>> tweets = streamSource
-				// selecting English tweets and splitting to words
+				// selecting English tweets and splitting to (word, 1)
 				.flatMap(new SelectEnglishAndTokenizeFlatMap())
-						// returning (word, 1)
-				.map(new MapFunction<String, Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<String, Integer> map(String value) throws Exception {
-						return new Tuple2<String, Integer>(value, 1);
-					}
-				})
-						// group by words and sum their occurence
-				.groupBy(0).sum(1)
-						// select word with maximum occurence
-				.flatMap(new SelectMaxOccurence());
+				// group by words and sum their occurrences
+				.groupBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {
-			tweets.writeAsText(outputPath, 1L);
+			tweets.writeAsText(outputPath);
 		} else {
 			tweets.print();
 		}
@@ -110,14 +96,14 @@ public class TwitterStream {
 	 * Integer>).
 	 * </p>
 	 */
-	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
+	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		/**
 		 * Select the language from the incoming JSON text
 		 */
 		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
 			try {
 				if (getString(value, "lang").equals("en")) {
 					// message of tweet
@@ -125,39 +111,15 @@ public class TwitterStream {
 
 					// split the message
 					while (tokenizer.hasMoreTokens()) {
-						String result = tokenizer.nextToken().replaceAll("\\s*", "");
+						String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
 
 						if (result != null && !result.equals("")) {
-							out.collect(result);
+							out.collect(new Tuple2<String, Integer>(result, 1));
 						}
 					}
 				}
 			} catch (JSONException e) {
-
-			}
-		}
-	}
-
-	/**
-	 * Implements a user-defined FlatMapFunction that checks if the current
-	 * occurence is higher than the maximum occurence. If so, returns the word
-	 * and changes the maximum.
-	 */
-	public static class SelectMaxOccurence implements
-			FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-		private Integer maximum;
-
-		public SelectMaxOccurence() {
-			this.maximum = 0;
-		}
-
-		@Override
-		public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			if ((Integer) value.getField(1) >= maximum) {
-				out.collect(value);
-				maximum = (Integer) value.getField(1);
+				// the JSON was not parsed correctly
 			}
 		}
 	}
@@ -168,7 +130,7 @@ public class TwitterStream {
 
 	private static boolean fileInput = false;
 	private static boolean fileOutput = false;
-	private static String textPath;
+	private static String propertiesPath;
 	private static String outputPath;
 
 	private static boolean parseParameters(String[] args) {
@@ -177,18 +139,18 @@ public class TwitterStream {
 			fileOutput = true;
 			if (args.length == 2) {
 				fileInput = true;
-				textPath = args[0];
+				propertiesPath = args[0];
 				outputPath = args[1];
 			} else if (args.length == 1) {
 				outputPath = args[0];
 			} else {
-				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
+				System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
 				return false;
 			}
 		} else {
 			System.out.println("Executing TwitterStream example with built-in default data.");
 			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  USAGE: TwitterStream <pathToPropertiesFile>");
+			System.out.println("  USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
 		}
 		return true;
 	}
@@ -196,7 +158,7 @@ public class TwitterStream {
 	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
 		if (fileInput) {
 			// read the text file from given input path
-			return env.readTextFile(textPath);
+			return env.addSource(new TwitterSource(propertiesPath));
 		} else {
 			// get default test text data
 			return env.fromElements(TwitterStreamData.TEXTS);