You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:38 UTC

[02/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
deleted file mode 100644
index ce312d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class JumpingTimePreReducerTest {
-
-	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-
-	ReduceFunction<Integer> reducer = new SumReducer();
-
-	@Test
-	public void testEmitWindow() throws Exception {
-
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-		List<StreamWindow<Integer>> collected = collector.getCollected();
-
-		WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(
-				reducer, serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-		}, 1));
-
-		wb.store(1);
-		wb.store(2);
-		wb.store(3);
-		wb.evict(1);
-		wb.emitWindow(collector);
-
-		assertEquals(1, collected.size());
-		assertEquals(StreamWindow.fromElements(5),
-				collected.get(0));
-
-		wb.store(4);
-		wb.store(5);
-
-		// Nothing should happen here
-		wb.evict(2);
-
-		wb.store(6);
-
-		wb.emitWindow(collector);
-		wb.evict(2);
-		wb.emitWindow(collector);
-		wb.store(12);
-		wb.emitWindow(collector);
-
-		assertEquals(3, collected.size());
-		assertEquals(StreamWindow.fromElements(11),
-				collected.get(1));
-		assertEquals(StreamWindow.fromElements(12),
-				collected.get(2));
-	}
-
-	private static class SumReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
deleted file mode 100644
index 7f58527..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingCountGroupedPreReducerTest {
-
-	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-
-	ReduceFunction<Integer> reducer = new SumReducer();
-
-	KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
-
-	@Test
-	public void testPreReduce1() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
-				reducer, serializer, key, 3, 2, 0);
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(3, 6));
-		expected.add(StreamWindow.fromElements(5, 10));
-		expected.add(StreamWindow.fromElements(7, 14));
-		expected.add(StreamWindow.fromElements(9, 18));
-		expected.add(StreamWindow.fromElements(11, 22));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce2() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
-				reducer, serializer, key, 5, 2, 0);
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.emitWindow(collector);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(4, 6));
-		expected.add(StreamWindow.fromElements(12, 8));
-		expected.add(StreamWindow.fromElements(18, 12));
-		expected.add(StreamWindow.fromElements(24, 16));
-		expected.add(StreamWindow.fromElements(30, 20));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce3() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
-				reducer, serializer, key, 6, 3, 0);
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.store(3);
-		preReducer.emitWindow(collector);
-		preReducer.store(4);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.store(9);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(10);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(2, 4));
-		expected.add(StreamWindow.fromElements(9, 12));
-		expected.add(StreamWindow.fromElements(21, 18));
-		expected.add(StreamWindow.fromElements(30, 27));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce4() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
-				reducer, serializer, key, 5, 1, 2);
-
-		preReducer.store(1);
-		preReducer.evict(1);
-		preReducer.store(1);
-		preReducer.evict(1);
-		preReducer.store(1);
-		preReducer.emitWindow(collector);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.emitWindow(collector);
-		preReducer.store(4);
-		preReducer.emitWindow(collector);
-		preReducer.store(5);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.store(7);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1));
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(4, 2));
-		expected.add(StreamWindow.fromElements(4, 6));
-		expected.add(StreamWindow.fromElements(9, 6));
-		expected.add(StreamWindow.fromElements(8, 12));
-		expected.add(StreamWindow.fromElements(15, 10));
-		expected.add(StreamWindow.fromElements(12, 18));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	private static class SumReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-
-	protected static void checkResults(List<StreamWindow<Integer>> expected,
-			List<StreamWindow<Integer>> actual) {
-
-		for (StreamWindow<Integer> sw : expected) {
-			Collections.sort(sw);
-		}
-
-		for (StreamWindow<Integer> sw : actual) {
-			Collections.sort(sw);
-		}
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
deleted file mode 100644
index 156b875..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingCountPreReducerTest {
-
-	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-
-	ReduceFunction<Integer> reducer = new SumReducer();
-
-	@Test
-	public void testPreReduce1() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
-				serializer, 3, 2, 0);
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(3));
-		expected.add(StreamWindow.fromElements(9));
-		expected.add(StreamWindow.fromElements(15));
-		expected.add(StreamWindow.fromElements(21));
-		expected.add(StreamWindow.fromElements(27));
-		expected.add(StreamWindow.fromElements(33));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce2() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
-				serializer, 5, 2, 0);
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.emitWindow(collector);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(3));
-		expected.add(StreamWindow.fromElements(10));
-		expected.add(StreamWindow.fromElements(20));
-		expected.add(StreamWindow.fromElements(30));
-		expected.add(StreamWindow.fromElements(40));
-		expected.add(StreamWindow.fromElements(50));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce3() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
-				serializer, 6, 3, 0);
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.store(3);
-		preReducer.emitWindow(collector);
-		preReducer.store(4);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.store(9);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(10);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(6));
-		expected.add(StreamWindow.fromElements(21));
-		expected.add(StreamWindow.fromElements(39));
-		expected.add(StreamWindow.fromElements(57));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce4() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
-				serializer, 5, 1, 2);
-
-		preReducer.store(1);
-		preReducer.evict(1);
-		preReducer.store(1);
-		preReducer.evict(1);
-		preReducer.store(1);
-		preReducer.emitWindow(collector);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.emitWindow(collector);
-		preReducer.store(4);
-		preReducer.emitWindow(collector);
-		preReducer.store(5);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.store(7);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1));
-		expected.add(StreamWindow.fromElements(3));
-		expected.add(StreamWindow.fromElements(6));
-		expected.add(StreamWindow.fromElements(10));
-		expected.add(StreamWindow.fromElements(15));
-		expected.add(StreamWindow.fromElements(20));
-		expected.add(StreamWindow.fromElements(25));
-		expected.add(StreamWindow.fromElements(30));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	private static class SumReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
deleted file mode 100644
index 68bceda..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingTimeGroupedPreReducerTest {
-
-	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-	TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
-
-
-	ReduceFunction<Integer> reducer = new SumReducer();
-	ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
-
-
-	KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
-	KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPreReduce1() throws Exception {
-		// This ensures that the buffer is properly cleared after a burst of elements by
-		// replaying the same sequence of elements with a later timestamp and expecting the same
-		// result.
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-
-		SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
-				tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Tuple2<Integer, Integer> value) {
-				return value.f0;
-			}
-		}, 1));
-
-		int timeOffset = 0;
-
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
-		// ensure that everything is cleared out
-		preReducer.evict(100);
-
-
-		timeOffset = 25; // a little while later...
-
-		// Repeat the same sequence, this should produce the same result
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
-		List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
-		timeOffset = 0; // rewind ...
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 2, 2),
-				new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 2, 6),
-				new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 4, 10),
-				new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 6, 14),
-				new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 8, 18),
-				new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 10, 22),
-				new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
-
-		timeOffset = 25; // and back to the future ...
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 2, 2),
-				new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 2, 6),
-				new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 4, 10),
-				new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 6, 14),
-				new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 8, 18),
-				new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
-		expected.add(StreamWindow.fromElements(
-				new Tuple2<Integer, Integer>(timeOffset + 10, 22),
-				new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	protected static void checkResults(List<StreamWindow<Integer>> expected,
-			List<StreamWindow<Integer>> actual) {
-
-		for (StreamWindow<Integer> sw : expected) {
-			Collections.sort(sw);
-		}
-
-		for (StreamWindow<Integer> sw : actual) {
-			Collections.sort(sw);
-		}
-
-		assertEquals(expected, actual);
-	}
-
-	@Test
-	public void testPreReduce2() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
-				reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
-						new Timestamp<Integer>() {
-
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public long getTimestamp(Integer value) {
-								return value;
-							}
-						}, 1));
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.emitWindow(collector);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(4, 6));
-		expected.add(StreamWindow.fromElements(12, 8));
-		expected.add(StreamWindow.fromElements(18, 12));
-		expected.add(StreamWindow.fromElements(24, 16));
-		expected.add(StreamWindow.fromElements(30, 20));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce3() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
-				reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
-						new Timestamp<Integer>() {
-
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public long getTimestamp(Integer value) {
-								return value;
-							}
-						}, 1));
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.store(3);
-		preReducer.emitWindow(collector);
-		preReducer.store(4);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.store(9);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(10);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(2, 4));
-		expected.add(StreamWindow.fromElements(9, 12));
-		expected.add(StreamWindow.fromElements(21, 18));
-		expected.add(StreamWindow.fromElements(30, 27));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce4() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
-				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
-						new Timestamp<Integer>() {
-
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public long getTimestamp(Integer value) {
-								return value;
-							}
-						}, 1));
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.emitWindow(collector);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(14);
-		preReducer.emitWindow(collector);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.emitWindow(collector);
-		preReducer.store(21);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-
-		preReducer.store(9);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(3, 6));
-		expected.add(StreamWindow.fromElements(5, 10));
-		expected.add(StreamWindow.fromElements(7, 14));
-		expected.add(StreamWindow.fromElements(8));
-		expected.add(StreamWindow.fromElements(8));
-		expected.add(StreamWindow.fromElements(14));
-		expected.add(StreamWindow.fromElements(14));
-		expected.add(StreamWindow.fromElements(21));
-
-		checkResults(expected, collector.getCollected());
-	}
-
-	private static class SumReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-	private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
-			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-	}
-
-	public static class TupleModKey implements KeySelector<Tuple2<Integer, Integer>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private int m;
-
-		public TupleModKey(int m) {
-			this.m = m;
-		}
-
-		@Override
-		public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-			return value.f1 % m;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
deleted file mode 100644
index 6a36c57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingTimePreReducerTest {
-
-	TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-	TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
-
-	ReduceFunction<Integer> reducer = new SumReducer();
-	ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPreReduce1() throws Exception {
-		// This ensures that the buffer is properly cleared after a burst of elements by
-		// replaying the same sequence of elements with a later timestamp and expecting the same
-		// result.
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-
-		SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
-				tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Tuple2<Integer, Integer> value) {
-						return value.f0;
-					}
-				}, 1));
-
-		int timeOffset = 0;
-
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
-		// ensure that everything is cleared out
-		preReducer.evict(100);
-
-
-		timeOffset = 25; // a little while later...
-
-		// Repeat the same sequence, this should produce the same result
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
-		preReducer.emitWindow(collector);
-		preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
-		List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
-		timeOffset = 0; // rewind ...
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
-
-		timeOffset = 25; // and back to the future ...
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
-		expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
-
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce2() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
-				serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Integer value) {
-						return value;
-					}
-				}, 1));
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.emitWindow(collector);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(9);
-		preReducer.store(10);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(3));
-		expected.add(StreamWindow.fromElements(10));
-		expected.add(StreamWindow.fromElements(20));
-		expected.add(StreamWindow.fromElements(30));
-		expected.add(StreamWindow.fromElements(40));
-		expected.add(StreamWindow.fromElements(50));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce3() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
-				serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Integer value) {
-						return value;
-					}
-				}, 1));
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.store(3);
-		preReducer.emitWindow(collector);
-		preReducer.store(4);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.store(9);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(10);
-		preReducer.store(11);
-		preReducer.store(12);
-		preReducer.emitWindow(collector);
-		preReducer.evict(3);
-		preReducer.store(13);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(6));
-		expected.add(StreamWindow.fromElements(21));
-		expected.add(StreamWindow.fromElements(39));
-		expected.add(StreamWindow.fromElements(57));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	@Test
-	public void testPreReduce4() throws Exception {
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
-		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
-				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Integer value) {
-						return value;
-					}
-				}, 1));
-
-		preReducer.store(1);
-		preReducer.store(2);
-		preReducer.emitWindow(collector);
-		preReducer.store(3);
-		preReducer.store(4);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(5);
-		preReducer.store(6);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(7);
-		preReducer.store(8);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.emitWindow(collector);
-		preReducer.emitWindow(collector);
-		preReducer.evict(2);
-		preReducer.store(14);
-		preReducer.emitWindow(collector);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-		preReducer.emitWindow(collector);
-		preReducer.store(21);
-		preReducer.emitWindow(collector);
-		preReducer.evict(1);
-		preReducer.emitWindow(collector);
-
-		preReducer.store(9);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(3));
-		expected.add(StreamWindow.fromElements(9));
-		expected.add(StreamWindow.fromElements(15));
-		expected.add(StreamWindow.fromElements(21));
-		expected.add(StreamWindow.fromElements(8));
-		expected.add(StreamWindow.fromElements(8));
-		expected.add(StreamWindow.fromElements(14));
-		expected.add(StreamWindow.fromElements(14));
-		expected.add(StreamWindow.fromElements(21));
-
-		assertEquals(expected, collector.getCollected());
-	}
-
-	private static class SumReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-	private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
-			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
deleted file mode 100644
index 3aee288..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.junit.Test;
-
-public class TumblingGroupedPreReducerTest {
-
-	TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor
-			.getForObject(new Tuple2<Integer, Integer>(1, 1));
-	TypeSerializer<Tuple2<Integer, Integer>> serializer = type.createSerializer(null);
-
-	KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(
-			new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, type), type, null);
-
-	Reducer reducer = new Reducer();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testEmitWindow() throws Exception {
-
-		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
-		inputs.add(new Tuple2<Integer, Integer>(1, 1));
-		inputs.add(new Tuple2<Integer, Integer>(0, 0));
-		inputs.add(new Tuple2<Integer, Integer>(1, -1));
-		inputs.add(new Tuple2<Integer, Integer>(1, -2));
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
-		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
-				reducer, key, serializer);
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.emitWindow(collector);
-		wb.evict(2);
-
-		assertEquals(1, collected.size());
-
-		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
-				new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-
-		wb.store(serializer.copy(inputs.get(3)));
-
-		wb.emitWindow(collector);
-		wb.evict(4);
-
-		assertEquals(2, collected.size());
-
-		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, -2),
-				new Tuple2<Integer, Integer>(0, 0)), collected.get(1));
-
-		// Test whether function is mutating inputs or not
-		assertEquals(2, reducer.allInputs.size());
-		assertEquals(reducer.allInputs.get(0), inputs.get(2));
-		assertEquals(reducer.allInputs.get(1), inputs.get(3));
-
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testEmitWindow2() throws Exception {
-
-		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
-		inputs.add(new Tuple2<Integer, Integer>(1, 1));
-		inputs.add(new Tuple2<Integer, Integer>(0, 0));
-		inputs.add(new Tuple2<Integer, Integer>(1, -1));
-		inputs.add(new Tuple2<Integer, Integer>(1, -2));
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
-		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
-				reducer, key, serializer).sequentialID();
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.emitWindow(collector);
-		wb.evict(2);
-		
-		assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
-		
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-		wb.emitWindow(collector);
-		wb.evict(3);
-		
-		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
-
-		
-	}
-
-	private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
-		assertEquals(new HashSet<T>(first), new HashSet<T>(second));
-	}
-
-	@SuppressWarnings("serial")
-	private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
-		public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
-				Tuple2<Integer, Integer> value2) throws Exception {
-			allInputs.add(value2);
-			value1.f0 = value1.f0 + value2.f0;
-			value1.f1 = value1.f1 + value2.f1;
-			return value1;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
deleted file mode 100644
index 3e537a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class TumblingPreReducerTest {
-
-	TypeSerializer<Tuple2<Integer, Integer>> serializer = TypeExtractor.getForObject(
-			new Tuple2<Integer, Integer>(1, 1)).createSerializer(null);
-
-	Reducer reducer = new Reducer();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testEmitWindow() throws Exception {
-
-		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
-		inputs.add(new Tuple2<Integer, Integer>(1, 1));
-		inputs.add(new Tuple2<Integer, Integer>(2, 0));
-		inputs.add(new Tuple2<Integer, Integer>(3, -1));
-		inputs.add(new Tuple2<Integer, Integer>(4, -2));
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
-		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingPreReducer<Tuple2<Integer, Integer>>(
-				reducer, serializer);
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-
-		wb.emitWindow(collector);
-		wb.evict(2);
-
-		assertEquals(1, collected.size());
-		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
-				collected.get(0));
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-
-		wb.store(serializer.copy(inputs.get(3)));
-
-		wb.emitWindow(collector);
-		wb.evict(4);
-
-		assertEquals(2, collected.size());
-		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(10, -2)),
-				collected.get(1));
-
-		// Test whether function is mutating inputs or not
-		assertEquals(4, reducer.allInputs.size());
-		assertEquals(reducer.allInputs.get(0), inputs.get(1));
-		assertEquals(reducer.allInputs.get(1), inputs.get(1));
-		assertEquals(reducer.allInputs.get(2), inputs.get(2));
-		assertEquals(reducer.allInputs.get(3), inputs.get(3));
-
-	}
-
-	@SuppressWarnings("serial")
-	private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
-		public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
-				Tuple2<Integer, Integer> value2) throws Exception {
-			allInputs.add(value2);
-			value1.f0 = value1.f0 + value2.f0;
-			value1.f1 = value1.f1 + value2.f1;
-			return value1;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 99d45bb..4c73e44 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,32 +17,35 @@
 
 package org.apache.flink.streaming.examples.ml;
 
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Skeleton for incremental machine learning algorithm consisting of a
  * pre-computed model, which gets updated for the new inputs and new input data
  * for which the job provides predictions.
- * <p/>
+ *
  * <p>
  * This may serve as a base of a number of algorithms, e.g. updating an
  * incremental Alternating Least Squares model while also providing the
  * predictions.
- * </p>
- * <p/>
+ *
  * <p/>
  * This example shows how to use:
  * <ul>
- * <li>Connected streams
- * <li>CoFunctions
- * <li>Tuple data types
+ *   <li>Connected streams
+ *   <li>CoFunctions
+ *   <li>Tuple data types
  * </ul>
  */
 public class IncrementalLearningSkeleton {
@@ -61,12 +64,16 @@ public class IncrementalLearningSkeleton {
 		}
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
 		trainingData = env.addSource(new FiniteTrainingDataSource());
 		newData = env.addSource(new FiniteNewDataSource());
 
 		// build new model on every second of new data
-		DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp()))
-				.mapWindow(new PartialModelBuilder()).flatten();
+		DataStream<Double[]> model = trainingData
+				.extractTimestamp(new LinearTimestamp())
+				.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+				.apply(new PartialModelBuilder());
 
 		// use partial model for newData
 		DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
@@ -140,21 +147,32 @@ public class IncrementalLearningSkeleton {
 		}
 	}
 
-	public static class LinearTimestamp implements Timestamp<Integer> {
+	public static class LinearTimestamp implements TimestampExtractor<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		private long counter = 0L;
 
 		@Override
-		public long getTimestamp(Integer value) {
+		public long extractTimestamp(Integer element, long currentTimestamp) {
 			return counter += 10L;
 		}
+
+		@Override
+		public long emitWatermark(Integer element, long currentTimestamp) {
+			return counter - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+
 	}
 
 	/**
 	 * Builds up-to-date partial models on new training data.
 	 */
-	public static class PartialModelBuilder implements WindowMapFunction<Integer, Double[]> {
+	public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		protected Double[] buildPartialModel(Iterable<Integer> values) {
@@ -162,7 +180,7 @@ public class IncrementalLearningSkeleton {
 		}
 
 		@Override
-		public void mapWindow(Iterable<Integer> values, Collector<Double[]> out) throws Exception {
+		public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
 			out.collect(buildPartialModel(values));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
index dedc5ee..8a6cd88 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -19,15 +19,13 @@ package org.apache.flink.streaming.examples.ml.util;
 
 public class IncrementalLearningSkeletonData {
 
-	public static final String RESULTS = "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"
-			+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"
-			+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
-			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
-			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
-			"1\n";
+	public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n";
 
 	private IncrementalLearningSkeletonData() {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 4730cc1..950b0f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -18,11 +18,14 @@
 package org.apache.flink.streaming.examples.windowing;
 
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -37,37 +40,36 @@ public class SessionWindowing {
 		}
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(2);
 
-		final List<Tuple3<String, Long, Integer>> input = new ArrayList<Tuple3<String, Long, Integer>>();
+		final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
 
-		input.add(new Tuple3<String, Long, Integer>("a", 1L, 1));
-		input.add(new Tuple3<String, Long, Integer>("b", 1L, 1));
-		input.add(new Tuple3<String, Long, Integer>("b", 3L, 1));
-		input.add(new Tuple3<String, Long, Integer>("b", 5L, 1));
-		input.add(new Tuple3<String, Long, Integer>("c", 6L, 1));
+		input.add(new Tuple3<>("a", 1L, 1));
+		input.add(new Tuple3<>("b", 1L, 1));
+		input.add(new Tuple3<>("b", 3L, 1));
+		input.add(new Tuple3<>("b", 5L, 1));
+		input.add(new Tuple3<>("c", 6L, 1));
 		// We expect to detect the session "a" earlier than this point (the old
 		// functionality can only detect here when the next starts)
-		input.add(new Tuple3<String, Long, Integer>("a", 10L, 1));
+		input.add(new Tuple3<>("a", 10L, 1));
 		// We expect to detect session "b" and "c" at this point as well
-		input.add(new Tuple3<String, Long, Integer>("c", 11L, 1));
+		input.add(new Tuple3<>("c", 11L, 1));
 
 		DataStream<Tuple3<String, Long, Integer>> source = env
-				.addSource(new SourceFunction<Tuple3<String, Long, Integer>>() {
+				.addSource(new EventTimeSourceFunction<Tuple3<String,Long,Integer>>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
 						for (Tuple3<String, Long, Integer> value : input) {
-							// We sleep three seconds between every output so we
-							// can see whether we properly detect sessions
-							// before the next start for a specific id
-							ctx.collect(value);
+							ctx.collectWithTimestamp(value, value.f1);
+							ctx.emitWatermark(new Watermark(value.f1 - 1));
 							if (!fileOutput) {
 								System.out.println("Collected: " + value);
-								Thread.sleep(3000);
 							}
 						}
+						ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
 					}
 
 					@Override
@@ -76,10 +78,11 @@ public class SessionWindowing {
 				});
 
 		// We create sessions for each id with max timeout of 3 time units
-		DataStream<Tuple3<String, Long, Integer>> aggregated = source.keyBy(0)
-				.window(new SessionTriggerPolicy(3L),
-						new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2)
-				.flatten();
+		DataStream<Tuple3<String, Long, Integer>> aggregated = source
+				.keyBy(0)
+				.window(GlobalWindows.create())
+				.trigger(new SessionTrigger(3L))
+				.sum(2);
 
 		if (fileOutput) {
 			aggregated.writeAsText(outputPath);
@@ -90,55 +93,46 @@ public class SessionWindowing {
 		env.execute();
 	}
 
-	private static class SessionTriggerPolicy implements
-			CentralActiveTrigger<Tuple3<String, Long, Integer>> {
+	private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
 
 		private static final long serialVersionUID = 1L;
 
 		private volatile Long lastSeenEvent = 1L;
 		private Long sessionTimeout;
 
-		public SessionTriggerPolicy(Long sessionTimeout) {
+		public SessionTrigger(Long sessionTimeout) {
 			this.sessionTimeout = sessionTimeout;
 
 		}
 
 		@Override
-		public boolean notifyTrigger(Tuple3<String, Long, Integer> datapoint) {
-
-			Long eventTimestamp = datapoint.f1;
-			Long timeSinceLastEvent = eventTimestamp - lastSeenEvent;
+		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) {
+			Long timeSinceLastEvent = timestamp - lastSeenEvent;
 
 			// Update the last seen event time
-			lastSeenEvent = eventTimestamp;
+			lastSeenEvent = timestamp;
+
+			ctx.registerWatermarkTimer(lastSeenEvent + sessionTimeout);
 
 			if (timeSinceLastEvent > sessionTimeout) {
-				return true;
+				return TriggerResult.FIRE_AND_PURGE;
 			} else {
-				return false;
+				return TriggerResult.CONTINUE;
 			}
 		}
 
 		@Override
-		public Object[] notifyOnLastGlobalElement(Tuple3<String, Long, Integer> datapoint) {
-			Long eventTimestamp = datapoint.f1;
-			Long timeSinceLastEvent = eventTimestamp - lastSeenEvent;
-
-			// Here we dont update the last seen event time because this data
-			// belongs to a different group
-
-			if (timeSinceLastEvent > sessionTimeout) {
-				return new Object[]{datapoint};
-			} else {
-				return null;
+		public TriggerResult onTime(long time, TriggerContext ctx) {
+			if (time - lastSeenEvent >= sessionTimeout) {
+				return TriggerResult.FIRE_AND_PURGE;
 			}
+			return TriggerResult.CONTINUE;
 		}
 
 		@Override
-		public SessionTriggerPolicy clone() {
-			return new SessionTriggerPolicy(sessionTimeout);
+		public SessionTrigger duplicate() {
+			return new SessionTrigger(sessionTimeout);
 		}
-
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 55d48dd..a46ffd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -19,16 +19,20 @@ package org.apache.flink.streaming.examples.windowing;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
 
 import java.util.Arrays;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * An example of grouped stream windowing where different eviction and trigger
@@ -52,29 +56,37 @@ public class TopSpeedWindowing {
 		}
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		@SuppressWarnings({"rawtypes", "serial"})
 		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+
 		if (fileInput) {
 			carData = env.readTextFile(inputPath).map(new ParseCarData());
 		} else {
 			carData = env.addSource(CarSource.create(numOfCars));
 		}
-		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.keyBy(0)
-				.window(Time.of(evictionSec * 1000, new CarTimestamp()))
-				.every(Delta.of(triggerMeters,
+
+		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
+				.extractTimestamp(new CarTimestamp())
+				.keyBy(0)
+				.window(GlobalWindows.create())
+				.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
+				.trigger(DeltaTrigger.of(triggerMeters,
 						new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
 							private static final long serialVersionUID = 1L;
 
-
 							@Override
 							public double getDelta(
 									Tuple4<Integer, Integer, Double, Long> oldDataPoint,
 									Tuple4<Integer, Integer, Double, Long> newDataPoint) {
 								return newDataPoint.f2 - oldDataPoint.f2;
 							}
-						}, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten();
+						}))
+				.maxBy(1);
+
 		if (fileOutput) {
+			topSpeeds.print();
 			topSpeeds.writeAsText(outputPath);
 		} else {
 			topSpeeds.print();
@@ -143,17 +155,28 @@ public class TopSpeedWindowing {
 		public Tuple4<Integer, Integer, Double, Long> map(String record) {
 			String rawData = record.substring(1, record.length() - 1);
 			String[] data = rawData.split(",");
-			return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]),
-					Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
+			return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
 		}
 	}
 
-	private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> {
+	private static class CarTimestamp implements TimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) {
-			return value.f3;
+		public long extractTimestamp(Tuple4<Integer, Integer, Double, Long> element,
+				long currentTimestamp) {
+			return element.f3;
+		}
+
+		@Override
+		public long emitWatermark(Tuple4<Integer, Integer, Double, Long> element,
+				long currentTimestamp) {
+			return element.f3 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 023a36a..04352d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -21,7 +21,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.examples.wordcount.WordCount;
 
 /**
@@ -70,11 +72,12 @@ public class WindowWordCount {
 		// split up the lines in pairs (2-tuples) containing: (word,1)
 		text.flatMap(new WordCount.Tokenizer())
 				// create windows of windowSize records slided every slideSize records
-				.window(Count.of(windowSize)).every(Count.of(slideSize))
+				.keyBy(0)
+				.window(GlobalWindows.create())
+				.evictor(CountEvictor.of(windowSize))
+				.trigger(CountTrigger.of(slideSize))
 				// group by the tuple field "0" and sum up tuple field "1"
-				.keyBy(0).sum(1)
-				// flatten the windows to a single stream
-				.flatten();
+				.sum(1);
 
 		// emit result
 		if (fileOutput) {