You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:22 UTC

[06/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
new file mode 100644
index 0000000..3355f1c
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ *
+ * <p>
+ * This example will join two streams with a sliding window. One which emits grades and one which
+ * emits salaries of people. The input format for both sources has an additional timestamp
+ * as field 0. This is used to to event-time windowing. Time timestamps must be
+ * monotonically increasing.
+ *
+ * This example shows how to:
+ * <ul>
+ *   <li>do windowed joins,
+ *   <li>use tuple data types,
+ *   <li>write a simple streaming program.
+ * </ul>
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// connect to the data sources for grades and salaries
+		Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
+		DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
+		DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
+
+		// extract the timestamps
+		grades = grades.assignTimestamps(new MyTimestampExtractor());
+		salaries = salaries.assignTimestamps(new MyTimestampExtractor());
+
+		// apply a temporal join over the two stream based on the names over one
+		// second windows
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
+				.join(salaries)
+				.where(new NameKeySelector())
+				.equalTo(new NameKeySelector())
+				.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
+				.apply(new MyJoinFunction());
+
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Continuously emit tuples with random names and integers (grades).
+	 */
+	public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple3<Long, String, Integer> outTuple;
+		private volatile boolean isRunning = true;
+		private int counter;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple3<>();
+		}
+
+		@Override
+		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
+			while (isRunning && counter < 100) {
+				outTuple.f0 = System.currentTimeMillis();
+				outTuple.f1 = names[rand.nextInt(names.length)];
+				outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				counter++;
+				ctx.collect(outTuple);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Continuously emit tuples with random names and integers (salaries).
+	 */
+	public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private transient Random rand;
+		private transient Tuple3<Long, String, Integer> outTuple;
+		private volatile boolean isRunning;
+		private int counter;
+
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rand = new Random();
+			outTuple = new Tuple3<Long, String, Integer>();
+			isRunning = true;
+		}
+
+
+		@Override
+		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
+			while (isRunning && counter < 100) {
+				outTuple.f0 = System.currentTimeMillis();
+				outTuple.f1 = names[rand.nextInt(names.length)];
+				outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				counter++;
+				ctx.collect(outTuple);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private String[] record;
+
+		public MySourceMap() {
+			record = new String[2];
+		}
+
+		@Override
+		public Tuple3<Long, String, Integer> map(String line) throws Exception {
+			record = line.substring(1, line.length() - 1).split(",");
+			return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
+		}
+	}
+
+	public static class MyJoinFunction
+			implements
+			JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
+
+		@Override
+		public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
+				Tuple3<Long, String, Integer> second) throws Exception {
+			joined.f0 = first.f1;
+			joined.f1 = first.f2;
+			joined.f2 = second.f2;
+			return joined;
+		}
+	}
+
+	private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+			return element.f0;
+		}
+
+		@Override
+		public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+			return element.f0 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+
+	private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
+			return value.f1;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+
+	private static String gradesPath;
+	private static String salariesPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 1) {
+				fileOutput = true;
+				outputPath = args[0];
+			} else if (args.length == 3) {
+				fileInput = true;
+				fileOutput = true;
+				gradesPath = args[0];
+				salariesPath = args[1];
+				outputPath = args[2];
+			} else {
+				System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " +
+						"<result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+
+	private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
+			StreamExecutionEnvironment env) {
+
+		DataStream<Tuple3<Long, String, Integer>> grades;
+		DataStream<Tuple3<Long, String, Integer>> salaries;
+
+		if (fileInput) {
+			grades = env.readTextFile(gradesPath).map(new MySourceMap());
+			salaries = env.readTextFile(salariesPath).map(new MySourceMap());
+		} else {
+			grades = env.addSource(new GradeSource());
+			salaries = env.addSource(new SalarySource());
+		}
+
+		return Tuple2.of(grades, salaries);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
new file mode 100644
index 0000000..15c1280
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join.util;
+
+public class WindowJoinData {
+
+	public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
+			"(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
+			"(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
+			"(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
+			"(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
+			"(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
+			"(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
+			"(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
+			"(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
+			"(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
+			"(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
+			"(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
+			"(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
+			"(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
+			"(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
+
+	public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
+			"(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
+			"(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
+			"(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
+			"(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
+			"(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
+			"(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
+			"(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
+			"(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
+			"(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
+			"(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
+			"(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
+			"(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
+			"(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
+			"(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
+			"(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
+			"(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
+			"(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
+			"(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
+			"(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
+
+	private WindowJoinData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
new file mode 100644
index 0000000..ce227e4
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.ml;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Skeleton for incremental machine learning algorithm consisting of a
+ * pre-computed model, which gets updated for the new inputs and new input data
+ * for which the job provides predictions.
+ *
+ * <p>
+ * This may serve as a base of a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ *
+ * <p/>
+ * This example shows how to use:
+ * <ul>
+ *   <li>Connected streams
+ *   <li>CoFunctions
+ *   <li>Tuple data types
+ * </ul>
+ */
+public class IncrementalLearningSkeleton {
+
+	private static DataStream<Integer> trainingData = null;
+	private static DataStream<Integer> newData = null;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		trainingData = env.addSource(new FiniteTrainingDataSource());
+		newData = env.addSource(new FiniteNewDataSource());
+
+		// build new model on every second of new data
+		DataStream<Double[]> model = trainingData
+				.assignTimestamps(new LinearTimestamp())
+				.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+				.apply(new PartialModelBuilder());
+
+		// use partial model for newData
+		DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
+
+		// emit result
+		if (fileOutput) {
+			prediction.writeAsText(outputPath, 1);
+		} else {
+			prediction.print();
+		}
+
+		// execute program
+		env.execute("Streaming Incremental Learning");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Feeds new data for newData. By default it is implemented as constantly
+	 * emitting the Integer 1 in a loop.
+	 */
+	public static class FiniteNewDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+		private int counter;
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			Thread.sleep(15);
+			while (counter < 50) {
+				ctx.collect(getNewData());
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
+
+		private Integer getNewData() throws InterruptedException {
+			Thread.sleep(5);
+			counter++;
+			return 1;
+		}
+	}
+
+	/**
+	 * Feeds new training data for the partial model builder. By default it is
+	 * implemented as constantly emitting the Integer 1 in a loop.
+	 */
+	public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+		private int counter = 0;
+
+		@Override
+		public void run(SourceContext<Integer> collector) throws Exception {
+			while (counter < 8200) {
+				collector.collect(getTrainingData());
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
+
+		private Integer getTrainingData() throws InterruptedException {
+			counter++;
+			return 1;
+		}
+	}
+
+	public static class LinearTimestamp implements TimestampExtractor<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private long counter = 0L;
+
+		@Override
+		public long extractTimestamp(Integer element, long currentTimestamp) {
+			return counter += 10L;
+		}
+
+		@Override
+		public long extractWatermark(Integer element, long currentTimestamp) {
+			return counter - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+
+	}
+
+	/**
+	 * Builds up-to-date partial models on new training data.
+	 */
+	public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		protected Double[] buildPartialModel(Iterable<Integer> values) {
+			return new Double[]{1.};
+		}
+
+		@Override
+		public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
+			out.collect(buildPartialModel(values));
+		}
+	}
+
+	/**
+	 * Creates newData using the model produced in batch-processing and the
+	 * up-to-date partial model.
+	 * <p/>
+	 * <p>
+	 * By defaults emits the Integer 0 for every newData and the Integer 1
+	 * for every model update.
+	 * </p>
+	 */
+	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
+		private static final long serialVersionUID = 1L;
+
+		Double[] batchModel = null;
+		Double[] partialModel = null;
+
+		@Override
+		public Integer map1(Integer value) {
+			// Return newData
+			return predict(value);
+		}
+
+		@Override
+		public Integer map2(Double[] value) {
+			// Update model
+			partialModel = value;
+			batchModel = getBatchModel();
+			return 1;
+		}
+
+		// pulls model built with batch-job on the old training data
+		protected Double[] getBatchModel() {
+			return new Double[]{0.};
+		}
+
+		// performs newData using the two models
+		protected Integer predict(Integer inTuple) {
+			return 0;
+		}
+
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IncrementalLearningSkeleton <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
+		}
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
new file mode 100644
index 0000000..8a6cd88
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.ml.util;
+
+public class IncrementalLearningSkeletonData {
+
+	public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n";
+
+	private IncrementalLearningSkeletonData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
new file mode 100644
index 0000000..17add2c
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.socket;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
+
+/**
+ * This example shows an implementation of WordCount with data from a text
+ * socket. To run the example make sure that the service providing the text data
+ * is already up and running.
+ * <p/>
+ * <p/>
+ * To start an example socket text stream on your local machine run netcat from
+ * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
+ * port number.
+ * <p/>
+ * <p/>
+ * <p/>
+ * Usage:
+ * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+ * <br>
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use StreamExecutionEnvironment.socketTextStream
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
+ */
+public class SocketTextStreamWordCount {
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.keyBy(0)
+						.sum(1);
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount from SocketTextStream Example");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String hostName;
+	private static int port;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		// parse input arguments
+		if (args.length == 3) {
+			fileOutput = true;
+			hostName = args[0];
+			port = Integer.valueOf(args[1]);
+			outputPath = args[2];
+		} else if (args.length == 2) {
+			hostName = args[0];
+			port = Integer.valueOf(args[1]);
+		} else {
+			System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
+			return false;
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
new file mode 100644
index 0000000..c2477b5
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.twitter.TwitterSource;
+import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+import java.util.StringTokenizer;
+
+/**
+ * Implements the "TwitterStream" program that computes a most used word
+ * occurrence over JSON files in a streaming fashion.
+ * <p/>
+ * <p/>
+ * The input is a JSON text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>TwitterStream &lt;text path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link TwitterStreamData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>acquire external data,
+ * <li>use in-line defined functions,
+ * <li>handle flattened stream inputs.
+ * </ul>
+ */
+public class TwitterStream {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> streamSource = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> tweets = streamSource
+				// selecting English tweets and splitting to (word, 1)
+				.flatMap(new SelectEnglishAndTokenizeFlatMap())
+				// group by words and sum their occurrences
+				.keyBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			tweets.writeAsText(outputPath);
+		} else {
+			tweets.print();
+		}
+
+		// execute program
+		env.execute("Twitter Streaming Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Makes sentences from English tweets.
+	 * <p/>
+	 * <p>
+	 * Implements a string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 * </p>
+	 */
+	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+			try {
+				if (getString(value, "user.lang").equals("en")) {
+					// message of tweet
+					StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
+
+					// split the message
+					while (tokenizer.hasMoreTokens()) {
+						String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
+
+						if (result != null && !result.equals("")) {
+							out.collect(new Tuple2<String, Integer>(result, 1));
+						}
+					}
+				}
+			} catch (JSONException e) {
+				// the JSON was not parsed correctly
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+	private static String propertiesPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				fileInput = true;
+				propertiesPath = args[0];
+				outputPath = args[1];
+			} else if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TwitterStream example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileInput) {
+			// read the text file from given input path
+			return env.addSource(new TwitterSource(propertiesPath));
+		} else {
+			// get default test text data
+			return env.fromElements(TwitterStreamData.TEXTS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
new file mode 100644
index 0000000..b06d193
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.twitter.util;
+
+//example data looking like tweets, but not acquired from Twitter
+public class TwitterStreamData {
+	public static final String[] TEXTS = new String[] {
+			"{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000000,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example1\",\"indices\":[0,0]},{\"text\":\"tweet1\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
 tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
+			"{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000001,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example2\",\"indices\":[0,0]},{\"text\":\"tweet2\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
 tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
+			"{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000002,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example3\",\"indices\":[0,0]},{\"text\":\"tweet3\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
 tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
+	};
+
+	public static final String STREAMING_COUNTS_AS_TUPLES = "(apache,1)\n" + "(apache,2)\n" + "(apache,3)\n" + "(flink,1)\n" + "(flink,2)\n" + "(flink,3)\n";
+
+	private TwitterStreamData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
new file mode 100644
index 0000000..982b73d
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+@SuppressWarnings("serial")
+public class GroupedProcessingTimeWindowExample {
+	
+	public static void main(String[] args) throws Exception {
+		
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		
+		DataStream<Tuple2<Long, Long>> stream = env
+				.addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() {
+					
+					private volatile boolean running = true;
+					
+					@Override
+					public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+						
+						final long startTime = System.currentTimeMillis();
+						
+						final long numElements = 20000000;
+						final long numKeys = 10000;
+						long val = 1L;
+						long count = 0L;
+						
+						
+						while (running && count < numElements) {
+							count++;
+							ctx.collect(new Tuple2<Long, Long>(val++, 1L));
+							
+							if (val > numKeys) {
+								val = 1L;
+							}
+						}
+
+						final long endTime = System.currentTimeMillis();
+						System.out.println("Took " + (endTime-startTime) + " msecs for " + numElements + " values");
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+		
+		stream
+			.keyBy(0)
+			.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+			.reduce(new SummingReducer())
+
+			// alternative: use a apply function which does not pre-aggregate
+//			.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+//			.window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+//			.apply(new SummingWindowFunction())
+				
+			.addSink(new SinkFunction<Tuple2<Long, Long>>() {
+				@Override
+				public void invoke(Tuple2<Long, Long> value) {
+				}
+			});
+		
+		env.execute();
+	}
+	
+	public static class FirstFieldKeyExtractor<Type extends Tuple, Key> implements KeySelector<Type, Key> {
+		
+		@Override
+		@SuppressWarnings("unchecked")
+		public Key getKey(Type value) {
+			return (Key) value.getField(0);
+		}
+	}
+
+	public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
+
+		@Override
+		public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+			long sum = 0L;
+			for (Tuple2<Long, Long> value : values) {
+				sum += value.f1;
+			}
+
+			out.collect(new Tuple2<>(key, sum));
+		}
+	}
+
+	public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
+			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
new file mode 100644
index 0000000..3c63156
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SessionWindowing {
+
+	@SuppressWarnings("serial")
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(2);
+
+		final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
+
+		input.add(new Tuple3<>("a", 1L, 1));
+		input.add(new Tuple3<>("b", 1L, 1));
+		input.add(new Tuple3<>("b", 3L, 1));
+		input.add(new Tuple3<>("b", 5L, 1));
+		input.add(new Tuple3<>("c", 6L, 1));
+		// We expect to detect the session "a" earlier than this point (the old
+		// functionality can only detect here when the next starts)
+		input.add(new Tuple3<>("a", 10L, 1));
+		// We expect to detect session "b" and "c" at this point as well
+		input.add(new Tuple3<>("c", 11L, 1));
+
+		DataStream<Tuple3<String, Long, Integer>> source = env
+				.addSource(new EventTimeSourceFunction<Tuple3<String,Long,Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
+						for (Tuple3<String, Long, Integer> value : input) {
+							ctx.collectWithTimestamp(value, value.f1);
+							ctx.emitWatermark(new Watermark(value.f1 - 1));
+							if (!fileOutput) {
+								System.out.println("Collected: " + value);
+							}
+						}
+						ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+					}
+
+					@Override
+					public void cancel() {
+					}
+				});
+
+		// We create sessions for each id with max timeout of 3 time units
+		DataStream<Tuple3<String, Long, Integer>> aggregated = source
+				.keyBy(0)
+				.window(GlobalWindows.create())
+				.trigger(new SessionTrigger(3L))
+				.sum(2);
+
+		if (fileOutput) {
+			aggregated.writeAsText(outputPath);
+		} else {
+			aggregated.print();
+		}
+
+		env.execute();
+	}
+
+	private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Long sessionTimeout;
+
+		public SessionTrigger(Long sessionTimeout) {
+			this.sessionTimeout = sessionTimeout;
+
+		}
+
+		@Override
+		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
+
+			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+			Long lastSeen = lastSeenState.value();
+
+			Long timeSinceLastEvent = timestamp - lastSeen;
+
+			// Update the last seen event time
+			lastSeenState.update(timestamp);
+
+			ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
+
+			if (timeSinceLastEvent > sessionTimeout) {
+				return TriggerResult.FIRE_AND_PURGE;
+			} else {
+				return TriggerResult.CONTINUE;
+			}
+		}
+
+		@Override
+		public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
+			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+			Long lastSeen = lastSeenState.value();
+
+			if (time - lastSeen >= sessionTimeout) {
+				return TriggerResult.FIRE_AND_PURGE;
+			}
+			return TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public TriggerResult onProcessingTime(long time,
+				TriggerContext ctx) throws Exception {
+			return TriggerResult.CONTINUE;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 1) {
+				fileOutput = true;
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: SessionWindowing <result path>");
+				return false;
+			}
+		}
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
new file mode 100644
index 0000000..df3402e
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An example of grouped stream windowing where different eviction and trigger
+ * policies can be used. A source fetches events from cars every 1 sec
+ * containing their id, their current speed (kmh), overall elapsed distance (m)
+ * and a timestamp. The streaming example triggers the top speed of each car
+ * every x meters elapsed for the last y seconds.
+ */
+public class TopSpeedWindowing {
+
+	private static final int NUM_CAR_EVENTS = 100;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		@SuppressWarnings({"rawtypes", "serial"})
+		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+
+		if (fileInput) {
+			carData = env.readTextFile(inputPath).map(new ParseCarData());
+		} else {
+			carData = env.addSource(CarSource.create(numOfCars));
+		}
+
+		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
+				.assignTimestamps(new CarTimestamp())
+				.keyBy(0)
+				.window(GlobalWindows.create())
+				.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
+				.trigger(DeltaTrigger.of(triggerMeters,
+						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
+							private static final long serialVersionUID = 1L;
+
+							@Override
+							public double getDelta(
+									Tuple4<Integer, Integer, Double, Long> oldDataPoint,
+									Tuple4<Integer, Integer, Double, Long> newDataPoint) {
+								return newDataPoint.f2 - oldDataPoint.f2;
+							}
+						}))
+				.maxBy(1);
+
+		if (fileOutput) {
+			topSpeeds.print();
+			topSpeeds.writeAsText(outputPath);
+		} else {
+			topSpeeds.print();
+		}
+
+		env.execute("CarTopSpeedWindowingExample");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+		private static final long serialVersionUID = 1L;
+		private Integer[] speeds;
+		private Double[] distances;
+
+		private Random rand = new Random();
+
+		private volatile boolean isRunning = true;
+		private int counter;
+
+		private CarSource(int numOfCars) {
+			speeds = new Integer[numOfCars];
+			distances = new Double[numOfCars];
+			Arrays.fill(speeds, 50);
+			Arrays.fill(distances, 0d);
+		}
+
+		public static CarSource create(int cars) {
+			return new CarSource(cars);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
+
+			while (isRunning && counter < NUM_CAR_EVENTS) {
+				Thread.sleep(100);
+				for (int carId = 0; carId < speeds.length; carId++) {
+					if (rand.nextBoolean()) {
+						speeds[carId] = Math.min(100, speeds[carId] + 5);
+					} else {
+						speeds[carId] = Math.max(0, speeds[carId] - 5);
+					}
+					distances[carId] += speeds[carId] / 3.6d;
+					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+							speeds[carId], distances[carId], System.currentTimeMillis());
+					ctx.collect(record);
+					counter++;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	private static class ParseCarData extends
+			RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple4<Integer, Integer, Double, Long> map(String record) {
+			String rawData = record.substring(1, record.length() - 1);
+			String[] data = rawData.split(",");
+			return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
+		}
+	}
+
+	private static class CarTimestamp implements TimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple4<Integer, Integer, Double, Long> element,
+				long currentTimestamp) {
+			return element.f3;
+		}
+
+		@Override
+		public long extractWatermark(Tuple4<Integer, Integer, Double, Long> element,
+				long currentTimestamp) {
+			return element.f3 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+	private static int numOfCars = 2;
+	private static int evictionSec = 10;
+	private static double triggerMeters = 50;
+	private static String inputPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length == 2) {
+				fileInput = true;
+				fileOutput = true;
+				inputPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: TopSpeedWindowingExample <input path> <output path>");
+				return false;
+			}
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
new file mode 100644
index 0000000..f3d57bf
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.wordcount.WordCount;
+
+/**
+ * Implements a windowed version of the streaming "WordCount" program.
+ *
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
+ * <li>use basic windowing abstractions.
+ * </ul>
+ *
+ */
+public class WindowWordCount {
+
+	// window parameters with default values
+	private static int windowSize = 250;
+	private static int slideSize = 150;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new WordCount.Tokenizer())
+				// create windows of windowSize records slided every slideSize records
+				.keyBy(0)
+				.countWindow(windowSize, slideSize)
+				// group by the tuple field "0" and sum up tuple field "1"
+				.sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WindowWordCount");
+	}
+
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length >= 2 && args.length <= 4) {
+				textPath = args[0];
+				outputPath = args[1];
+				if (args.length >= 3){
+					windowSize = Integer.parseInt(args[2]);
+
+					// if no slide size is specified use the
+					slideSize = args.length == 3 ? windowSize : Integer.parseInt(args[2]);
+				}
+			} else {
+				System.err.println("Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowWordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
new file mode 100644
index 0000000..c1a99a8
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+public class SessionWindowingData {
+
+	public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,1,3)\n" +
+			"(a,10,1)";
+
+	private SessionWindowingData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
new file mode 100644
index 0000000..bf63695
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+public class TopSpeedWindowingExampleData {
+
+	public static final String CAR_DATA =
+					"(0,55,15.277777777777777,1424951918630)\n" + "(1,45,12.5,1424951918632)\n" +
+					"(0,50,29.166666666666664,1424951919632)\n" + "(1,50,26.38888888888889,1424951919632)\n" +
+					"(0,55,44.44444444444444,1424951920633)\n" + "(1,45,38.888888888888886,1424951920633)\n" +
+					"(0,50,58.33333333333333,1424951921634)\n" + "(1,40,50.0,1424951921634)\n" +
+					"(0,55,73.6111111111111,1424951922634)\n" + "(1,35,59.72222222222222,1424951922634)\n" +
+					"(0,60,90.27777777777777,1424951923634)\n" + "(1,40,70.83333333333333,1424951923634)\n" +
+					"(0,65,108.33333333333333,1424951924635)\n" + "(1,35,80.55555555555554,1424951924635)\n" +
+					"(0,60,125.0,1424951925635)\n" + "(1,40,91.66666666666666,1424951925635)\n" +
+					"(0,55,140.27777777777777,1424951926635)\n" + "(1,45,104.16666666666666,1424951926636)\n" +
+					"(0,60,156.94444444444443,1424951927636)\n" + "(1,50,118.05555555555554,1424951927636)\n" +
+					"(0,55,172.2222222222222,1424951928636)\n" + "(1,45,130.55555555555554,1424951928636)\n" +
+					"(0,50,186.1111111111111,1424951929636)\n" + "(1,50,144.44444444444443,1424951929637)\n" +
+					"(0,55,201.38888888888886,1424951930637)\n" + "(1,55,159.7222222222222,1424951930637)\n" +
+					"(0,60,218.05555555555551,1424951931637)\n" + "(1,60,176.38888888888886,1424951931637)\n" +
+					"(0,55,233.3333333333333,1424951932637)\n" + "(1,65,194.4444444444444,1424951932638)\n" +
+					"(0,50,247.22222222222217,1424951933638)\n" + "(1,70,213.88888888888886,1424951933638)\n" +
+					"(0,45,259.7222222222222,1424951934638)\n" + "(1,65,231.9444444444444,1424951934638)\n" +
+					"(0,50,273.6111111111111,1424951935638)\n" + "(1,70,251.38888888888886,1424951935639)\n" +
+					"(0,55,288.88888888888886,1424951936639)\n" + "(1,75,272.2222222222222,1424951936639)\n" +
+					"(0,50,302.77777777777777,1424951937639)\n" + "(1,70,291.66666666666663,1424951937639)\n" +
+					"(0,45,315.27777777777777,1424951938640)\n" + "(1,65,309.7222222222222,1424951938640)\n" +
+					"(0,50,329.1666666666667,1424951939640)\n" + "(1,70,329.16666666666663,1424951939640)\n" +
+					"(0,55,344.44444444444446,1424951940640)\n" + "(1,65,347.2222222222222,1424951940640)\n" +
+					"(0,50,358.33333333333337,1424951941641)\n" + "(1,70,366.66666666666663,1424951941641)\n" +
+					"(0,55,373.61111111111114,1424951942641)\n" + "(1,65,384.7222222222222,1424951942641)\n" +
+					"(0,50,387.50000000000006,1424951943641)\n" + "(1,70,404.16666666666663,1424951943641)\n" +
+					"(0,45,400.00000000000006,1424951944642)\n" + "(1,65,422.2222222222222,1424951944642)\n" +
+					"(0,50,413.88888888888897,1424951945642)\n" + "(1,60,438.88888888888886,1424951945642)\n" +
+					"(0,45,426.38888888888897,1424951946642)\n" + "(1,65,456.9444444444444,1424951946642)\n" +
+					"(0,40,437.50000000000006,1424951947643)\n" + "(1,70,476.38888888888886,1424951947643)\n" +
+					"(0,45,450.00000000000006,1424951948643)\n" + "(1,75,497.2222222222222,1424951948643)\n" +
+					"(0,40,461.11111111111114,1424951949643)\n" + "(1,80,519.4444444444443,1424951949644)\n" +
+					"(0,45,473.61111111111114,1424951950644)\n" + "(1,75,540.2777777777777,1424951950644)\n" +
+					"(0,50,487.50000000000006,1424951951644)\n" + "(1,80,562.4999999999999,1424951951644)\n" +
+					"(0,45,500.00000000000006,1424951952644)\n" + "(1,85,586.111111111111,1424951952645)\n" +
+					"(0,40,511.11111111111114,1424951953645)\n" + "(1,80,608.3333333333331,1424951953645)\n" +
+					"(0,35,520.8333333333334,1424951954645)\n" + "(1,75,629.1666666666665,1424951954645)\n" +
+					"(0,40,531.9444444444445,1424951955645)\n" + "(1,70,648.611111111111,1424951955646)\n" +
+					"(0,45,544.4444444444445,1424951956646)\n" + "(1,75,669.4444444444443,1424951956646)\n" +
+					"(0,50,558.3333333333334,1424951957646)\n" + "(1,80,691.6666666666665,1424951957646)\n" +
+					"(0,55,573.6111111111112,1424951958646)\n" + "(1,85,715.2777777777776,1424951958647)\n" +
+					"(0,60,590.2777777777778,1424951959647)\n" + "(1,80,737.4999999999998,1424951959647)\n" +
+					"(0,65,608.3333333333334,1424951960647)\n" + "(1,85,761.1111111111109,1424951960647)\n" +
+					"(0,70,627.7777777777778,1424951961647)\n" + "(1,80,783.333333333333,1424951961648)\n" +
+					"(0,75,648.6111111111112,1424951962648)\n" + "(1,85,806.9444444444441,1424951962648)\n" +
+					"(0,80,670.8333333333334,1424951963648)\n" + "(1,90,831.9444444444441,1424951963648)\n" +
+					"(0,75,691.6666666666667,1424951964649)\n" + "(1,95,858.333333333333,1424951964649)\n" +
+					"(0,70,711.1111111111112,1424951965649)\n" + "(1,90,883.333333333333,1424951965649)\n" +
+					"(0,75,731.9444444444446,1424951966649)\n" + "(1,95,909.722222222222,1424951966649)\n" +
+					"(0,70,751.388888888889,1424951967649)\n" + "(1,100,937.4999999999998,1424951967650)\n" +
+					"(0,75,772.2222222222224,1424951968650)\n" + "(1,100,965.2777777777776,1424951968650)\n" +
+					"(0,80,794.4444444444446,1424951969650)\n" + "(1,100,993.0555555555554,1424951969650)\n" +
+					"(0,75,815.2777777777779,1424951970651)\n" + "(1,100,1020.8333333333333,1424951970651)\n" +
+					"(0,80,837.5000000000001,1424951971651)\n" + "(1,100,1048.611111111111,1424951971651)\n" +
+					"(0,85,861.1111111111112,1424951972651)\n" + "(1,100,1076.388888888889,1424951972651)\n" +
+					"(0,80,883.3333333333334,1424951973652)\n" + "(1,95,1102.7777777777778,1424951973652)\n" +
+					"(0,75,904.1666666666667,1424951974652)\n" + "(1,100,1130.5555555555557,1424951974652)\n" +
+					"(0,70,923.6111111111112,1424951975652)\n" + "(1,100,1158.3333333333335,1424951975652)\n" +
+					"(0,75,944.4444444444446,1424951976653)\n" + "(1,100,1186.1111111111113,1424951976653)\n" +
+					"(0,80,966.6666666666667,1424951977653)\n" + "(1,95,1212.5000000000002,1424951977653)\n" +
+					"(0,75,987.5000000000001,1424951978653)\n" + "(1,100,1240.277777777778,1424951978653)\n" +
+					"(0,80,1009.7222222222223,1424951979654)\n" + "(1,100,1268.0555555555559,1424951979654)\n" +
+					"(0,85,1033.3333333333335,1424951980654)\n" + "(1,100,1295.8333333333337,1424951980654)\n" +
+					"(0,90,1058.3333333333335,1424951981654)\n" + "(1,100,1323.6111111111115,1424951981654)\n" +
+					"(0,85,1081.9444444444446,1424951982655)\n" + "(1,100,1351.3888888888894,1424951982655)\n" +
+					"(0,90,1106.9444444444446,1424951983655)\n" + "(1,100,1379.1666666666672,1424951983655)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" + "(1,100,1406.944444444445,1424951984656)\n" +
+					"(0,90,1158.3333333333335,1424951985656)\n" + "(1,95,1433.333333333334,1424951985656)\n" +
+					"(0,95,1184.7222222222224,1424951986656)\n" + "(1,90,1458.333333333334,1424951986656)\n" +
+					"(0,90,1209.7222222222224,1424951987656)\n" + "(1,95,1484.7222222222229,1424951987657)\n" +
+					"(0,85,1233.3333333333335,1424951988657)\n" + "(1,90,1509.7222222222229,1424951988657)\n" +
+					"(0,80,1255.5555555555557,1424951989657)\n" + "(1,95,1536.1111111111118,1424951989657)\n" +
+					"(0,85,1279.1666666666667,1424951990657)\n" + "(1,100,1563.8888888888896,1424951990658)\n" +
+					"(0,90,1304.1666666666667,1424951991658)\n" + "(1,95,1590.2777777777785,1424951991658)\n" +
+					"(0,95,1330.5555555555557,1424951992658)\n" + "(1,90,1615.2777777777785,1424951992658)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" + "(1,95,1641.6666666666674,1424951993659)\n" +
+					"(0,100,1386.1111111111113,1424951994659)\n" + "(1,100,1669.4444444444453,1424951994659)\n" +
+					"(0,95,1412.5000000000002,1424951995659)\n" + "(1,95,1695.8333333333342,1424951995660)\n" +
+					"(0,100,1440.277777777778,1424951996660)\n" + "(1,90,1720.8333333333342,1424951996660)\n" +
+					"(0,100,1468.0555555555559,1424951997660)\n" + "(1,85,1744.4444444444453,1424951997660)\n" +
+					"(0,95,1494.4444444444448,1424951998660)\n" + "(1,80,1766.6666666666674,1424951998661)\n" +
+					"(0,100,1522.2222222222226,1424951999661)\n" + "(1,75,1787.5000000000007,1424951999661)\n" +
+					"(0,95,1548.6111111111115,1424952000661)\n" + "(1,80,1809.7222222222229,1424952000661)\n" +
+					"(0,90,1573.6111111111115,1424952001662)\n" + "(1,75,1830.555555555556,1424952001662)\n" +
+					"(0,95,1600.0000000000005,1424952002662)\n" + "(1,80,1852.7777777777783,1424952002662)\n" +
+					"(0,100,1627.7777777777783,1424952003662)\n" + "(1,85,1876.3888888888894,1424952003662)\n" +
+					"(0,100,1655.555555555556,1424952004663)\n" + "(1,80,1898.6111111111115,1424952004663)\n" +
+					"(0,95,1681.944444444445,1424952005663)\n" + "(1,85,1922.2222222222226,1424952005663)\n" +
+					"(0,100,1709.7222222222229,1424952006663)\n" + "(1,90,1947.2222222222226,1424952006664)\n" +
+					"(0,100,1737.5000000000007,1424952007664)\n" + "(1,95,1973.6111111111115,1424952007664)\n" +
+					"(0,95,1763.8888888888896,1424952008664)\n" + "(1,90,1998.6111111111115,1424952008664)\n" +
+					"(0,100,1791.6666666666674,1424952009664)\n" + "(1,85,2022.2222222222226,1424952009665)\n" +
+					"(0,95,1818.0555555555563,1424952010665)\n" + "(1,80,2044.4444444444448,1424952010665)\n" +
+					"(0,90,1843.0555555555563,1424952011665)\n" + "(1,75,2065.2777777777783,1424952011665)\n" +
+					"(0,95,1869.4444444444453,1424952012666)\n" + "(1,80,2087.5000000000005,1424952012666)\n" +
+					"(0,100,1897.222222222223,1424952013666)\n" + "(1,85,2111.1111111111118,1424952013666)\n" +
+					"(0,95,1923.611111111112,1424952014666)\n" + "(1,90,2136.1111111111118,1424952014666)\n" +
+					"(0,100,1951.3888888888898,1424952015667)\n" + "(1,85,2159.722222222223,1424952015667)\n" +
+					"(0,95,1977.7777777777787,1424952016667)\n" + "(1,90,2184.722222222223,1424952016667)\n" +
+					"(0,100,2005.5555555555566,1424952017667)\n" + "(1,95,2211.1111111111118,1424952017668)";
+
+	public static final String TOP_SPEEDS =
+			"(0,55,15.277777777777777,1424951918630)\n" +
+					"(1,50,26.38888888888889,1424951919632)\n" +
+					"(0,65,108.33333333333333,1424951924635)\n" +
+					"(1,50,26.38888888888889,1424951919632)\n" +
+					"(0,65,108.33333333333333,1424951924635)\n" +
+					"(1,65,194.4444444444444,1424951932638)\n" +
+					"(0,65,108.33333333333333,1424951924635)\n" +
+					"(1,70,213.88888888888886,1424951933638)\n" +
+					"(0,60,218.05555555555551,1424951931637)\n" +
+					"(1,75,272.2222222222222,1424951936639)\n" +
+					"(0,55,233.3333333333333,1424951932637)\n" +
+					"(1,75,272.2222222222222,1424951936639)\n" +
+					"(1,75,272.2222222222222,1424951936639)\n" +
+					"(0,55,288.88888888888886,1424951936639)\n" +
+					"(1,70,329.16666666666663,1424951939640)\n" +
+					"(0,55,373.61111111111114,1424951942641)\n" +
+					"(1,80,519.4444444444443,1424951949644)\n" +
+					"(1,85,586.111111111111,1424951952645)\n" +
+					"(0,50,487.50000000000006,1424951951644)\n" +
+					"(1,85,586.111111111111,1424951952645)\n" +
+					"(0,60,590.2777777777778,1424951959647)\n" +
+					"(1,85,586.111111111111,1424951952645)\n" +
+					"(0,75,648.6111111111112,1424951962648)\n" +
+					"(1,85,715.2777777777776,1424951958647)\n" +
+					"(1,95,858.333333333333,1424951964649)\n" +
+					"(0,80,670.8333333333334,1424951963648)\n" +
+					"(1,95,858.333333333333,1424951964649)\n" +
+					"(0,80,670.8333333333334,1424951963648)\n" +
+					"(1,100,937.4999999999998,1424951967650)\n" +
+					"(1,100,937.4999999999998,1424951967650)\n" +
+					"(0,80,670.8333333333334,1424951963648)\n" +
+					"(1,100,937.4999999999998,1424951967650)\n" +
+					"(0,85,861.1111111111112,1424951972651)\n" +
+					"(1,100,937.4999999999998,1424951967650)\n" +
+					"(1,100,937.4999999999998,1424951967650)\n" +
+					"(0,85,861.1111111111112,1424951972651)\n" +
+					"(1,100,993.0555555555554,1424951969650)\n" +
+					"(0,85,861.1111111111112,1424951972651)\n" +
+					"(1,100,1048.611111111111,1424951971651)\n" +
+					"(1,100,1130.5555555555557,1424951974652)\n" +
+					"(0,90,1058.3333333333335,1424951981654)\n" +
+					"(1,100,1158.3333333333335,1424951975652)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" +
+					"(1,100,1240.277777777778,1424951978653)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" +
+					"(1,100,1268.0555555555559,1424951979654)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" +
+					"(1,100,1323.6111111111115,1424951981654)\n" +
+					"(0,95,1133.3333333333335,1424951984655)\n" +
+					"(1,100,1379.1666666666672,1424951983655)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" +
+					"(1,100,1563.8888888888896,1424951990658)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" +
+					"(1,100,1563.8888888888896,1424951990658)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" +
+					"(1,100,1563.8888888888896,1424951990658)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" +
+					"(0,100,1358.3333333333335,1424951993659)\n" +
+					"(1,100,1669.4444444444453,1424951994659)\n" +
+					"(0,100,1440.277777777778,1424951996660)\n" +
+					"(1,90,1720.8333333333342,1424951996660)\n" +
+					"(0,100,1468.0555555555559,1424951997660)\n" +
+					"(1,95,1973.6111111111115,1424952007664)\n" +
+					"(0,100,1522.2222222222226,1424951999661)\n" +
+					"(0,100,1627.7777777777783,1424952003662)\n" +
+					"(1,95,1973.6111111111115,1424952007664)\n" +
+					"(0,100,1627.7777777777783,1424952003662)\n" +
+					"(1,95,1973.6111111111115,1424952007664)\n" +
+					"(0,100,1709.7222222222229,1424952006663)\n" +
+					"(0,100,1737.5000000000007,1424952007664)\n" +
+					"(1,95,1973.6111111111115,1424952007664)\n" +
+					"(0,100,1791.6666666666674,1424952009664)\n" +
+					"(1,95,2211.1111111111118,1424952017668)\n";
+
+	public static final String TOP_CASE_CLASS_SPEEDS =
+			"CarEvent(0,55,15.277777777777777,1424951918630)\n" +
+					"CarEvent(1,50,26.38888888888889,1424951919632)\n" +
+					"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
+					"CarEvent(1,50,26.38888888888889,1424951919632)\n" +
+					"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
+					"CarEvent(1,65,194.4444444444444,1424951932638)\n" +
+					"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
+					"CarEvent(1,70,213.88888888888886,1424951933638)\n" +
+					"CarEvent(0,60,218.05555555555551,1424951931637)\n" +
+					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+					"CarEvent(0,55,233.3333333333333,1424951932637)\n" +
+					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+					"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+					"CarEvent(0,55,288.88888888888886,1424951936639)\n" +
+					"CarEvent(1,70,329.16666666666663,1424951939640)\n" +
+					"CarEvent(0,55,373.61111111111114,1424951942641)\n" +
+					"CarEvent(1,80,519.4444444444443,1424951949644)\n" +
+					"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+					"CarEvent(0,50,487.50000000000006,1424951951644)\n" +
+					"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+					"CarEvent(0,60,590.2777777777778,1424951959647)\n" +
+					"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+					"CarEvent(0,75,648.6111111111112,1424951962648)\n" +
+					"CarEvent(1,85,715.2777777777776,1424951958647)\n" +
+					"CarEvent(1,95,858.333333333333,1424951964649)\n" +
+					"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
+					"CarEvent(1,95,858.333333333333,1424951964649)\n" +
+					"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
+					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+					"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
+					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+					"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
+					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+					"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+					"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
+					"CarEvent(1,100,993.0555555555554,1424951969650)\n" +
+					"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
+					"CarEvent(1,100,1048.611111111111,1424951971651)\n" +
+					"CarEvent(1,100,1130.5555555555557,1424951974652)\n" +
+					"CarEvent(0,90,1058.3333333333335,1424951981654)\n" +
+					"CarEvent(1,100,1158.3333333333335,1424951975652)\n" +
+					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+					"CarEvent(1,100,1240.277777777778,1424951978653)\n" +
+					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+					"CarEvent(1,100,1268.0555555555559,1424951979654)\n" +
+					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+					"CarEvent(1,100,1323.6111111111115,1424951981654)\n" +
+					"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+					"CarEvent(1,100,1379.1666666666672,1424951983655)\n" +
+					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+					"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
+					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+					"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
+					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+					"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
+					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+					"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+					"CarEvent(1,100,1669.4444444444453,1424951994659)\n" +
+					"CarEvent(0,100,1440.277777777778,1424951996660)\n" +
+					"CarEvent(1,90,1720.8333333333342,1424951996660)\n" +
+					"CarEvent(0,100,1468.0555555555559,1424951997660)\n" +
+					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+					"CarEvent(0,100,1522.2222222222226,1424951999661)\n" +
+					"CarEvent(0,100,1627.7777777777783,1424952003662)\n" +
+					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+					"CarEvent(0,100,1627.7777777777783,1424952003662)\n" +
+					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+					"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
+					"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
+					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+					"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
+					"CarEvent(1,95,2211.1111111111118,1424952017668)\n";
+
+	private TopSpeedWindowingExampleData() {
+	}
+}