You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/08 11:03:05 UTC
[2/2] flink git commit: [FLINK-1839] Reworked TwitterStreamITCase for testability
[FLINK-1839] Reworked TwitterStreamITCase for testability
The job is now commutative and associative and thus testable in parallel.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dd42c1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dd42c1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dd42c1a
Branch: refs/heads/master
Commit: 5dd42c1ab4dc4d1842ec360d077e1c7c3c6b1981
Parents: 0afed4d
Author: mbalassi <mb...@apache.org>
Authored: Wed Apr 8 10:34:35 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Apr 8 11:02:24 2015 +0200
----------------------------------------------------------------------
.../examples/twitter/TwitterStream.java | 68 ++----
.../twitter/util/TwitterStreamData.java | 215 ++-----------------
2 files changed, 31 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5dd42c1a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 272381f..8305ce7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -17,12 +17,11 @@
package org.apache.flink.streaming.examples.twitter;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
@@ -63,31 +62,18 @@ public class TwitterStream {
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setBufferTimeout(1000);
-
// get input data
DataStream<String> streamSource = getTextDataStream(env);
DataStream<Tuple2<String, Integer>> tweets = streamSource
- // selecting English tweets and splitting to words
+ // selecting English tweets and splitting to (word, 1)
.flatMap(new SelectEnglishAndTokenizeFlatMap())
- // returning (word, 1)
- .map(new MapFunction<String, Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, Integer> map(String value) throws Exception {
- return new Tuple2<String, Integer>(value, 1);
- }
- })
- // group by words and sum their occurence
- .groupBy(0).sum(1)
- // select word with maximum occurence
- .flatMap(new SelectMaxOccurence());
+ // group by words and sum their occurrences
+ .groupBy(0).sum(1);
// emit result
if (fileOutput) {
- tweets.writeAsText(outputPath, 1L);
+ tweets.writeAsText(outputPath);
} else {
tweets.print();
}
@@ -110,14 +96,14 @@ public class TwitterStream {
* Integer>).
* </p>
*/
- public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
+ public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
/**
* Select the language from the incoming JSON text
*/
@Override
- public void flatMap(String value, Collector<String> out) throws Exception {
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
try {
if (getString(value, "lang").equals("en")) {
// message of tweet
@@ -125,39 +111,15 @@ public class TwitterStream {
// split the message
while (tokenizer.hasMoreTokens()) {
- String result = tokenizer.nextToken().replaceAll("\\s*", "");
+ String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
if (result != null && !result.equals("")) {
- out.collect(result);
+ out.collect(new Tuple2<String, Integer>(result, 1));
}
}
}
} catch (JSONException e) {
-
- }
- }
- }
-
- /**
- * Implements a user-defined FlatMapFunction that checks if the current
- * occurence is higher than the maximum occurence. If so, returns the word
- * and changes the maximum.
- */
- public static class SelectMaxOccurence implements
- FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
- private Integer maximum;
-
- public SelectMaxOccurence() {
- this.maximum = 0;
- }
-
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
- throws Exception {
- if ((Integer) value.getField(1) >= maximum) {
- out.collect(value);
- maximum = (Integer) value.getField(1);
+ // the JSON was not parsed correctly
}
}
}
@@ -168,7 +130,7 @@ public class TwitterStream {
private static boolean fileInput = false;
private static boolean fileOutput = false;
- private static String textPath;
+ private static String propertiesPath;
private static String outputPath;
private static boolean parseParameters(String[] args) {
@@ -177,18 +139,18 @@ public class TwitterStream {
fileOutput = true;
if (args.length == 2) {
fileInput = true;
- textPath = args[0];
+ propertiesPath = args[0];
outputPath = args[1];
} else if (args.length == 1) {
outputPath = args[0];
} else {
- System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
+ System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
return false;
}
} else {
System.out.println("Executing TwitterStream example with built-in default data.");
System.out.println(" Provide parameters to read input data from a file.");
- System.out.println(" USAGE: TwitterStream <pathToPropertiesFile>");
+ System.out.println(" USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
}
return true;
}
@@ -196,7 +158,7 @@ public class TwitterStream {
private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
if (fileInput) {
// read the text file from given input path
- return env.readTextFile(textPath);
+ return env.addSource(new TwitterSource(propertiesPath));
} else {
// get default test text data
return env.fromElements(TwitterStreamData.TEXTS);