You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/07 16:11:10 UTC

[1/3] flink git commit: [FLINK-1560] [streaming] Streaming example ITCases cleanup

Repository: flink
Updated Branches:
  refs/heads/master d33b44549 -> 954beca7e


[FLINK-1560] [streaming] Streaming example ITCases cleanup

This closes #519


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954beca7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954beca7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954beca7

Branch: refs/heads/master
Commit: 954beca7ec9c8ebadfaf11c77c37e493f190554b
Parents: 464e782
Author: mbalassi <mb...@apache.org>
Authored: Mon Apr 6 23:46:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 7 16:09:37 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/util/TestStreamEnvironment.java   | 1 +
 .../test/socket/SocketTextStreamWordCountITCase.java         | 8 +++++---
 .../examples/test/windowing/SessionWindowingITCase.java      | 1 -
 .../test/windowing/TopSpeedWindowingExampleITCase.java       | 2 +-
 .../main/java/org/apache/flink/test/util/TestBaseUtils.java  | 2 ++
 5 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 8ddf9e6..f7843cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -46,6 +46,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
 	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){
 		this.executor = executor;
+		setDefaultLocalParallelism(parallelism);
 		setParallelism(parallelism);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
index b16a85f..0af8fe2 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.test.socket;
 
+import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
@@ -29,14 +30,15 @@ import java.net.Socket;
 public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase {
 
 	private static final String HOST = "localhost";
-	private static final String PORT = "9999";
+	private static Integer port;
 	protected String resultPath;
 
 	private ServerSocket temporarySocket;
 
 	@Override
 	protected void preSubmit() throws Exception {
-		temporarySocket = createSocket(HOST, Integer.valueOf(PORT), WordCountData.TEXT);
+		port = NetUtils.getAvailablePort();
+		temporarySocket = createSocket(HOST, port, WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
 	}
 
@@ -48,7 +50,7 @@ public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath});
+		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
 	}
 
 	public ServerSocket createSocket(String host, int port, String contents) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
index 5e332f1..2318aa4 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
@@ -27,7 +27,6 @@ public class SessionWindowingITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void preSubmit() throws Exception {
-		setParallelism(2);
 		resultPath = getTempDirPath("result");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
index f973bd1..d1fa9c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
@@ -27,7 +27,7 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void preSubmit() throws Exception {
-		setParallelism(1);
+		setParallelism(1); //needed to ensure total ordering for windows
 		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
 		resultPath = getTempDirPath("result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/954beca7/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 754314b..1fc9af2 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -351,6 +351,8 @@ public class TestBaseUtils {
 			}
 		} catch (URISyntaxException e) {
 			throw new IllegalArgumentException("This path does not describe a valid local file URI.");
+		} catch (NullPointerException e) {
+			throw new IllegalArgumentException("This path does not describe a valid local file URI.");
 		}
 	}
 


[3/3] flink git commit: [FLINK-1560] [streaming] Streaming examples rework

Posted by mb...@apache.org.
[FLINK-1560] [streaming] Streaming examples rework


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d1653
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d1653
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d1653

Branch: refs/heads/master
Commit: ed7d16534f55ca3ee43b2c5110c71bfa224e7144
Parents: d33b445
Author: szape <ne...@gmail.com>
Authored: Wed Mar 25 09:26:01 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 7 16:09:37 2015 +0200

----------------------------------------------------------------------
 .../examples/iteration/IterateExample.java      | 159 ++++++++++++++-----
 .../streaming/examples/join/WindowJoin.java     |  87 ++++++++--
 .../ml/IncrementalLearningSkeleton.java         | 113 ++++++++++---
 .../socket/SocketTextStreamWordCount.java       |  28 ++--
 .../examples/twitter/TwitterStream.java         |  35 ++--
 .../windowing/TopSpeedWindowingExample.java     |  60 +++++--
 .../examples/wordcount/PojoExample.java         |   3 +-
 .../test/wordcount/WordCountITCase.java         |   4 +-
 8 files changed, 371 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index bbd5433..d9a8167 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -17,29 +17,32 @@
 
 package org.apache.flink.streaming.examples.iteration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Example illustrating iterations in Flink streaming.
- * 
+ * <p/>
  * <p>
  * The program sums up random numbers and counts additions it performs to reach
  * a specific threshold in an iterative streaming fashion.
  * </p>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to use:
  * <ul>
  * <li>streaming iterations,
@@ -59,35 +62,44 @@ public class IterateExample {
 			return;
 		}
 
-		// set up input for the stream of (0,0) pairs
-		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
-		for (int i = 0; i < 1000; i++) {
-			input.add(new Tuple2<Double, Integer>(0., 0));
-		}
+		// set up input for the stream of integer pairs
 
-		// obtain execution environment and set setBufferTimeout(0) to enable
+		// obtain execution environment and set setBufferTimeout to 1 to enable
 		// continuous flushing of the output buffers (lowest latency)
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 				.setBufferTimeout(1);
 
+		// create input stream of integer pairs
+		DataStream<Tuple2<Integer, Integer>> inputStream;
+		if(fileInput) {
+			inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
+		} else {
+			inputStream = env.addSource(new RandomFibonacciSource());
+		}
+
 		// create an iterative data stream from the input with 5 second timeout
-		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+		IterativeDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
 				.iterate(5000);
 
-		// apply the step function to add new random value to the tuple and to
+		// apply the step function to get the next Fibonacci number
 		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector());
+		SplitDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
+				.split(new MySelector());
 
 		// close the iteration by selecting the tuples that were directed to the
 		// 'iterate' channel in the output selector
 		it.closeWith(step.select("iterate"));
 
 		// to produce the final output select the tuples directed to the
-		// 'output' channel then project it to the desired second field
-
-		DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
-
-		// emit result
+		// 'output' channel then get the input pairs that have the greatest iteration counter
+		// on a 1 second sliding window
+		DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
+				.map(new OutputMap())
+				.window(Time.of(1L, TimeUnit.SECONDS))
+				.every(Time.of(500L, TimeUnit.MILLISECONDS))
+				.maxBy(1).flatten();
+
+		// emit results
 		if (fileOutput) {
 			numbers.writeAsText(outputPath, 1);
 		} else {
@@ -103,57 +115,124 @@ public class IterateExample {
 	// *************************************************************************
 
 	/**
-	 * Iteration step function which takes an input (Double , Integer) and
-	 * produces an output (Double + random, Integer + 1).
+	 * Generate random integer pairs from the range from 0 to BOUND/2
+	 */
+	private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rnd = new Random();
+
+		@Override
+		public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
+			while(true) {
+				int first = rnd.nextInt(BOUND/2 - 1) + 1;
+				int second = rnd.nextInt(BOUND/2 - 1) + 1;
+
+				collector.collect(new Tuple2<Integer, Integer>(first, second));
+				Thread.sleep(100L);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// no cleanup needed
+		}
+	}
+
+	/**
+	 * Generate random integer pairs from the range from 0 to BOUND/2
 	 */
-	public static class Step extends
-			RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
+	private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
-		private transient Random rnd;
 
-		public void open(Configuration parameters) {
-			rnd = new Random();
+		@Override
+		public Tuple2<Integer, Integer> map(String value) throws Exception {
+			Thread.sleep(100L);
+			String record = value.substring(1, value.length()-1);
+			String[] splitted = record.split(",");
+			return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
 		}
+	}
+
+	/**
+	 * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple
+	 * A counter is attached to the tuple and incremented in every iteration step
+	 */
+	public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+
+		@Override
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
+				Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0);
+		}
+	}
+
+	/**
+	 * Iteration step function that calculates the next Fibonacci number
+	 */
+	public static class Step implements
+			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
-			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
 		}
 	}
 
 	/**
 	 * OutputSelector testing which tuple needs to be iterated again.
 	 */
-	public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
+	public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Iterable<String> select(Tuple2<Double, Integer> value) {
+		public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
 			List<String> output = new ArrayList<String>();
-			if (value.f0 > 100) {
-				output.add("output");
-			} else {
+			if (value.f2 < BOUND && value.f3 < BOUND) {
 				output.add("iterate");
+			} else {
+				output.add("output");
 			}
+			output.add("output");
 			return output;
 		}
+	}
+
+	/**
+	 * Giving back the input pair and the counter
+	 */
+	public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {
 
+		@Override
+		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
+				Exception {
+			return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
+		}
 	}
 
 	// *************************************************************************
 	// UTIL METHODS
 	// *************************************************************************
 
+	private static boolean fileInput = false;
 	private static boolean fileOutput = false;
+	private static String inputPath;
 	private static String outputPath;
+	private static final int BOUND = 100;
 
 	private static boolean parseParameters(String[] args) {
 
 		if (args.length > 0) {
 			// parse input arguments
-			fileOutput = true;
 			if (args.length == 1) {
+				fileOutput = true;
 				outputPath = args[0];
+			} else if(args.length == 2) {
+				fileInput = true;
+				inputPath = args[0];
+				fileOutput = true;
+				outputPath = args[1];
 			} else {
 				System.err.println("Usage: IterateExample <result path>");
 				return false;
@@ -165,4 +244,4 @@ public class IterateExample {
 		}
 		return true;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index a5a9577..bfc59ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -17,10 +17,8 @@
 
 package org.apache.flink.streaming.examples.join;
 
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -28,17 +26,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 
+import java.util.Random;
+
 /**
  * Example illustrating join over sliding windows of streams in Flink.
- * 
+ * <p/>
  * <p>
  * his example will join two streams with a sliding window. One which emits
  * grades and one which emits salaries of people.
  * </p>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to:
  * <ul>
  * <li>do windowed joins,
@@ -51,6 +52,9 @@ public class WindowJoin {
 	// PROGRAM
 	// *************************************************************************
 
+	private static DataStream<Tuple2<String, Integer>> grades;
+	private static DataStream<Tuple2<String, Integer>> salaries;
+
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {
@@ -61,18 +65,17 @@ public class WindowJoin {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		// connect to the data sources for grades and salaries
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+		setInputStreams(env);
 
 		// apply a temporal join over the two stream based on the names over one
 		// second windows
 		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
-						.join(salaries)
-						.onWindow(1, TimeUnit.SECONDS)
-						.where(0)
-						.equalTo(0)
-						.with(new MyJoinFunction());
-		
+				.join(salaries)
+				.onWindow(1, new MyTimestamp(0), new MyTimestamp(0))
+				.where(0)
+				.equalTo(0)
+				.with(new MyJoinFunction());
+
 		// emit result
 		if (fileOutput) {
 			joinedStream.writeAsText(outputPath, 1);
@@ -88,7 +91,7 @@ public class WindowJoin {
 	// USER FUNCTIONS
 	// *************************************************************************
 
-	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+	private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
 	private final static int GRADE_COUNT = 5;
 	private final static int SALARY_MAX = 10000;
 	private final static int SLEEP_TIME = 10;
@@ -154,6 +157,21 @@ public class WindowJoin {
 		}
 	}
 
+	public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> {
+
+		private String[] record;
+
+		public MySourceMap() {
+			record = new String[2];
+		}
+
+		@Override
+		public Tuple2<String, Integer> map(String line) throws Exception {
+			record = line.substring(1, line.length() - 1).split(",");
+			return new Tuple2<String, Integer>(record[0], Integer.parseInt(record[1]));
+		}
+	}
+
 	public static class MyJoinFunction
 			implements
 			JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
@@ -172,22 +190,47 @@ public class WindowJoin {
 		}
 	}
 
+	public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> {
+		private int counter;
+
+		public MyTimestamp(int starttime) {
+			this.counter = starttime;
+		}
+
+		@Override
+		public long getTimestamp(Tuple2<String, Integer> value) {
+			counter += SLEEP_TIME;
+			return counter;
+		}
+	}
+
 	// *************************************************************************
 	// UTIL METHODS
 	// *************************************************************************
 
+	private static boolean fileInput = false;
 	private static boolean fileOutput = false;
+
+	private static String gradesPath;
+	private static String salariesPath;
 	private static String outputPath;
 
 	private static boolean parseParameters(String[] args) {
 
 		if (args.length > 0) {
 			// parse input arguments
-			fileOutput = true;
 			if (args.length == 1) {
+				fileOutput = true;
 				outputPath = args[0];
+			} else if (args.length == 3) {
+				fileInput = true;
+				fileOutput = true;
+				gradesPath = args[0];
+				salariesPath = args[1];
+				outputPath = args[2];
 			} else {
-				System.err.println("Usage: WindowJoin <result path>");
+				System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 1> " +
+						"<result path>");
 				return false;
 			}
 		} else {
@@ -197,4 +240,14 @@ public class WindowJoin {
 		}
 		return true;
 	}
+
+	private static void setInputStreams(StreamExecutionEnvironment env) {
+		if (fileInput) {
+			grades = env.readTextFile(gradesPath).map(new MySourceMap());
+			salaries = env.readTextFile(salariesPath).map(new MySourceMap());
+		} else {
+			grades = env.addSource(new GradeSource());
+			salaries = env.addSource(new SalarySource());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 26895f2..68b105a 100755
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,28 +17,27 @@
 
 package org.apache.flink.streaming.examples.ml;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 
 /**
  * Skeleton for incremental machine learning algorithm consisting of a
  * pre-computed model, which gets updated for the new inputs and new input data
  * for which the job provides predictions.
- * 
+ * <p/>
  * <p>
  * This may serve as a base of a number of algorithms, e.g. updating an
  * incremental Alternating Least Squares model while also providing the
  * predictions.
  * </p>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to use:
  * <ul>
  * <li>Connected streams
@@ -48,6 +47,9 @@ import org.apache.flink.util.Collector;
  */
 public class IncrementalLearningSkeleton {
 
+	private static DataStream<Integer> trainingData = null;
+	private static DataStream<Integer> newData = null;
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
@@ -59,15 +61,15 @@ public class IncrementalLearningSkeleton {
 		}
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// env.setDegreeOfParallelism(1);
+		createSourceStreams(env);
 
 		// build new model on every second of new data
-		DataStream<Double[]> model = env.addSource(new TrainingDataSource())
-				.window(Time.of(5000, TimeUnit.MILLISECONDS))
+		DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp()))
 				.mapWindow(new PartialModelBuilder()).flatten();
 
-		// use partial model for prediction
-		DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model)
-				.map(new Predictor());
+		// use partial model for newData
+		DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
 
 		// emit result
 		if (fileOutput) {
@@ -85,7 +87,7 @@ public class IncrementalLearningSkeleton {
 	// *************************************************************************
 
 	/**
-	 * Feeds new data for prediction. By default it is implemented as constantly
+	 * Feeds new data for newData. By default it is implemented as constantly
 	 * emitting the Integer 1 in a loop.
 	 */
 	public static class NewDataSource implements SourceFunction<Integer> {
@@ -111,6 +113,34 @@ public class IncrementalLearningSkeleton {
 	}
 
 	/**
+	 * Feeds new data for newData. By default it is implemented as constantly
+	 * emitting the Integer 1 in a loop.
+	 */
+	public static class FiniteNewDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+		private int counter;
+
+		@Override
+		public void run(Collector<Integer> collector) throws Exception {
+			Thread.sleep(15);
+			while (counter < 50) {
+				collector.collect(getNewData());
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
+
+		private Integer getNewData() throws InterruptedException {
+			Thread.sleep(5);
+			counter++;
+			return 1;
+		}
+	}
+
+	/**
 	 * Feeds new training data for the partial model builder. By default it is
 	 * implemented as constantly emitting the Integer 1 in a loop.
 	 */
@@ -139,13 +169,50 @@ public class IncrementalLearningSkeleton {
 	}
 
 	/**
+	 * Feeds new training data for the partial model builder. By default it is
+	 * implemented as constantly emitting the Integer 1 in a loop.
+	 */
+	public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+		private int counter = 0;
+
+		@Override
+		public void run(Collector<Integer> collector) throws Exception {
+			while (counter < 8200) {
+				collector.collect(getTrainingData());
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
+
+		private Integer getTrainingData() throws InterruptedException {
+			counter++;
+			return 1;
+		}
+	}
+
+	public static class LinearTimestamp implements Timestamp<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private long counter = 0L;
+
+		@Override
+		public long getTimestamp(Integer value) {
+			return counter += 10L;
+		}
+	}
+
+	/**
 	 * Builds up-to-date partial models on new training data.
 	 */
 	public static class PartialModelBuilder implements WindowMapFunction<Integer, Double[]> {
 		private static final long serialVersionUID = 1L;
 
 		protected Double[] buildPartialModel(Iterable<Integer> values) {
-			return new Double[] { 1. };
+			return new Double[]{1.};
 		}
 
 		@Override
@@ -155,11 +222,11 @@ public class IncrementalLearningSkeleton {
 	}
 
 	/**
-	 * Creates prediction using the model produced in batch-processing and the
+	 * Creates newData using the model produced in batch-processing and the
 	 * up-to-date partial model.
-	 * 
+	 * <p/>
 	 * <p>
-	 * By defaults emits the Integer 0 for every prediction and the Integer 1
+	 * By defaults emits the Integer 0 for every newData and the Integer 1
 	 * for every model update.
 	 * </p>
 	 */
@@ -171,7 +238,7 @@ public class IncrementalLearningSkeleton {
 
 		@Override
 		public Integer map1(Integer value) {
-			// Return prediction
+			// Return newData
 			return predict(value);
 		}
 
@@ -185,10 +252,10 @@ public class IncrementalLearningSkeleton {
 
 		// pulls model built with batch-job on the old training data
 		protected Double[] getBatchModel() {
-			return new Double[] { 0. };
+			return new Double[]{0.};
 		}
 
-		// performs prediction using the two models
+		// performs newData using the two models
 		protected Integer predict(Integer inTuple) {
 			return 0;
 		}
@@ -220,4 +287,14 @@ public class IncrementalLearningSkeleton {
 		}
 		return true;
 	}
+
+	public static void createSourceStreams(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			trainingData = env.addSource(new FiniteTrainingDataSource());
+			newData = env.addSource(new FiniteNewDataSource());
+		} else {
+			trainingData = env.addSource(new TrainingDataSource());
+			newData = env.addSource(new NewDataSource());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
index e9b60f4..1473097 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -26,26 +26,26 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
  * This example shows an implementation of WordCount with data from a text
  * socket. To run the example make sure that the service providing the text data
  * is already up and running.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * To start an example socket text stream on your local machine run netcat from
  * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
  * port number.
- * 
- * 
- * <p>
+ * <p/>
+ * <p/>
+ * <p/>
  * Usage:
  * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
  * <br>
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to:
  * <ul>
  * <li>use StreamExecutionEnvironment.socketTextStream
  * <li>write a simple Flink program,
  * <li>write and use user-defined functions.
  * </ul>
- * 
+ *
  * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
  */
 public class SocketTextStreamWordCount {
@@ -60,14 +60,14 @@ public class SocketTextStreamWordCount {
 				.getExecutionEnvironment();
 
 		// get input data
-		DataStream<String> text = env.socketTextStream(hostName, port);
+		DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
 
 		DataStream<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new Tokenizer())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
 
 		if (fileOutput) {
 			counts.writeAsText(outputPath, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 1901475..272381f 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.examples.twitter;
 
-import java.util.StringTokenizer;
-
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,26 +27,27 @@ import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
 
+import java.util.StringTokenizer;
+
 /**
  * Implements the "TwitterStream" program that computes a most used word
  * occurrence over JSON files in a streaming fashion.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * The input is a JSON text file with lines separated by newline characters.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * Usage: <code>TwitterStream &lt;text path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from
  * {@link TwitterStreamData}.
- * 
- * <p>
+ * <p/>
+ * <p/>
  * This example shows how to:
  * <ul>
  * <li>acquire external data,
  * <li>use in-line defined functions,
  * <li>handle flattened stream inputs.
  * </ul>
- * 
  */
 public class TwitterStream {
 
@@ -70,9 +69,9 @@ public class TwitterStream {
 		DataStream<String> streamSource = getTextDataStream(env);
 
 		DataStream<Tuple2<String, Integer>> tweets = streamSource
-		// selecting English tweets and splitting to words
+				// selecting English tweets and splitting to words
 				.flatMap(new SelectEnglishAndTokenizeFlatMap())
-				// returning (word, 1)
+						// returning (word, 1)
 				.map(new MapFunction<String, Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 
@@ -81,14 +80,14 @@ public class TwitterStream {
 						return new Tuple2<String, Integer>(value, 1);
 					}
 				})
-				// group by words and sum their occurence
+						// group by words and sum their occurence
 				.groupBy(0).sum(1)
-				// select word with maximum occurence
+						// select word with maximum occurence
 				.flatMap(new SelectMaxOccurence());
 
 		// emit result
 		if (fileOutput) {
-			tweets.writeAsText(outputPath, 1);
+			tweets.writeAsText(outputPath, 1L);
 		} else {
 			tweets.print();
 		}
@@ -103,7 +102,7 @@ public class TwitterStream {
 
 	/**
 	 * Makes sentences from English tweets.
-	 * 
+	 * <p/>
 	 * <p>
 	 * Implements a string tokenizer that splits sentences into words as a
 	 * user-defined FlatMapFunction. The function takes a line (String) and
@@ -167,6 +166,7 @@ public class TwitterStream {
 	// UTIL METHODS
 	// *************************************************************************
 
+	private static boolean fileInput = false;
 	private static boolean fileOutput = false;
 	private static String textPath;
 	private static String outputPath;
@@ -176,8 +176,11 @@ public class TwitterStream {
 			// parse input arguments
 			fileOutput = true;
 			if (args.length == 2) {
+				fileInput = true;
 				textPath = args[0];
 				outputPath = args[1];
+			} else if (args.length == 1) {
+				outputPath = args[0];
 			} else {
 				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
 				return false;
@@ -191,7 +194,7 @@ public class TwitterStream {
 	}
 
 	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
+		if (fileInput) {
 			// read the text file from given input path
 			return env.readTextFile(textPath);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 311c6b2..bf3802b 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -24,11 +25,11 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.util.Collector;
 
 import java.util.Arrays;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 /**
  * An example of grouped stream windowing where different eviction and trigger
@@ -45,13 +46,17 @@ public class TopSpeedWindowingExample {
 			return;
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		@SuppressWarnings({ "rawtypes", "serial" })
-		DataStream topSpeeds = env
-				.addSource(CarSource.create(numOfCars))
-				.groupBy(0)
-				.window(Time.of(evictionSec, TimeUnit.SECONDS))
+		@SuppressWarnings({"rawtypes", "serial"})
+		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+		if (fileInput) {
+			carData = env.readTextFile(inputPath).map(new ParseCarData());
+		} else {
+			carData = env.addSource(CarSource.create(numOfCars));
+		}
+		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.groupBy(0)
+				.window(Time.of(evictionSec, new CarTimestamp()))
 				.every(Delta.of(triggerMeters,
 						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
 							@Override
@@ -61,8 +66,12 @@ public class TopSpeedWindowingExample {
 								return newDataPoint.f2 - oldDataPoint.f2;
 							}
 						}, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten();
+		if (fileOutput) {
+			topSpeeds.writeAsText(outputPath);
+		} else {
+			topSpeeds.print();
+		}
 
-		topSpeeds.print();
 		env.execute("CarTopSpeedWindowingExample");
 	}
 
@@ -99,8 +108,9 @@ public class TopSpeedWindowingExample {
 						speeds[carId] = Math.max(0, speeds[carId] - 5);
 					}
 					distances[carId] += speeds[carId] / 3.6d;
-					collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId,
-							speeds[carId], distances[carId], System.currentTimeMillis()));
+					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+							speeds[carId], distances[carId], System.currentTimeMillis());
+					collector.collect(record);
 				}
 			}
 		}
@@ -110,9 +120,34 @@ public class TopSpeedWindowingExample {
 		}
 	}
 
+	private static class ParseCarData extends
+			RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple4<Integer, Integer, Double, Long> map(String record) {
+			String rawData = record.substring(1, record.length() - 1);
+			String[] data = rawData.split(",");
+			return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]),
+					Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
+		}
+	}
+
+	private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> {
+
+		@Override
+		public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) {
+			return value.f3;
+		}
+	}
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
 	private static int numOfCars = 2;
 	private static int evictionSec = 10;
 	private static double triggerMeters = 50;
+	private static String inputPath;
+	private static String outputPath;
 
 	private static boolean parseParameters(String[] args) {
 
@@ -121,6 +156,11 @@ public class TopSpeedWindowingExample {
 				numOfCars = Integer.valueOf(args[0]);
 				evictionSec = Integer.valueOf(args[1]);
 				triggerMeters = Double.valueOf(args[2]);
+			} else if (args.length == 2) {
+				fileInput = true;
+				fileOutput = true;
+				inputPath = args[0];
+				outputPath = args[1];
 			} else {
 				System.err
 						.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
index d582c10..5ff3fc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.examples.wordcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d1653/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
index 51da0d6..85e1dca 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/wordcount/WordCountITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.examples.test.wordcount;
 
 import org.apache.flink.streaming.examples.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
 
 public class WordCountITCase extends StreamingProgramTestBase {
 
@@ -40,6 +40,6 @@ public class WordCountITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
+		WordCount.main(new String[]{textPath, resultPath});
 	}
 }


[2/3] flink git commit: [FLINK-1560] [streaming] Added ITCases to streaming examples

Posted by mb...@apache.org.
[FLINK-1560] [streaming] Added ITCases to streaming examples


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464e7828
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464e7828
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464e7828

Branch: refs/heads/master
Commit: 464e782868e1f697809d681ce7f8528bef4f2bdb
Parents: ed7d165
Author: szape <ne...@gmail.com>
Authored: Wed Mar 25 09:35:39 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 7 16:09:37 2015 +0200

----------------------------------------------------------------------
 .../examples/iteration/IterateExample.java      |  70 ++++----
 .../iteration/util/IterateExampleData.java      |  32 ++++
 .../examples/join/util/WindowJoinData.java      |  66 ++++++++
 .../util/IncrementalLearningSkeletonData.java   |  34 ++++
 .../twitter/util/TwitterStreamData.java         |  27 +++
 .../examples/windowing/SessionWindowing.java    |  55 ++++++-
 .../windowing/util/SessionWindowingData.java    |  27 +++
 .../util/TopSpeedWindowingExampleData.java      | 165 +++++++++++++++++++
 .../test/iteration/IterateExampleITCase.java    |  45 +++++
 .../examples/test/join/WindowJoinITCase.java    |  48 ++++++
 .../ml/IncrementalLearningSkeletonITCase.java   |  42 +++++
 .../socket/SocketTextStreamWordCountITCase.java |  94 +++++++++++
 .../test/twitter/TwitterStreamITCase.java       |  42 +++++
 .../test/windowing/SessionWindowingITCase.java  |  43 +++++
 .../TopSpeedWindowingExampleITCase.java         |  45 +++++
 15 files changed, 789 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index d9a8167..f5f2cd7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -26,32 +26,24 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 /**
- * Example illustrating iterations in Flink streaming.
+ * Example illustrating iterations in Flink streaming. <p/> <p> The program sums up random numbers and counts additions
+ * it performs to reach a specific threshold in an iterative streaming fashion. </p>
  * <p/>
- * <p>
- * The program sums up random numbers and counts additions it performs to reach
- * a specific threshold in an iterative streaming fashion.
- * </p>
  * <p/>
- * <p/>
- * This example shows how to use:
- * <ul>
- * <li>streaming iterations,
- * <li>buffer timeout to enhance latency,
- * <li>directed outputs.
- * </ul>
+ * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
+ * outputs. </ul>
  */
 public class IterateExample {
 
+	private static final int BOUND = 100;
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
@@ -71,7 +63,7 @@ public class IterateExample {
 
 		// create input stream of integer pairs
 		DataStream<Tuple2<Integer, Integer>> inputStream;
-		if(fileInput) {
+		if (fileInput) {
 			inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
 		} else {
 			inputStream = env.addSource(new RandomFibonacciSource());
@@ -94,10 +86,7 @@ public class IterateExample {
 		// 'output' channel then get the input pairs that have the greatest iteration counter
 		// on a 1 second sliding window
 		DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
-				.map(new OutputMap())
-				.window(Time.of(1L, TimeUnit.SECONDS))
-				.every(Time.of(500L, TimeUnit.MILLISECONDS))
-				.maxBy(1).flatten();
+				.map(new OutputMap());
 
 		// emit results
 		if (fileOutput) {
@@ -124,12 +113,12 @@ public class IterateExample {
 
 		@Override
 		public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
-			while(true) {
-				int first = rnd.nextInt(BOUND/2 - 1) + 1;
-				int second = rnd.nextInt(BOUND/2 - 1) + 1;
+			while (true) {
+				int first = rnd.nextInt(BOUND / 2 - 1) + 1;
+				int second = rnd.nextInt(BOUND / 2 - 1) + 1;
 
 				collector.collect(new Tuple2<Integer, Integer>(first, second));
-				Thread.sleep(100L);
+				Thread.sleep(500L);
 			}
 		}
 
@@ -147,18 +136,19 @@ public class IterateExample {
 
 		@Override
 		public Tuple2<Integer, Integer> map(String value) throws Exception {
-			Thread.sleep(100L);
-			String record = value.substring(1, value.length()-1);
+			String record = value.substring(1, value.length() - 1);
 			String[] splitted = record.split(",");
 			return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
 		}
 	}
 
 	/**
-	 * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple
-	 * A counter is attached to the tuple and incremented in every iteration step
+	 * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple A
+	 * counter is attached to the tuple and incremented in every iteration step
 	 */
-	public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+	public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
+			Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
 
 		@Override
 		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
@@ -171,12 +161,15 @@ public class IterateExample {
 	 * Iteration step function that calculates the next Fibonacci number
 	 */
 	public static class Step implements
-			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
+					Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
-			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
+				Integer> value) throws Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
+					value.f3, ++value.f4);
 		}
 	}
 
@@ -194,7 +187,6 @@ public class IterateExample {
 			} else {
 				output.add("output");
 			}
-			output.add("output");
 			return output;
 		}
 	}
@@ -202,12 +194,16 @@ public class IterateExample {
 	/**
 	 * Giving back the input pair and the counter
 	 */
-	public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {
+	public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
+			Tuple2<Tuple2<Integer, Integer>, Integer>> {
+		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
+		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
+				value) throws
 				Exception {
-			return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
+			return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
+					value.f4);
 		}
 	}
 
@@ -219,7 +215,6 @@ public class IterateExample {
 	private static boolean fileOutput = false;
 	private static String inputPath;
 	private static String outputPath;
-	private static final int BOUND = 100;
 
 	private static boolean parseParameters(String[] args) {
 
@@ -228,7 +223,7 @@ public class IterateExample {
 			if (args.length == 1) {
 				fileOutput = true;
 				outputPath = args[0];
-			} else if(args.length == 2) {
+			} else if (args.length == 2) {
 				fileInput = true;
 				inputPath = args[0];
 				fileOutput = true;
@@ -244,4 +239,5 @@ public class IterateExample {
 		}
 		return true;
 	}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
new file mode 100644
index 0000000..0077459
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration.util;
+
+public class IterateExampleData {
+	public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
+			"(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
+			"(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";
+
+	public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
+			"((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
+			"((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
+			"((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";
+
+	private IterateExampleData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
new file mode 100644
index 0000000..7d0c746
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join.util;
+
+public class WindowJoinData {
+
+	public static final String GRADES_INPUT = "(john,5)\n" + "(tom,3)\n" + "(alice,1)\n" + "(grace,5)\n" +
+			"(john,4)\n" + "(bob,1)\n" + "(alice,2)\n" + "(alice,3)\n" + "(bob,5)\n" + "(alice,3)\n" + "(tom,5)\n" +
+			"(john,2)\n" + "(john,1)\n" + "(grace,2)\n" + "(jerry,2)\n" + "(tom,4)\n" + "(bob,4)\n" + "(bob,2)\n" +
+			"(tom,2)\n" + "(alice,5)\n" + "(grace,5)\n" + "(grace,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(tom,1)\n" +
+			"(jerry,5)\n" + "(john,3)\n" + "(john,4)\n" + "(john,1)\n" + "(jerry,3)\n" + "(grace,3)\n" + "(bob,3)\n" +
+			"(john,3)\n" + "(jerry,4)\n" + "(tom,5)\n" + "(tom,4)\n" + "(john,2)\n" + "(jerry,1)\n" + "(bob,1)\n" +
+			"(john,5)\n" + "(grace,4)\n" + "(tom,5)\n" + "(john,4)\n" + "(tom,1)\n" + "(grace,1)\n" + "(john,2)\n" +
+			"(jerry,3)\n" + "(jerry,5)\n" + "(tom,2)\n" + "(tom,2)\n" + "(alice,4)\n" + "(tom,4)\n" + "(jerry,4)\n" +
+			"(john,3)\n" + "(grace,4)\n" + "(tom,3)\n" + "(jerry,4)\n" + "(john,5)\n" + "(john,4)\n" + "(jerry,1)\n" +
+			"(john,5)\n" + "(alice,2)\n" + "(tom,1)\n" + "(alice,5)\n" + "(grace,4)\n" + "(bob,4)\n" + "(jerry,1)\n" +
+			"(john,5)\n" + "(tom,4)\n" + "(tom,5)\n" + "(jerry,5)\n" + "(tom,1)\n" + "(grace,3)\n" + "(bob,5)\n" +
+			"(john,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(grace,1)\n" + "(jerry,1)\n" + "(jerry,4)\n" +
+			"(bob,4)\n" + "(alice,3)\n" + "(tom,5)\n" + "(alice,4)\n" + "(alice,4)\n" + "(grace,4)\n" + "(john,5)\n" +
+			"(john,5)\n" + "(grace,4)\n" + "(tom,4)\n" + "(john,4)\n" + "(john,5)\n" + "(alice,5)\n" + "(jerry,5)\n" +
+			"(john,3)\n" + "(tom,5)\n" + "(jerry,4)\n" + "(grace,4)\n" + "(john,3)\n" + "(bob,2)";
+
+	public static final String SALARIES_INPUT = "(john,6469)\n" + "(jerry,6760)\n" + "(jerry,8069)\n" +
+			"(tom,3662)\n" + "(grace,8427)\n" + "(john,9425)\n" + "(bob,9018)\n" + "(john,352)\n" + "(tom,3770)\n" +
+			"(grace,7622)\n" + "(jerry,7441)\n" + "(alice,1468)\n" + "(bob,5472)\n" + "(grace,898)\n" +
+			"(tom,3849)\n" + "(grace,1865)\n" + "(alice,5582)\n" + "(john,9511)\n" + "(alice,1541)\n" +
+			"(john,2477)\n" + "(grace,3561)\n" + "(john,1670)\n" + "(grace,7290)\n" + "(grace,6565)\n" +
+			"(tom,6179)\n" + "(tom,1601)\n" + "(john,2940)\n" + "(bob,4685)\n" + "(bob,710)\n" + "(bob,5936)\n" +
+			"(jerry,1412)\n" + "(grace,6515)\n" + "(grace,3321)\n" + "(tom,8088)\n" + "(john,2876)\n" +
+			"(bob,9896)\n" + "(grace,7368)\n" + "(grace,9749)\n" + "(bob,2048)\n" + "(alice,4782)\n" +
+			"(alice,3375)\n" + "(tom,5841)\n" + "(bob,958)\n" + "(bob,5258)\n" + "(tom,3935)\n" + "(jerry,4394)\n" +
+			"(alice,102)\n" + "(alice,4931)\n" + "(alice,5240)\n" + "(jerry,7951)\n" + "(john,5675)\n" +
+			"(bob,609)\n" + "(alice,5997)\n" + "(jerry,9651)\n" + "(alice,1328)\n" + "(bob,1022)\n" +
+			"(grace,2578)\n" + "(jerry,9704)\n" + "(tom,4476)\n" + "(grace,3784)\n" + "(alice,6144)\n" +
+			"(bob,6213)\n" + "(alice,7525)\n" + "(jerry,2908)\n" + "(grace,8464)\n" + "(jerry,9920)\n" +
+			"(bob,3720)\n" + "(bob,7612)\n" + "(alice,7211)\n" + "(jerry,6484)\n" + "(alice,1711)\n" +
+			"(jerry,5994)\n" + "(grace,928)\n" + "(jerry,2492)\n" + "(grace,9080)\n" + "(tom,4330)\n" +
+			"(bob,8302)\n" + "(john,4981)\n" + "(tom,1781)\n" + "(grace,1379)\n" + "(jerry,3700)\n" +
+			"(jerry,3584)\n" + "(jerry,2038)\n" + "(jerry,3902)\n" + "(tom,1336)\n" + "(jerry,7500)\n" +
+			"(tom,3648)\n" + "(alice,2533)\n" + "(tom,8685)\n" + "(bob,3968)\n" + "(tom,3241)\n" + "(bob,7461)\n" +
+			"(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + "(tom,140)\n" + "(john,9802)\n" +
+			"(grace,2977)\n" + "(grace,889)\n" + "(john,1338)";
+
+	public static final String WINDOW_JOIN_RESULTS = "(bob,2,9018)\n" + "(bob,2,5472)\n" + "(bob,2,4685)\n" +
+			"(bob,2,710)\n" + "(bob,2,5936)\n" + "(bob,2,9896)\n" + "(bob,2,2048)\n" + "(bob,2,958)\n" +
+			"(bob,2,5258)\n" + "(bob,2,609)\n" + "(bob,2,1022)\n" + "(bob,2,6213)\n" + "(bob,2,3720)\n" +
+			"(bob,2,7612)\n" + "(bob,2,8302)\n" + "(bob,2,3968)\n" + "(bob,2,7461)";
+
+	private WindowJoinData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
new file mode 100644
index 0000000..dedc5ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.ml.util;
+
+public class IncrementalLearningSkeletonData {
+
+	public static final String RESULTS = "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"
+			+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"
+			+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+			"1\n";
+
+	private IncrementalLearningSkeletonData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
index 0e7976c..f0eb753 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
@@ -252,4 +252,31 @@ public class TwitterStreamData {
 			"{\"created_at\":\"Wed Oct 01 15:40:10 +0000 2014\",\"id\":517338189300645888,\"id_str\":\"517338189300645888\",\"text\":\"\\uadc0\\uc5ec\\uc6e4\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\\u314b\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/download\\/android\\\" rel=\\\"nofollow\\\"\\u003eTwitter for Android\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":2751941412,\"id_str\":\"2751941412\",\"name\":\"BJ\",\"screen_name\":\"beuljin\",\"location\":\"\",\"url\":\"http:\\/\\/twpf.jp\\/beuljin\",\"description\":\"\\ud504\\uc0ac\\ub294 \\uc580\\ub2d8\\uc774 \\uadf8\\ub824\\uc900 \\uc9c4\\uc9dc\\uc5c4\\uccad\\uc815\\ub9d0\\uc9f1\\uc9f1 \\uc798\\uc0dd\\uae30\\uace0 \\uba4b\\uc788\\uace0 \\uadc0\\uc5fd\\uace0 \\uc774\\uc05c \\ud788\\uc5b4\\ub85c\",\"protected\":false,\"verified\":
 false,\"followers_count\":6,\"friends_count\":17,\"listed_count\":0,\"favourites_count\":26,\"statuses_count\":423,\"created_at\":\"Thu Aug 21 11:51:58 +0000 2014\",\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"lang\":\"ko\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"B2DFDA\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme13\\/bg.gif\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme13\\/bg.gif\",\"profile_background_tile\":false,\"profile_link_color\":\"00B2B8\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"FFFFFF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/515876874082197504\\/u_gw6uoS_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/515876874082197504\\/u_gw6uoS_normal.jpeg\
 ",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/2751941412\\/1410318404\",\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\"urls\":[],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"ko\",\"timestamp_ms\":\"1412178010689\"}",
 			"{\"created_at\":\"Wed Oct 01 15:40:10 +0000 2014\",\"id\":517338189304836096,\"id_str\":\"517338189304836096\",\"text\":\"RT @rukdd: \\u0e40\\u0e27\\u0e25\\u0e32\\u0e21\\u0e35\\u0e41\\u0e08\\u0e49\\u0e07\\u0e40\\u0e15\\u0e37\\u0e2d\\u0e19\\u0e44\\u0e25\\u0e19\\u0e4c\\u0e40\\u0e02\\u0e49\\u0e32\\u0e21\\u0e32 \\u0e40\\u0e23\\u0e32\\u0e01\\u0e47\\u0e2b\\u0e27\\u0e31\\u0e07\\u0e43\\u0e2b\\u0e49\\u0e40\\u0e1b\\u0e47\\u0e19\\u0e41\\u0e01\\u0e19\\u0e30\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/download\\/android\\\" rel=\\\"nofollow\\\"\\u003eTwitter for Android\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":2534725814,\"id_str\":\"2534725814\",\"name\":\"PtrsyMk.           \\u0e49\",\"screen_name\":\"PtrsyMk_pig\",\"location\":\"\",\"url\":null,\"description\":\"\\u0e0a\\u0e37\\u0e48\\u0e2d\\u0e40\\u0
 e2d\\u0e25\\u0e1f\\u0e4c  \\u0e40\\u0e1e\\u0e37\\u0e48\\u0e2d\\u0e19\\u0e40\\u0e23\\u0e35\\u0e22\\u0e01 \\u0e2d\\u0e49\\u0e27\\u0e19 \\u0e2d\\u0e35\",\"protected\":false,\"verified\":false,\"followers_count\":21,\"friends_count\":136,\"listed_count\":0,\"favourites_count\":2905,\"statuses_count\":1851,\"created_at\":\"Fri May 30 10:00:58 +0000 2014\",\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":true,\"lang\":\"th\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg
 .com\\/profile_images\\/515551616275996672\\/G7zVgThg_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/515551616275996672\\/G7zVgThg_normal.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/2534725814\\/1411141386\",\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Mon Sep 22 15:05:10 +0000 2014\",\"id\":514067891071627265,\"id_str\":\"514067891071627265\",\"text\":\"\\u0e40\\u0e27\\u0e25\\u0e32\\u0e21\\u0e35\\u0e41\\u0e08\\u0e49\\u0e07\\u0e40\\u0e15\\u0e37\\u0e2d\\u0e19\\u0e44\\u0e25\\u0e19\\u0e4c\\u0e40\\u0e02\\u0e49\\u0e32\\u0e21\\u0e32 \\u0e40\\u0e23\\u0e32\\u0e01\\u0e47\\u0e2b\\u0e27\\u0e31\\u0e07\\u0e43\\u0e2b\\u0e49\\u0e40\\u0e1b\\u0e47\\u0e19\\u0e41\\u0e01\\u0e19\\u0e30\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.c
 om\\/download\\/iphone\\\" rel=\\\"nofollow\\\"\\u003eTwitter for iPhone\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":374499538,\"id_str\":\"374499538\",\"name\":\"rukdd\",\"screen_name\":\"rukdd\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":65917,\"friends_count\":19,\"listed_count\":39,\"favourites_count\":6,\"statuses_count\":11763,\"created_at\":\"Fri Sep 16 12:34:29 +0000 2011\",\"utc_offset\":-25200,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"th\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_background_images\\/622183294\\/njxkahnghgkbsv5cmg8x.jpeg\",\"profile_background_image_url_https\":\
 "https:\\/\\/pbs.twimg.com\\/profile_background_images\\/622183294\\/njxkahnghgkbsv5cmg8x.jpeg\",\"profile_background_tile\":false,\"profile_link_color\":\"EB8DB3\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"E2F5DC\",\"profile_text_color\":\"6CAD0A\",\"profile_use_background_image\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/492971634190327808\\/xbxo82GM_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/492971634190327808\\/xbxo82GM_normal.jpeg\",\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":3298,\"favorite_count\":396,\"entities\":{\"hashtags\":[],\"trends\":[],\"urls\":[],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"low\",\"lang\":
 \"th\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"rukdd\",\"name\":\"rukdd\",\"id\":374499538,\"id_str\":\"374499538\",\"indices\":[3,9]}],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"th\",\"timestamp_ms\":\"1412178010716\"}" 
 	};
+
+	public static final String STREAMING_COUNTS_AS_TUPLES = "(Immigration:,1)\n" + "(Never,1)\n" + "(Report,1)\n" +
+			"(http://t.co/89DNq2IUFK,1)\n" + "(fans,1)\n" + "(celebrating,1)\n" + "(night,1)\n" +
+			"(http://t.co/XrRhhiQE0A”,1)\n" + "(have,1)\n" + "(deep,1)\n" + "(arts,1)\n" + "(Super-cute!,1)\n" +
+			"(#ARTP,1)\n" + "(Tired,1)\n" + "(#hungry,1)\n" + "(95,1)\n" + "(Fast,1)\n" + "(4,1)\n" + "(she,1)\n" +
+			"(want,1)\n" + "(just,1)\n" + "(us.,1)\n" + "(Brooks,1)\n" + "(drops,1)\n" + "(application,1)\n" +
+			"(Sun,1)\n" + "(Elizabeth,1)\n" + "(RT.,1)\n" + "(#Romance,1)\n" + "(Free,1)\n" + "(Movie,1)\n" +
+			"(Girl,1)\n" + "((@GoneGirlMovie),1)\n" + "(TX,1)\n" + "(Code),1)\n" + "(money,1)\n" + "(my,1)\n" +
+			"(dreams,1)\n" + "(money,2)\n" + "(Tired,2)\n" + "(am,2)\n" + "(just,2)\n" + "(my,2)\n" + "(me,2)\n" +
+			"(me,3)\n" + "(am,3)\n" + "(just,3)\n" + "(my,3)\n" + "(am,4)\n" + "(just,4)\n" + "(just,5)\n" +
+			"(my,5)\n" + "(just,6)\n" + "(just,7)\n" + "(70,1)\n" + "(Percent,1)\n" + "(Federal,1)\n" +
+			"(@BD_Lay,1)\n" + "(I,1)\n" + "(Knew,1)\n" + "(\uD83D\uDE02\uD83D\uDE2D,1)\n" + "(“@theawayfans:,1)\n" + "(Roma,1)\n" +
+			"(The,1)\n" + "(Etihad,1)\n" + "(in,1)\n" + "(the,1)\n" + "(ther…,1)\n" + "(a,1)\n" + "(the,2)\n" +
+			"(the,3)\n" + "(the,4)\n" + "(I,4)\n" + "(I,5)\n" + "(the,5)\n" + "(I,6)\n" + "(I,7)\n" + "(the,7)\n" +
+			"(the,8)\n" + "(the,9)\n" + "(the,10)\n" + "(the,11)\n" + "(the,12)\n" + "(the,13)\n" + "(the,14)\n" +
+			"(the,15)\n" + "(the,16)\n" + "(the,17)\n" + "(the,18)\n" + "(the,19)\n" + "(the,20)\n" + "(the,21)\n" +
+			"(the,22)\n" + "(the,23)\n" + "(RT,1)\n" + "(@jennybethm:,1)\n" + "(Illegal,1)\n" + "(Families,1)\n" +
+			"(Agents,1)\n" + "(RT,2)\n" + "(RT,3)\n" + "(RT,4)\n" + "(RT,5)\n" + "(RT,6)\n" + "(RT,7)\n" + "(RT,8)\n" +
+			"(RT,9)\n" + "(RT,10)\n" + "(RT,11)\n" + "(RT,12)\n" + "(RT,13)\n" + "(RT,14)\n" + "(RT,15)\n" +
+			"(RT,16)\n" + "(RT,17)\n" + "(RT,18)\n" + "(RT,19)\n" + "(RT,20)\n" + "(RT,21)\n" + "(of,1)\n" +
+			"(Released,1)\n" + "(Back,1)\n" + "(To,1)\n" + "(#teaparty,1)\n" + "(It,1)\n" + "(goal,1)\n" + "(at,1)\n" +
+			"(lad,1)\n" + "(hat,1)\n" + "(You,1)\n" + "(appreciation,1)\n" + "(of,2)\n" + "(At,2)\n" + "(of,3)\n" +
+			"(of,4)\n" + "(of,5)\n" + "(of,6)\n" + "(of,7)\n" + "(you,7)\n" + "(of,8)\n" + "(of,9)\n" + "(of,10)\n" +
+			"(of,11)\n" + "(of,12)\n" + "(of,13)\n" + "(of,14)";
+
+	private TwitterStreamData() {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 5fa8689..dc5ce42 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -28,11 +25,20 @@ import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.util.Collector;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class SessionWindowing {
 
 	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
 
 		final List<Tuple3<String, Long, Integer>> input = new ArrayList<Tuple3<String, Long, Integer>>();
 
@@ -57,9 +63,11 @@ public class SessionWindowing {
 							// We sleep three seconds between every output so we
 							// can see whether we properly detect sessions
 							// before the next start for a specific id
-							Thread.sleep(3000);
 							collector.collect(value);
-							System.out.println("Collected: " + value);
+							if (!fileOutput) {
+								System.out.println("Collected: " + value);
+								Thread.sleep(3000);
+							}
 						}
 					}
 
@@ -69,10 +77,16 @@ public class SessionWindowing {
 				});
 
 		// We create sessions for each id with max timeout of 3 time units
-		source.groupBy(0)
+		DataStream<Tuple3<String, Long, Integer>> aggregated = source.groupBy(0)
 				.window(new SessionTriggerPolicy(3L),
 						new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2)
-				.flatten().print();
+				.flatten();
+
+		if (fileOutput) {
+			aggregated.writeAsText(outputPath);
+		} else {
+			aggregated.print();
+		}
 
 		env.execute();
 	}
@@ -115,7 +129,7 @@ public class SessionWindowing {
 			// belongs to a different group
 
 			if (timeSinceLastEvent > sessionTimeout) {
-				return new Object[] { datapoint };
+				return new Object[]{datapoint};
 			} else {
 				return null;
 			}
@@ -127,4 +141,27 @@ public class SessionWindowing {
 		}
 
 	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 1) {
+				fileOutput = true;
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: SessionWindowing <result path>");
+				return false;
+			}
+		}
+		return true;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
new file mode 100644
index 0000000..bb4a123
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+public class SessionWindowingData {
+
+	public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,5,3)\n" +
+			"(a,10,1)";
+
+	private SessionWindowingData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
new file mode 100644
index 0000000..c390ec2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+public class TopSpeedWindowingExampleData {
+
+	public static final String CAR_DATA =
+			"(0,55,15.277777777777777,1424951918630)\n" + "(1,45,12.5,1424951918632)\n" +
+					"(0,50,29.166666666666664,1424951919632)\n" + "(1,50,26.38888888888889,1424951919632)\n" +
+					"(0,55,44.44444444444444,1424951920633)\n" + "(1,45,38.888888888888886,1424951920633)\n" +
+					"(0,50,58.33333333333333,1424951921634)\n" + "(1,40,50.0,1424951921634)\n" +
+					"(0,55,73.6111111111111,1424951922634)\n" + "(1,35,59.72222222222222,1424951922634)\n" +
+					"(0,60,90.27777777777777,1424951923634)\n" + "(1,40,70.83333333333333,1424951923634)\n" +
+					"(0,65,108.33333333333333,1424951924635)\n" + "(1,35,80.55555555555554,1424951924635)\n" +
+					"(0,60,125.0,1424951925635)\n" + "(1,40,91.66666666666666,1424951925635)\n" +
+					"(0,55,140.27777777777777,1424951926635)\n" + "(1,45,104.16666666666666,1424951926636)\n" +
+					"(0,60,156.94444444444443,1424951927636)\n" + "(1,50,118.05555555555554,1424951927636)\n" +
+					"(0,55,172.2222222222222,1424951928636)\n" + "(1,45,130.55555555555554,1424951928636)\n" +
+					"(0,50,186.1111111111111,1424951929636)\n" + "(1,50,144.44444444444443,1424951929637)\n" +
+					"(0,55,201.38888888888886,1424951930637)\n" + "(1,55,159.7222222222222,1424951930637)\n" +
+					"(0,60,218.05555555555551,1424951931637)\n" + "(1,60,176.38888888888886,1424951931637)\n" +
+					"(0,55,233.3333333333333,1424951932637)\n" + "(1,65,194.4444444444444,1424951932638)\n" +
+					"(0,50,247.22222222222217,1424951933638)\n" + "(1,70,213.88888888888886,1424951933638)\n" +
+					"(0,45,259.7222222222222,1424951934638)\n" + "(1,65,231.9444444444444,1424951934638)\n" +
+					"(0,50,273.6111111111111,1424951935638)\n" + "(1,70,251.38888888888886,1424951935639)\n" +
+					"(0,55,288.88888888888886,1424951936639)\n" + "(1,75,272.2222222222222,1424951936639)\n" +
+					"(0,50,302.77777777777777,1424951937639)\n" + "(1,70,291.66666666666663,1424951937639)\n" +
+					"(0,45,315.27777777777777,1424951938640)\n" + "(1,65,309.7222222222222,1424951938640)\n" +
+					"(0,50,329.1666666666667,1424951939640)\n" + "(1,70,329.16666666666663,1424951939640)\n" +
+					"(0,55,344.44444444444446,1424951940640)\n" + "(1,65,347.2222222222222,1424951940640)\n" +
+					"(0,50,358.33333333333337,1424951941641)\n" + "(1,70,366.66666666666663,1424951941641)\n" +
+					"(0,55,373.61111111111114,1424951942641)\n" + "(1,65,384.7222222222222,1424951942641)\n" +
+					"(0,50,387.50000000000006,1424951943641)\n" + "(1,70,404.16666666666663,1424951943641)\n" +
+					"(0,45,400.00000000000006,1424951944642)\n" + "(1,65,422.2222222222222,1424951944642)\n" +
+					"(0,50,413.88888888888897,1424951945642)\n" + "(1,60,438.88888888888886,1424951945642)\n" +
+					"(0,45,426.38888888888897,1424951946642)\n" + "(1,65,456.9444444444444,1424951946642)\n" +
+					"(0,40,437.50000000000006,1424951947643)\n" + "(1,70,476.38888888888886,1424951947643)\n" +
+					"(0,45,450.00000000000006,1424951948643)\n" + "(1,75,497.2222222222222,1424951948643)\n" +
+					"(0,40,461.11111111111114,1424951949643)\n" + "(1,80,519.4444444444443,1424951949644)\n" +
+					"(0,45,473.61111111111114,1424951950644)\n" + "(1,75,540.2777777777777,1424951950644)\n" +
+					"(0,50,487.50000000000006,1424951951644)\n" + "(1,80,562.4999999999999,1424951951644)\n" +
+					"(0,45,500.00000000000006,1424951952644)\n" + "(1,85,586.111111111111,1424951952645)\n" +
+					"(0,40,511.11111111111114,1424951953645)\n" + "(1,80,608.3333333333331,1424951953645)\n" +
+					"(0,35,520.8333333333334,1424951954645)\n" + "(1,75,629.1666666666665,1424951954645)\n" +
+					"(0,40,531.9444444444445,1424951955645)\n" + "(1,70,648.611111111111,1424951955646)\n" +
+					"(0,45,544.4444444444445,1424951956646)\n" + "(1,75,669.4444444444443,1424951956646)\n" +
+					"(0,50,558.3333333333334,1424951957646)\n" + "(1,80,691.6666666666665,1424951957646)\n" +
+					"(0,55,573.6111111111112,1424951958646)\n" + "(1,85,715.2777777777776,1424951958647)\n" +
+					"(0,60,590.2777777777778,1424951959647)\n" + "(1,80,737.4999999999998,1424951959647)\n" +
+					"(0,65,608.3333333333334,1424951960647)\n" + "(1,85,761.1111111111109,1424951960647)\n" +
+					"(0,70,627.7777777777778,1424951961647)\n" + "(1,80,783.333333333333,1424951961648)\n" +
+					"(0,75,648.6111111111112,1424951962648)\n" + "(1,85,806.9444444444441,1424951962648)\n" +
+					"(0,80,670.8333333333334,1424951963648)\n" + "(1,90,831.9444444444441,1424951963648)\n" +
+					"(0,75,691.6666666666667,1424951964649)\n" + "(1,95,858.333333333333,1424951964649)\n" +
+					"(0,70,711.1111111111112,1424951965649)\n" + "(1,90,883.333333333333,1424951965649)\n" +
+					"(0,75,731.9444444444446,1424951966649)\n" + "(1,95,909.722222222222,1424951966649)\n" +
+					"(0,70,751.388888888889,1424951967649)\n" + "(1,100,937.4999999999998,1424951967650)\n" +
+					"(0,75,772.2222222222224,1424951968650)\n" + "(1,100,965.2777777777776,1424951968650)\n" +
+					"(0,80,794.4444444444446,1424951969650)\n" + "(1,100,993.0555555555554,1424951969650)\n" +
+					"(0,75,815.2777777777779,1424951970651)\n" + "(1,100,1020.8333333333333,1424951970651)\n" +
+					"(0,80,837.5000000000001,1424951971651)\n" + "(1,100,1048.611111111111,1424951971651)\n" +
+					"(0,85,861.1111111111112,1424951972651)\n" + "(1,100,1076.388888888889,1424951972651)\n" +
+					"(0,80,883.3333333333334,1424951973652)\n" + "(1,95,1102.7777777777778,1424951973652)\n" +
+					"(0,75,904.1666666666667,1424951974652)\n" + "(1,100,1130.5555555555557,1424951974652)\n" +
+					"(0,70,923.6111111111112,1424951975652)\n" + "(1,100,1158.3333333333335,1424951975652)\n" +
+					"(0,75,944.4444444444446,1424951976653)\n" + "(1,100,1186.1111111111113,1424951976653)\n" +
+					"(0,80,966.6666666666667,1424951977653)\n" + "(1,95,1212.5000000000002,1424951977653)\n" +
+					"(0,75,987.5000000000001,1424951978653)\n" + "(1,100,1240.277777777778,1424951978653)\n" +
+					"(0,80,1009.7222222222223,1424951979654)\n" + "(1,100,1268.0555555555559,1424951979654)\n" +
+					"(0,85,1033.3333333333335,1424951980654)\n" + "(1,100,1295.8333333333337,1424951980654)\n" +
+					"(0,90,1058.3333333333335,1424951981654)\n" + "(1,100,1323.6111111111115,1424951981654)\n" +
+					"(0,85,1081.9444444444446,1424951982655)\n" + "(1,100,1351.3888888888894,1424951982655)\n" +
+					"(0,90,1106.9444444444446,1424951983655)\n" + "(1,100,1379.1666666666672,1424951983655)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" + "(1,100,1406.944444444445,1424951984656)\n" +
+					"(0,90,1158.3333333333335,1424951985656)\n" + "(1,95,1433.333333333334,1424951985656)\n" +
+					"(0,95,1184.7222222222224,1424951986656)\n" + "(1,90,1458.333333333334,1424951986656)\n" +
+					"(0,90,1209.7222222222224,1424951987656)\n" + "(1,95,1484.7222222222229,1424951987657)\n" +
+					"(0,85,1233.3333333333335,1424951988657)\n" + "(1,90,1509.7222222222229,1424951988657)\n" +
+					"(0,80,1255.5555555555557,1424951989657)\n" + "(1,95,1536.1111111111118,1424951989657)\n" +
+					"(0,85,1279.1666666666667,1424951990657)\n" + "(1,100,1563.8888888888896,1424951990658)\n" +
+					"(0,90,1304.1666666666667,1424951991658)\n" + "(1,95,1590.2777777777785,1424951991658)\n" +
+					"(0,95,1330.5555555555557,1424951992658)\n" + "(1,90,1615.2777777777785,1424951992658)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" + "(1,95,1641.6666666666674,1424951993659)\n" +
+					"(0,100,1386.1111111111113,1424951994659)\n" + "(1,100,1669.4444444444453,1424951994659)\n" +
+					"(0,95,1412.5000000000002,1424951995659)\n" + "(1,95,1695.8333333333342,1424951995660)\n" +
+					"(0,100,1440.277777777778,1424951996660)\n" + "(1,90,1720.8333333333342,1424951996660)\n" +
+					"(0,100,1468.0555555555559,1424951997660)\n" + "(1,85,1744.4444444444453,1424951997660)\n" +
+					"(0,95,1494.4444444444448,1424951998660)\n" + "(1,80,1766.6666666666674,1424951998661)\n" +
+					"(0,100,1522.2222222222226,1424951999661)\n" + "(1,75,1787.5000000000007,1424951999661)\n" +
+					"(0,95,1548.6111111111115,1424952000661)\n" + "(1,80,1809.7222222222229,1424952000661)\n" +
+					"(0,90,1573.6111111111115,1424952001662)\n" + "(1,75,1830.555555555556,1424952001662)\n" +
+					"(0,95,1600.0000000000005,1424952002662)\n" + "(1,80,1852.7777777777783,1424952002662)\n" +
+					"(0,100,1627.7777777777783,1424952003662)\n" + "(1,85,1876.3888888888894,1424952003662)\n" +
+					"(0,100,1655.555555555556,1424952004663)\n" + "(1,80,1898.6111111111115,1424952004663)\n" +
+					"(0,95,1681.944444444445,1424952005663)\n" + "(1,85,1922.2222222222226,1424952005663)\n" +
+					"(0,100,1709.7222222222229,1424952006663)\n" + "(1,90,1947.2222222222226,1424952006664)\n" +
+					"(0,100,1737.5000000000007,1424952007664)\n" + "(1,95,1973.6111111111115,1424952007664)\n" +
+					"(0,95,1763.8888888888896,1424952008664)\n" + "(1,90,1998.6111111111115,1424952008664)\n" +
+					"(0,100,1791.6666666666674,1424952009664)\n" + "(1,85,2022.2222222222226,1424952009665)\n" +
+					"(0,95,1818.0555555555563,1424952010665)\n" + "(1,80,2044.4444444444448,1424952010665)\n" +
+					"(0,90,1843.0555555555563,1424952011665)\n" + "(1,75,2065.2777777777783,1424952011665)\n" +
+					"(0,95,1869.4444444444453,1424952012666)\n" + "(1,80,2087.5000000000005,1424952012666)\n" +
+					"(0,100,1897.222222222223,1424952013666)\n" + "(1,85,2111.1111111111118,1424952013666)\n" +
+					"(0,95,1923.611111111112,1424952014666)\n" + "(1,90,2136.1111111111118,1424952014666)\n" +
+					"(0,100,1951.3888888888898,1424952015667)\n" + "(1,85,2159.722222222223,1424952015667)\n" +
+					"(0,95,1977.7777777777787,1424952016667)\n" + "(1,90,2184.722222222223,1424952016667)\n" +
+					"(0,100,2005.5555555555566,1424952017667)\n" + "(1,95,2211.1111111111118,1424952017668)";
+
+	public static final String TOP_SPEEDS =
+			"(0,55,44.44444444444444,1424951920633)\n" + "(1,40,50.0,1424951921634)\n" +
+					"(0,65,108.33333333333333,1424951924635)\n" + "(1,45,104.16666666666666,1424951926636)\n" +
+					"(0,55,172.2222222222222,1424951928636)\n" + "(1,55,159.7222222222222,1424951930637)\n" +
+					"(0,55,233.3333333333333,1424951932637)\n" + "(1,70,213.88888888888886,1424951933638)\n" +
+					"(0,55,288.88888888888886,1424951936639)\n" + "(1,75,272.2222222222222,1424951936639)\n" +
+					"(1,70,329.16666666666663,1424951939640)\n" + "(0,55,344.44444444444446,1424951940640)\n" +
+					"(1,65,384.7222222222222,1424951942641)\n" + "(0,45,400.00000000000006,1424951944642)\n" +
+					"(1,60,438.88888888888886,1424951945642)\n" + "(1,75,497.2222222222222,1424951948643)\n" +
+					"(0,40,461.11111111111114,1424951949643)\n" + "(1,80,562.4999999999999,1424951951644)\n" +
+					"(0,35,520.8333333333334,1424951954645)\n" + "(1,75,629.1666666666665,1424951954645)\n" +
+					"(1,80,691.6666666666665,1424951957646)\n" + "(0,55,573.6111111111112,1424951958646)\n" +
+					"(1,85,761.1111111111109,1424951960647)\n" + "(0,70,627.7777777777778,1424951961647)\n" +
+					"(1,90,831.9444444444441,1424951963648)\n" + "(0,75,691.6666666666667,1424951964649)\n" +
+					"(1,90,883.333333333333,1424951965649)\n" + "(0,70,751.388888888889,1424951967649)\n" +
+					"(1,100,937.4999999999998,1424951967650)\n" + "(1,100,993.0555555555554,1424951969650)\n" +
+					"(0,75,815.2777777777779,1424951970651)\n" + "(1,100,1048.611111111111,1424951971651)\n" +
+					"(0,80,883.3333333333334,1424951973652)\n" + "(1,95,1102.7777777777778,1424951973652)\n" +
+					"(1,100,1158.3333333333335,1424951975652)\n" + "(0,75,944.4444444444446,1424951976653)\n" +
+					"(1,95,1212.5000000000002,1424951977653)\n" + "(0,80,1009.7222222222223,1424951979654)\n" +
+					"(1,100,1268.0555555555559,1424951979654)\n" + "(1,100,1323.6111111111115,1424951981654)\n" +
+					"(0,85,1081.9444444444446,1424951982655)\n" + "(1,100,1379.1666666666672,1424951983655)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" + "(1,95,1433.333333333334,1424951985656)\n" +
+					"(0,95,1184.7222222222224,1424951986656)\n" + "(1,95,1484.7222222222229,1424951987657)\n" +
+					"(0,80,1255.5555555555557,1424951989657)\n" + "(1,95,1536.1111111111118,1424951989657)\n" +
+					"(0,90,1304.1666666666667,1424951991658)\n" + "(1,95,1590.2777777777785,1424951991658)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" + "(1,95,1641.6666666666674,1424951993659)\n" +
+					"(0,95,1412.5000000000002,1424951995659)\n" + "(1,95,1695.8333333333342,1424951995660)\n" +
+					"(0,100,1468.0555555555559,1424951997660)\n" + "(1,80,1766.6666666666674,1424951998661)\n" +
+					"(0,100,1522.2222222222226,1424951999661)\n" + "(0,90,1573.6111111111115,1424952001662)\n" +
+					"(1,75,1830.555555555556,1424952001662)\n" + "(0,100,1627.7777777777783,1424952003662)\n" +
+					"(1,80,1898.6111111111115,1424952004663)\n" + "(0,95,1681.944444444445,1424952005663)\n" +
+					"(1,90,1947.2222222222226,1424952006664)\n" + "(0,100,1737.5000000000007,1424952007664)\n" +
+					"(0,100,1791.6666666666674,1424952009664)\n" + "(1,85,2022.2222222222226,1424952009665)\n" +
+					"(0,90,1843.0555555555563,1424952011665)\n" + "(1,80,2087.5000000000005,1424952012666)\n" +
+					"(0,100,1897.222222222223,1424952013666)\n" + "(0,100,1951.3888888888898,1424952015667)\n" +
+					"(1,85,2159.722222222223,1424952015667)\n" + "(0,100,2005.5555555555566,1424952017667)\n" +
+					"(1,95,2211.1111111111118,1424952017668)";
+
+	private TopSpeedWindowingExampleData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java
new file mode 100644
index 0000000..7c971be
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/iteration/IterateExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.iteration;
+
+import org.apache.flink.streaming.examples.iteration.IterateExample;
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IterateExampleITCase extends StreamingProgramTestBase {
+
+
+	protected String inputPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(IterateExampleData.RESULTS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		IterateExample.main(new String[]{inputPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
new file mode 100644
index 0000000..ddab597
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.join;
+
+import org.apache.flink.streaming.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+	protected String gradesPath;
+	protected String salariesPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(1);
+		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WindowJoinData.WINDOW_JOIN_RESULTS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java
new file mode 100644
index 0000000..d10aacd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/ml/IncrementalLearningSkeletonITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.ml;
+
+import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton;
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		IncrementalLearningSkeleton.main(new String[]{resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
new file mode 100644
index 0000000..b16a85f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/socket/SocketTextStreamWordCountITCase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.socket;
+
+import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Assert;
+
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class SocketTextStreamWordCountITCase extends StreamingProgramTestBase {
+
+	private static final String HOST = "localhost";
+	private static final String PORT = "9999";
+	protected String resultPath;
+
+	private ServerSocket temporarySocket;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		temporarySocket = createSocket(HOST, Integer.valueOf(PORT), WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+		temporarySocket.close();
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		SocketTextStreamWordCount.main(new String[]{HOST, PORT, resultPath});
+	}
+
+	public ServerSocket createSocket(String host, int port, String contents) throws Exception {
+		ServerSocket serverSocket = new ServerSocket(port);
+		ServerThread st = new ServerThread(serverSocket, contents);
+		st.start();
+		return serverSocket;
+	}
+
+	private static class ServerThread extends Thread {
+
+		private ServerSocket serverSocket;
+		private String contents;
+		private Thread t;
+
+		public ServerThread(ServerSocket serverSocket, String contents) {
+			this.serverSocket = serverSocket;
+			this.contents = contents;
+			t = new Thread(this);
+		}
+
+		public void waitForAccept() throws Exception {
+			Socket socket = serverSocket.accept();
+			PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
+			writer.println(contents);
+			writer.close();
+			socket.close();
+		}
+
+		public void run() {
+			try {
+				waitForAccept();
+			} catch (Exception e) {
+				Assert.fail();
+			}
+		}
+
+		@Override
+		public void start() {
+			t.start();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java
new file mode 100644
index 0000000..1dc5eb5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/twitter/TwitterStreamITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.twitter;
+
+import org.apache.flink.streaming.examples.twitter.TwitterStream;
+import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TwitterStreamITCase extends StreamingProgramTestBase {
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TwitterStreamData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TwitterStream.main(new String[]{resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
new file mode 100644
index 0000000..5e332f1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/SessionWindowingITCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.windowing;
+
+import org.apache.flink.streaming.examples.windowing.SessionWindowing;
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class SessionWindowingITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(2);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		SessionWindowing.main(new String[]{resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/464e7828/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
new file mode 100644
index 0000000..f973bd1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.windowing;
+
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
+import org.apache.flink.streaming.examples.windowing.TopSpeedWindowingExample;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(1);
+		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TopSpeedWindowingExample.main(new String[]{textPath, resultPath});
+
+	}
+}