You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:21 UTC

[05/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
new file mode 100644
index 0000000..591ef51
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of WordCount without using the Tuple2
+ * type, but a custom class.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use POJO data types,
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions. 
+ * </ul>
+ */
+public class PojoExample {
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Word> counts =
+		// split up the lines into Word objects
+		text.flatMap(new Tokenizer())
+		// group by the field word and sum up the frequency
+				.keyBy("word").sum("frequency");
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount Pojo Example");
+	}
+
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * This is the POJO (Plain Old Java Object) that is being used for all the
+	 * operations. As long as all fields are public or have a getter/setter, the
+	 * system can handle them
+	 */
+	public static class Word {
+
+		private String word;
+		private Integer frequency;
+
+		public Word() {
+		}
+
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+
+		public String getWord() {
+			return word;
+		}
+
+		public void setWord(String word) {
+			this.word = word;
+		}
+
+		public Integer getFrequency() {
+			return frequency;
+		}
+
+		public void setFrequency(Integer frequency) {
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + word + "," + frequency + ")";
+		}
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Word> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Word> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Word(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: PojoExample <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing PojoExample example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: PojoExample <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
new file mode 100644
index 0000000..a594c94
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
+ * <li>write and use user-defined functions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+				.keyBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
new file mode 100644
index 0000000..42484e8
--- /dev/null
+++ b/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.join
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.Stream._
+import scala.language.postfixOps
+import scala.util.Random
+
+object WindowJoin {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  case class Grade(time: Long, name: String, grade: Int)
+  case class Salary(time: Long, name: String, salary: Int)
+  case class Person(name: String, grade: Int, salary: Int)
+
+  def main(args: Array[String]) {
+
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    //Create streams for grades and salaries by mapping the inputs to the corresponding objects
+    val grades = setGradesInput(env)
+    val salaries = setSalariesInput(env)
+
+    //Join the two input streams by name on the last 2 seconds every second and create new
+    //Person objects containing both grade and salary
+    val joined = grades.join(salaries)
+        .where(_.name)
+        .equalTo(_.name)
+        .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+        .apply { (g, s) => Person(g.name, g.grade, s.salary) }
+
+    if (fileOutput) {
+      joined.writeAsText(outputPath)
+    } else {
+      joined.print()
+    }
+
+    env.execute("WindowJoin")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
+  val gradeCount = 5
+  val salaryMax = 10000
+  val sleepInterval = 100
+  
+  def gradeStream: Stream[(Long, String, Int)] = {
+    def gradeMapper(names: Array[String])(x: Int): (Long, String, Int) =
+      {
+        if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
+        (System.currentTimeMillis(),names(Random.nextInt(names.length)),Random.nextInt(gradeCount))
+      }
+    range(1, 100).map(gradeMapper(names))
+  }
+
+  def salaryStream: Stream[(Long, String, Int)] = {
+    def salaryMapper(x: Int): (Long, String, Int) =
+      {
+        if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
+        (System.currentTimeMillis(), names(Random.nextInt(names.length)), Random.nextInt(salaryMax))
+      }
+    range(1, 100).map(salaryMapper)
+  }
+
+  def parseMap(line : String): (Long, String, Int) = {
+    val record = line.substring(1, line.length - 1).split(",")
+    (record(0).toLong, record(1), record(2).toInt)
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  private var fileInput: Boolean = false
+  private var fileOutput: Boolean = false
+
+  private var gradesPath: String = null
+  private var salariesPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 1) {
+        fileOutput = true
+        outputPath = args(0)
+      }
+      else if (args.length == 3) {
+        fileInput = true
+        fileOutput = true
+        gradesPath = args(0)
+        salariesPath = args(1)
+        outputPath = args(2)
+      } else {
+        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> " +
+          "<input path 2> <result path>")
+        return false
+      }
+    } else {
+      System.out.println("Executing WindowJoin with generated data.")
+      System.out.println("  Provide parameter to write to file.")
+      System.out.println("  Usage: WindowJoin <result path>")
+    }
+    true
+  }
+
+  private def setGradesInput(env: StreamExecutionEnvironment) : DataStream[Grade] = {
+    if (fileInput) {
+      env.readTextFile(gradesPath).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3))
+    } else {
+      env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
+    }
+  }
+
+  private def setSalariesInput(env: StreamExecutionEnvironment) : DataStream[Salary] = {
+    if (fileInput) {
+      env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
+    }
+    else {
+      env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
new file mode 100644
index 0000000..9ec17d4
--- /dev/null
+++ b/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.socket
+
+import org.apache.flink.streaming.api.scala._
+
+import scala.language.postfixOps
+
+/**
+ * This example shows an implementation of WordCount with data from a text socket. 
+ * To run the example make sure that the service providing the text data is already up and running.
+ *
+ * To start an example socket text stream on your local machine run netcat from a command line, 
+ * where the parameter specifies the port number:
+ *
+ * {{{
+ *   nc -lk 9999
+ * }}}
+ *
+ * Usage:
+ * {{{
+ *   SocketTextStreamWordCount <hostname> <port> <output path>
+ * }}}
+ *
+ * This example shows how to:
+ *
+ *   - use StreamExecutionEnvironment.socketTextStream
+ *   - write a simple Flink Streaming program in scala.
+ *   - write and use user-defined functions.
+ */
+object SocketTextStreamWordCount {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the corresponding objects
+    val text = env.socketTextStream(hostName, port)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .keyBy(0)
+      .sum(1)
+
+    if (fileOutput) {
+      counts.writeAsText(outputPath, 1)
+    } else {
+      counts print
+    }
+
+    env.execute("Scala SocketTextStreamWordCount Example")
+  }
+
+  private def parseParameters(args: Array[String]): Boolean = {
+      if (args.length == 3) {
+        fileOutput = true
+        hostName = args(0)
+        port = args(1).toInt
+        outputPath = args(2)
+      } else if (args.length == 2) {
+        hostName = args(0)
+        port = args(1).toInt
+      } else {
+        System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]")
+        return false
+      }
+    true
+  }
+
+  private var fileOutput: Boolean = false
+  private var hostName: String = null
+  private var port: Int = 0
+  private var outputPath: String = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
new file mode 100644
index 0000000..f26f32c
--- /dev/null
+++ b/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * An example of grouped stream windowing where different eviction and 
+ * trigger policies can be used. A source fetches events from cars 
+ * every 1 sec containing their id, their current speed (kmh),
+ * overall elapsed distance (m) and a timestamp. The streaming
+ * example triggers the top speed of each car every x meters elapsed 
+ * for the last y seconds.
+ */
+object TopSpeedWindowing {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)
+
+  val numOfCars = 2
+  val evictionSec = 10
+  val triggerMeters = 50d
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val cars = setCarsInput(env)
+
+    val topSeed = cars
+      .assignAscendingTimestamps( _.time )
+      .keyBy("carId")
+      .window(GlobalWindows.create)
+      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
+      .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
+        def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
+      }))
+//      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
+//      .every(Delta.of[CarEvent](triggerMeters,
+//          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
+      .maxBy("speed")
+
+    if (fileOutput) {
+      topSeed.writeAsText(outputPath)
+    } else {
+      topSeed.print
+    }
+
+    env.execute("TopSpeedWindowing")
+
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  def genCarStream(): Stream[CarEvent] = {
+
+    def nextSpeed(carEvent : CarEvent) : CarEvent =
+    {
+      val next =
+        if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
+    }
+    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
+    {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+    carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
+  }
+
+  def parseMap(line : String): (Int, Int, Double, Long) = {
+    val record = line.substring(1, line.length - 1).split(",")
+    (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  var fileInput = false
+  var fileOutput = false
+  var inputPath : String = null
+  var outputPath : String = null
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 2) {
+        fileInput = true
+        fileOutput = true
+        inputPath = args(0)
+        outputPath = args(1)
+        true
+      } else {
+        System.err.println("Usage: TopSpeedWindowing <input path> <output path>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  private def setCarsInput(env: StreamExecutionEnvironment) : DataStream[CarEvent] = {
+    if (fileInput) {
+      env.readTextFile(inputPath).map(parseMap(_)).map(x => CarEvent(x._1, x._2, x._3, x._4))
+    } else {
+      env.fromCollection(genCarStream())
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
new file mode 100644
index 0000000..07d6766
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.iteration;
+
+import org.apache.flink.streaming.examples.iteration.IterateExample;
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IterateExampleITCase extends StreamingProgramTestBase {
+
+
+	protected String inputPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(IterateExampleData.RESULTS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		IterateExample.main(new String[]{inputPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
new file mode 100644
index 0000000..e657b67
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.join;
+
+import org.apache.flink.streaming.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+	protected String gradesPath;
+	protected String salariesPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the two sides of the join might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. (bob, 2, 2015)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
new file mode 100644
index 0000000..83569dc
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.ml;
+
+import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton;
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		IncrementalLearningSkeleton.main(new String[]{resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
new file mode 100644
index 0000000..838834b
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.socket;
+
+import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+
+public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
new file mode 100644
index 0000000..7850082
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.twitter;
+
+import org.apache.flink.streaming.examples.twitter.TwitterStream;
+import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TwitterStreamITCase extends StreamingProgramTestBase {
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TwitterStreamData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TwitterStream.main(new String[]{resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
new file mode 100644
index 0000000..7f46be9
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
+
+import org.apache.flink.streaming.examples.windowing.SessionWindowing;
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class SessionWindowingITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		SessionWindowing.main(new String[]{resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
new file mode 100644
index 0000000..37812c9
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
+
+import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+	
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(1); //needed to ensure total ordering for windows
+		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TopSpeedWindowing.main(new String[]{textPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
new file mode 100644
index 0000000..e7cce60
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
+
+import org.apache.flink.streaming.examples.windowing.WindowWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WindowWordCountITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+	protected String windowSize = "250";
+	protected String slideSize = "150";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the parallel tokenizers might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. (faust, 2)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowWordCount.main(new String[]{textPath, resultPath, windowSize, slideSize});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
new file mode 100644
index 0000000..6e3c213
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.wordcount;
+
+import org.apache.flink.streaming.examples.wordcount.PojoExample;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class PojoExampleITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		PojoExample.main(new String[]{textPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
new file mode 100644
index 0000000..fcf568e
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.wordcount;
+
+import org.apache.flink.streaming.examples.wordcount.WordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WordCount.main(new String[]{textPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
new file mode 100644
index 0000000..08ce890
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.join;
+
+import org.apache.flink.streaming.scala.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+	protected String gradesPath;
+	protected String salariesPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the two sides of the join might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. Person(bob, 2, 2015)
+		checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
new file mode 100644
index 0000000..b3629ad
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.socket;
+
+import org.apache.flink.streaming.scala.examples.socket.SocketTextStreamWordCount;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+
+public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
new file mode 100644
index 0000000..ef4e47f
--- /dev/null
+++ b/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.windowing;
+
+import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing;
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(1); //needed to ensure total ordering for windows
+		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TopSpeedWindowing.main(new String[]{textPath, resultPath});
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
new file mode 100644
index 0000000..25d4031
--- /dev/null
+++ b/flink-streaming-java/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-java</artifactId>
+	<name>flink-streaming-java</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-math</artifactId>
+			<version>2.2</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.commons.json</artifactId>
+			<version>2.0.6</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.5</version>
+        </dependency>
+
+    </dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- disable fork reuse for the streaming project, because of
+			incorrect declaration of tests -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<reuseForks>false</reuseForks>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
new file mode 100644
index 0000000..db46d00
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The checkpointing mode defines what consistency guarantees the system gives in the presence of
+ * failures.
+ * 
+ * <p>When checkpointing is activated, the data streams are replayed such that lost parts of the
+ * processing are repeated. For stateful operations and functions, the checkpointing mode defines
+ * whether the system draws checkpoints such that a recovery behaves as if the operators/functions
+ * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn
+ * in a simpler fashion that typically encounteres some duplicates upon recovery
+ * ({@link #AT_LEAST_ONCE})</p> 
+ */
+public enum CheckpointingMode {
+
+	/**
+	 * Sets the checkpointing mode to "exactly once". This mode means that the system will
+	 * checkpoint the operator and user function state in such a way that, upon recovery,
+	 * every record will be reflected exactly once in the operator state.
+	 * 
+	 * <p>For example, if a user function counts the number of elements in a stream, 
+	 * this number will consistently be equal to the number of actual elements in the stream,
+	 * regardless of failures and recovery.</p>
+	 * 
+	 * <p>Note that this does not mean that each record flows through the streaming data flow
+	 * only once. It means that upon recovery, the state of operators/functions is restored such
+	 * that the resumed data streams pick up exactly at after the last modification to the state.</p> 
+	 *  
+	 * <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
+	 * external systems (only state in Flink's operators and user functions). The reason for that
+	 * is that a certain level of "collaboration" is required between two systems to achieve
+	 * exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
+	 * this collaboration.</p>
+	 * 
+	 * <p>This mode sustains high throughput. Depending on the data flow graph and operations,
+	 * this mode may increase the record latency, because operators need to align their input
+	 * streams, in order to create a consistent snapshot point. The latency increase for simple
+	 * dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
+	 * latency remains small, but the slowest records typically have an increased latency.</p>
+	 */
+	EXACTLY_ONCE,
+
+	/**
+	 * Sets the checkpointing mode to "at least once". This mode means that the system will
+	 * checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
+	 * some records may be reflected multiple times in the operator state.
+	 * 
+	 * <p>For example, if a user function counts the number of elements in a stream, 
+	 * this number will equal to, or larger, than the actual number of elements in the stream,
+	 * in the presence of failure and recovery.</p>
+	 * 
+	 * <p>This mode has minimal impact on latency and may be preferable in very-low latency
+	 * scenarios, where a sustained very-low latency (such as few milliseconds) is needed,
+	 * and where occasional duplicate messages (on recovery) do not matter.</p>
+	 */
+	AT_LEAST_ONCE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
new file mode 100644
index 0000000..125ca65
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The time characteristic defines how the system determines time for time-dependent
+ * order and operations that depend on time (such as time windows).
+ */
+public enum TimeCharacteristic {
+
+	/**
+	 * Processing time for operators means that the operator uses the system clock of the machine
+	 * to determine the current time of the data stream. Processing-time windows trigger based
+	 * on wall-clock time and include whatever elements happen to have arrived at the operator at
+	 * that point in time.
+	 * <p>
+	 * Using processing time for window operations results in general in quite non-deterministic results,
+	 * because the contents of the windows depends on the speed in which elements arrive. It is, however,
+	 * the cheapest method of forming windows and the method that introduces the least latency.
+	 */
+	ProcessingTime,
+
+	/**
+	 * Ingestion time means that the time of each individual element in the stream is determined
+	 * when the element enters the Flink streaming data flow. Operations like windows group the
+	 * elements based on that time, meaning that processing speed within the streaming dataflow
+	 * does not affect windowing, but only the speed at which sources receive elements.
+	 * <p>
+	 * Ingestion time is often a good compromise between processing time and event time.
+	 * It does not need and special manual form of watermark generation, and events are typically
+	 * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can 
+	 * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
+	 * are not very much out-of-order means that the latency increase is moderate, compared to event
+	 * time.
+	 */
+	IngestionTime,
+
+	/**
+	 * Event time means that the time of each individual element in the stream (also called event)
+	 * is determined by the event's individual custom timestamp. These timestamps either exist in the
+	 * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
+	 * The big implication of this is that elements arrive in the sources and in all operators generally
+	 * out of order, meaning that elements with earlier timestamps may arrive after elements with
+	 * later timestamps.
+	 * <p>
+	 * Operators that window or order data with respect to event time must buffer data until they can
+	 * be sure that all timestamps for a certain time interval have been received. This is handled by
+	 * the so called "time watermarks".
+	 * <p>
+	 * Operations based on event time are very predictable - the result of windowing operations
+	 * is typically identical no matter when the window is executed and how fast the streams operate.
+	 * At the same time, the buffering and tracking of event time is also costlier than operating
+	 * with processing time, and typically also introduces more latency. The amount of extra
+	 * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
+	 * between the arrival of early and late elements is. With respect to the "time watermarks", this
+	 * means that the cost typically depends on how early or late the watermarks can be generated
+	 * for their timestamp.
+	 * <p>
+	 * In relation to {@link #IngestionTime}, the event time is similar, but refers the the event's
+	 * original time, rather than the time assigned at the data source. Practically, that means that
+	 * event time has generally more meaning, but also that it takes longer to determine that all
+	 * elements for a certain time have arrived.
+	 */
+	EventTime
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
new file mode 100644
index 0000000..c2d2182
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.checkpoint;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive
+ * a commit notification once a checkpoint has been completely acknowledged by all
+ * participants.
+ */
+public interface CheckpointNotifier {
+
+	/**
+	 * This method is called as a notification once a distributed checkpoint has been completed.
+	 * 
+	 * Note that any exception during this method will not cause the checkpoint to
+	 * fail any more.
+	 * 
+	 * @param checkpointId The ID of the checkpoint that has been completed.
+	 * @throws Exception
+	 */
+	void notifyCheckpointComplete(long checkpointId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
new file mode 100644
index 0000000..ac1cbfb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import java.io.Serializable;
+
+/**
+ * This method must be implemented by functions that have state that needs to be
+ * checkpointed. The functions get a call whenever a checkpoint should take place
+ * and return a snapshot of their state, which will be checkpointed.
+ * 
+ * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the
+ * state is written, the function is not called, so the function needs not return a
+ * copy of its state, but may return a reference to its state. Functions that can
+ * continue to work and mutate the state, even while the state snapshot is being accessed,
+ * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
+ * interface.</p>
+ * 
+ * @param <T> The type of the operator state.
+ */
+public interface Checkpointed<T extends Serializable> {
+
+	/**
+	 * Gets the current state of the function of operator. The state must reflect the result of all
+	 * prior invocations to this function. 
+	 * 
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
+	 *                            System.currentTimeMillis() on the JobManager.
+	 *                            
+	 * @return A snapshot of the operator state.
+	 * 
+	 * @throws Exception Thrown if the creation of the state object failed. This causes the
+	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
+	 *                   recovery), or to discard this checkpoint attempt and to continue running
+	 *                   and to try again with the next checkpoint attempt.
+	 */
+	T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+
+	/**
+	 * Restores the state of the function or operator to that of a previous checkpoint.
+	 * This method is invoked when a function is executed as part of a recovery run.
+	 *
+	 * Note that restoreState() is called before open().
+	 *
+	 * @param state The state to be restored. 
+	 */
+	void restoreState(T state) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
new file mode 100644
index 0000000..4bd89c4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import java.io.Serializable;
+
+/**
+ * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
+ * Similar to the {@link Checkpointed} interface, the function must produce a
+ * snapshot of its state. However, the function must be able to continue working
+ * and mutating its state without mutating the returned state snapshot.
+ * 
+ * <p>Asynchronous checkpoints are desirable, because they allow the data streams at the
+ * point of the checkpointed function/operator to continue running while the checkpoint
+ * is in progress.</p>
+ * 
+ * <p>To be able to support asynchronous snapshots, the state returned by the
+ * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
+ * of the actual state.</p>
+ */
+public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
new file mode 100644
index 0000000..7034b11
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.ArrayList;
+
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
+
+	private static final long serialVersionUID = 1L;
+	
+	private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
+
+	public BroadcastOutputSelectorWrapper() {
+		outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
+	}
+	
+	@Override
+	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
+		outputs.add(output);
+	}
+
+	@Override
+	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
+		return outputs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
new file mode 100644
index 0000000..84558fc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
+
+	private List<OutputSelector<OUT>> outputSelectors;
+
+	private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
+	private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
+
+	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
+		this.outputSelectors = outputSelectors;
+		this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
+		this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
+	}
+	
+	@Override
+	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
+		List<String> selectedNames = edge.getSelectedNames();
+
+		if (selectedNames.isEmpty()) {
+			selectAllOutputs.add(output);
+		}
+		else {
+			for (String selectedName : selectedNames) {
+				if (!outputMap.containsKey(selectedName)) {
+					outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
+					outputMap.get(selectedName).add(output);
+				}
+				else {
+					if (!outputMap.get(selectedName).contains(output)) {
+						outputMap.get(selectedName).add(output);
+					}
+				}
+			}
+		}
+	}
+
+	@Override
+	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
+		Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
+
+		for (OutputSelector<OUT> outputSelector : outputSelectors) {
+			Iterable<String> outputNames = outputSelector.select(record);
+
+			for (String outputName : outputNames) {
+				List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName);
+
+				try {
+					selectedOutputs.addAll(outputList);
+				} catch (NullPointerException e) {
+					if (LOG.isErrorEnabled()) {
+						String format = String.format(
+								"Cannot emit because no output is selected with the name: %s",
+								outputName);
+						LOG.error(format);
+					}
+				}
+			}
+		}
+
+		return selectedOutputs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
new file mode 100644
index 0000000..9c6eede
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+
+/**
+ * Interface for defining an OutputSelector for a {@link SplitStream} using
+ * the {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * {@link SplitStream} will run through this operator to select outputs.
+ * 
+ * @param <OUT>
+ *            Type parameter of the split values.
+ */
+public interface OutputSelector<OUT> extends Serializable {
+	/**
+	 * Method for selecting output names for the emitted objects when using the
+	 * {@link SingleOutputStreamOperator#split} method. The values will be
+	 * emitted only to output names which are contained in the returned
+	 * iterable.
+	 * 
+	 * @param value
+	 *            Output object for which the output selection should be made.
+	 */
+	public Iterable<String> select(OUT value);
+}