You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:13 UTC

[3/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
deleted file mode 100644
index ec8cda8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowIntegrationTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static final Integer MEMORYSIZE = 32;
-
-	@SuppressWarnings("serial")
-	public static class ModKey implements KeySelector<Integer, Integer> {
-		private int m;
-
-		public ModKey(int m) {
-			this.m = m;
-		}
-
-		@Override
-		public Integer getKey(Integer value) throws Exception {
-			return value % m;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static class IdentityWindowMap implements
-			WindowMapFunction<Integer, StreamWindow<Integer>> {
-
-		@Override
-		public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
-				throws Exception {
-
-			StreamWindow<Integer> window = new StreamWindow<Integer>();
-
-			for (Integer value : values) {
-				window.add(value);
-			}
-			out.collect(window);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void test() throws Exception {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		KeySelector<Integer, ?> key = new ModKey(2);
-
-		Timestamp<Integer> ts = new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-		};
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-		env.disableOperatorChaining();
-
-		DataStream<Integer> source = env.fromCollection(inputs);
-
-		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink1());
-
-		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
-				.flatten().addSink(new TestSink2());
-
-		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink4());
-
-		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
-				.mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
-
-		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
-				.addSink(new TestSink3());
-
-		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
-				.addSink(new TestSink6());
-
-		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
-				.addSink(new TestSink7());
-
-		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
-				.getDiscretizedStream().addSink(new TestSink8());
-
-		try {
-			source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.window(FullStream.window()).getDiscretizedStream();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
-			fail();
-		} catch (Exception e) {
-		}
-
-		source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
-
-		source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
-				.getDiscretizedStream().addSink(new TestSink12());
-
-		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				for (int i = 1; i <= 10; i++) {
-					ctx.collect(i);
-				}
-			}
-
-			@Override
-			public void cancel() {
-			}
-		});
-
-		DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			private int i = 1;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				i = 1 + getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			@Override
-			public void cancel() {
-			}
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				for (;i < 11; i += 2) {
-					ctx.collect(i);
-				}
-
-			}
-		});
-
-		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
-
-		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink10());
-
-		source.map(new MapFunction<Integer, Integer>() {
-			@Override
-			public Integer map(Integer value) throws Exception {
-				return value;
-			}
-		}).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
-
-		env.execute();
-
-		// sum ( Time of 3 slide 2 )
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(5));
-		expected1.add(StreamWindow.fromElements(11));
-		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(10));
-		expected1.add(StreamWindow.fromElements(32));
-
-		validateOutput(expected1, TestSink1.windows);
-
-		// Tumbling Time of 4 grouped by mod 2
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2, 4));
-		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(5));
-		expected2.add(StreamWindow.fromElements(10));
-		expected2.add(StreamWindow.fromElements(11, 11));
-
-		validateOutput(expected2, TestSink2.windows);
-
-		// groupby mod 2 sum ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
-		expected3.add(StreamWindow.fromElements(4));
-		expected3.add(StreamWindow.fromElements(5));
-		expected3.add(StreamWindow.fromElements(22));
-		expected3.add(StreamWindow.fromElements(8));
-		expected3.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected3, TestSink4.windows);
-
-		// groupby mod3 Tumbling Count of 2 grouped by mod 2
-		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
-		expected4.add(StreamWindow.fromElements(2, 2));
-		expected4.add(StreamWindow.fromElements(1));
-		expected4.add(StreamWindow.fromElements(4));
-		expected4.add(StreamWindow.fromElements(5, 11));
-		expected4.add(StreamWindow.fromElements(10));
-		expected4.add(StreamWindow.fromElements(11));
-		expected4.add(StreamWindow.fromElements(3));
-
-		validateOutput(expected4, TestSink5.windows);
-
-		// min ( Time of 2 slide 3 )
-		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(1));
-		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected5, TestSink3.windows);
-
-		// groupby mod 2 max ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
-		expected6.add(StreamWindow.fromElements(3));
-		expected6.add(StreamWindow.fromElements(5));
-		expected6.add(StreamWindow.fromElements(11));
-		expected6.add(StreamWindow.fromElements(4));
-		expected6.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected6, TestSink6.windows);
-
-		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-		expected7.add(StreamWindow.fromElements(10));
-		expected7.add(StreamWindow.fromElements(10, 11, 11));
-
-		validateOutput(expected7, TestSink7.windows);
-
-		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
-		expected8.add(StreamWindow.fromElements(4, 8));
-		expected8.add(StreamWindow.fromElements(4, 5));
-		expected8.add(StreamWindow.fromElements(10, 22));
-
-		for (List<Integer> sw : TestSink8.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected8, TestSink8.windows);
-
-		List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
-		expected9.add(StreamWindow.fromElements(6));
-		expected9.add(StreamWindow.fromElements(14));
-		expected9.add(StreamWindow.fromElements(22));
-		expected9.add(StreamWindow.fromElements(30));
-		expected9.add(StreamWindow.fromElements(38));
-
-		validateOutput(expected9, TestSink9.windows);
-
-		List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
-		expected10.add(StreamWindow.fromElements(6, 9));
-		expected10.add(StreamWindow.fromElements(16, 24));
-
-		for (List<Integer> sw : TestSink10.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected10, TestSink10.windows);
-
-		List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
-		expected11.add(StreamWindow.fromElements(8));
-		expected11.add(StreamWindow.fromElements(38));
-		expected11.add(StreamWindow.fromElements(49));
-
-		for (List<Integer> sw : TestSink11.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected11, TestSink11.windows);
-
-		List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
-		expected12.add(StreamWindow.fromElements(4, 4));
-		expected12.add(StreamWindow.fromElements(18, 20));
-		expected12.add(StreamWindow.fromElements(18, 31));
-
-		for (List<Integer> sw : TestSink12.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected12, TestSink12.windows);
-
-		List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
-		expected13.add(StreamWindow.fromElements(17));
-		expected13.add(StreamWindow.fromElements(27));
-		expected13.add(StreamWindow.fromElements(49));
-
-		for (List<Integer> sw : TestSink13.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected13, TestSink13.windows);
-
-	}
-
-	public static <R> void validateOutput(List<R> expected, List<R> actual) {
-		assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
new file mode 100644
index 0000000..5e6ffa2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.FullStream;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class WindowingITCase implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	private static final Integer MEMORYSIZE = 32;
+
+	@SuppressWarnings("serial")
+	public static class ModKey implements KeySelector<Integer, Integer> {
+		private int m;
+
+		public ModKey(int m) {
+			this.m = m;
+		}
+
+		@Override
+		public Integer getKey(Integer value) throws Exception {
+			return value % m;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class IdentityWindowMap implements
+			WindowMapFunction<Integer, StreamWindow<Integer>> {
+
+		@Override
+		public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
+				throws Exception {
+
+			StreamWindow<Integer> window = new StreamWindow<Integer>();
+
+			for (Integer value : values) {
+				window.add(value);
+			}
+			out.collect(window);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	@Test
+	public void test() throws Exception {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(2);
+		inputs.add(2);
+		inputs.add(3);
+		inputs.add(4);
+		inputs.add(5);
+		inputs.add(10);
+		inputs.add(11);
+		inputs.add(11);
+
+		KeySelector<Integer, ?> key = new ModKey(2);
+
+		Timestamp<Integer> ts = new Timestamp<Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+		};
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+		env.disableOperatorChaining();
+
+		DataStream<Integer> source = env.fromCollection(inputs);
+
+		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
+				.addSink(new TestSink1());
+
+		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
+				.flatten().addSink(new TestSink2());
+
+		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+				.addSink(new TestSink4());
+
+		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
+				.mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
+
+		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
+				.addSink(new TestSink3());
+
+		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
+				.addSink(new TestSink6());
+
+		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
+				.addSink(new TestSink7());
+
+		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+				.getDiscretizedStream().addSink(new TestSink8());
+
+		try {
+			source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
+			fail();
+		} catch (Exception e) {
+		}
+		try {
+			source.window(FullStream.window()).getDiscretizedStream();
+			fail();
+		} catch (Exception e) {
+		}
+		try {
+			source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
+			fail();
+		} catch (Exception e) {
+		}
+
+		source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
+
+		source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
+				.getDiscretizedStream().addSink(new TestSink12());
+
+		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				for (int i = 1; i <= 10; i++) {
+					ctx.collect(i);
+				}
+			}
+
+			@Override
+			public void cancel() {
+			}
+		});
+
+		DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			private int i = 1;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				i = 1 + getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			@Override
+			public void cancel() {
+			}
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				for (;i < 11; i += 2) {
+					ctx.collect(i);
+				}
+
+			}
+		});
+
+		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
+
+		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
+				.addSink(new TestSink10());
+
+		source.map(new MapFunction<Integer, Integer>() {
+			@Override
+			public Integer map(Integer value) throws Exception {
+				return value;
+			}
+		}).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
+
+		env.execute();
+
+		// sum ( Time of 3 slide 2 )
+		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
+		expected1.add(StreamWindow.fromElements(5));
+		expected1.add(StreamWindow.fromElements(11));
+		expected1.add(StreamWindow.fromElements(9));
+		expected1.add(StreamWindow.fromElements(10));
+		expected1.add(StreamWindow.fromElements(32));
+
+		validateOutput(expected1, TestSink1.windows);
+
+		// Tumbling Time of 4 grouped by mod 2
+		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
+		expected2.add(StreamWindow.fromElements(2, 2, 4));
+		expected2.add(StreamWindow.fromElements(1, 3));
+		expected2.add(StreamWindow.fromElements(5));
+		expected2.add(StreamWindow.fromElements(10));
+		expected2.add(StreamWindow.fromElements(11, 11));
+
+		validateOutput(expected2, TestSink2.windows);
+
+		// groupby mod 2 sum ( Tumbling Time of 4)
+		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
+		expected3.add(StreamWindow.fromElements(4));
+		expected3.add(StreamWindow.fromElements(5));
+		expected3.add(StreamWindow.fromElements(22));
+		expected3.add(StreamWindow.fromElements(8));
+		expected3.add(StreamWindow.fromElements(10));
+
+		validateOutput(expected3, TestSink4.windows);
+
+		// groupby mod3 Tumbling Count of 2 grouped by mod 2
+		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
+		expected4.add(StreamWindow.fromElements(2, 2));
+		expected4.add(StreamWindow.fromElements(1));
+		expected4.add(StreamWindow.fromElements(4));
+		expected4.add(StreamWindow.fromElements(5, 11));
+		expected4.add(StreamWindow.fromElements(10));
+		expected4.add(StreamWindow.fromElements(11));
+		expected4.add(StreamWindow.fromElements(3));
+
+		validateOutput(expected4, TestSink5.windows);
+
+		// min ( Time of 2 slide 3 )
+		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
+		expected5.add(StreamWindow.fromElements(1));
+		expected5.add(StreamWindow.fromElements(4));
+		expected5.add(StreamWindow.fromElements(10));
+
+		validateOutput(expected5, TestSink3.windows);
+
+		// groupby mod 2 max ( Tumbling Time of 4)
+		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
+		expected6.add(StreamWindow.fromElements(3));
+		expected6.add(StreamWindow.fromElements(5));
+		expected6.add(StreamWindow.fromElements(11));
+		expected6.add(StreamWindow.fromElements(4));
+		expected6.add(StreamWindow.fromElements(10));
+
+		validateOutput(expected6, TestSink6.windows);
+
+		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
+		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+		expected7.add(StreamWindow.fromElements(10));
+		expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+		validateOutput(expected7, TestSink7.windows);
+
+		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
+		expected8.add(StreamWindow.fromElements(4, 8));
+		expected8.add(StreamWindow.fromElements(4, 5));
+		expected8.add(StreamWindow.fromElements(10, 22));
+
+		for (List<Integer> sw : TestSink8.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected8, TestSink8.windows);
+
+		List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
+		expected9.add(StreamWindow.fromElements(6));
+		expected9.add(StreamWindow.fromElements(14));
+		expected9.add(StreamWindow.fromElements(22));
+		expected9.add(StreamWindow.fromElements(30));
+		expected9.add(StreamWindow.fromElements(38));
+
+		validateOutput(expected9, TestSink9.windows);
+
+		List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
+		expected10.add(StreamWindow.fromElements(6, 9));
+		expected10.add(StreamWindow.fromElements(16, 24));
+
+		for (List<Integer> sw : TestSink10.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected10, TestSink10.windows);
+
+		List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
+		expected11.add(StreamWindow.fromElements(8));
+		expected11.add(StreamWindow.fromElements(38));
+		expected11.add(StreamWindow.fromElements(49));
+
+		for (List<Integer> sw : TestSink11.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected11, TestSink11.windows);
+
+		List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
+		expected12.add(StreamWindow.fromElements(4, 4));
+		expected12.add(StreamWindow.fromElements(18, 20));
+		expected12.add(StreamWindow.fromElements(18, 31));
+
+		for (List<Integer> sw : TestSink12.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected12, TestSink12.windows);
+
+		List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
+		expected13.add(StreamWindow.fromElements(17));
+		expected13.add(StreamWindow.fromElements(27));
+		expected13.add(StreamWindow.fromElements(49));
+
+		for (List<Integer> sw : TestSink13.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected13, TestSink13.windows);
+
+	}
+
+	public static <R> void validateOutput(List<R> expected, List<R> actual) {
+		assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index eb49e26..6e22021 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -48,6 +48,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.InstantiationUtil;
@@ -103,12 +105,13 @@ public class StatefulOperatorTest {
 	@Test
 	public void apiTest() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
-		
+
 		KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
 		
 		keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
 			private static final long serialVersionUID = 1L;
-			public void invoke(String value) throws Exception {}
+			public void invoke(String value) throws Exception {
+			}
 		});
 		
 		keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
@@ -128,8 +131,8 @@ public class StatefulOperatorTest {
 
 	private void processInputs(StreamMap<Integer, ?> map, List<Integer> input) throws Exception {
 		for (Integer i : input) {
-			map.getRuntimeContext().setNextInput(i);
-			map.processElement(i);
+			map.getRuntimeContext().setNextInput(new StreamRecord<Integer>(i, 0L));
+			map.processElement(new StreamRecord<Integer>(i, 0L));
 		}
 	}
 
@@ -144,11 +147,16 @@ public class StatefulOperatorTest {
 
 		StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
 
-		op.setup(new Output<String>() {
+		op.setup(new Output<StreamRecord<String>>() {
 
 			@Override
-			public void collect(String record) {
-				outputList.add(record);
+			public void collect(StreamRecord<String> record) {
+				outputList.add(record.getValue());
+			}
+
+			@Override
+			public void emitWatermark(Watermark mark) {
+
 			}
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
index 4ac7fda..317a21c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
@@ -40,6 +40,6 @@ public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamR
 	
 	@Override
 	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
-		emittedRecords.add(record.getInstance().getObject().f0);
+		emittedRecords.add(record.getInstance().getValue().f0);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index 967c719..6bc0e30 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Test;
 
 public class BasicWindowBufferTest {
@@ -33,7 +33,7 @@ public class BasicWindowBufferTest {
 	@Test
 	public void testEmitWindow() throws Exception {
 
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 		List<StreamWindow<Integer>> collected = collector.getCollected();
 
 		WindowBuffer<Integer> wb = new BasicWindowBuffer<Integer>();
@@ -60,13 +60,13 @@ public class BasicWindowBufferTest {
 		assertEquals(2, collected.size());
 	}
 
-	public static class TestCollector<T> implements Collector<T> {
+	public static class TestOutput<T> implements Output<StreamRecord<T>> {
 
 		private final List<T> collected = new ArrayList<T>();
 
 		@Override
-		public void collect(T record) {
-			collected.add(record);
+		public void collect(StreamRecord<T> record) {
+			collected.add(record.getValue());
 		}
 
 		@Override
@@ -77,6 +77,10 @@ public class BasicWindowBufferTest {
 			return collected;
 		}
 
+		@Override
+		public void emitWatermark(Watermark mark) {
+
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
index c91910b..8430499 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
@@ -32,8 +32,9 @@ import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
+
 import org.junit.Test;
 
 public class JumpingCountGroupedPreReducerTest {
@@ -58,7 +59,7 @@ public class JumpingCountGroupedPreReducerTest {
 		inputs.add(new Tuple2<Integer, Integer>(1, -2));
 		inputs.add(new Tuple2<Integer, Integer>(100, -200));
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
 		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
@@ -109,7 +110,7 @@ public class JumpingCountGroupedPreReducerTest {
 		inputs.add(new Tuple2<Integer, Integer>(1, -2));
 		inputs.add(new Tuple2<Integer, Integer>(100, -200));
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
 		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
index ba890ab..2279264 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class JumpingCountPreReducerTest {
@@ -48,7 +48,7 @@ public class JumpingCountPreReducerTest {
 		inputs.add(new Tuple2<Integer, Integer>(4, -2));
 		inputs.add(new Tuple2<Integer, Integer>(5, -3));
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
 		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
index 5b693e7..ce312d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class JumpingTimePreReducerTest {
@@ -39,7 +39,7 @@ public class JumpingTimePreReducerTest {
 	@Test
 	public void testEmitWindow() throws Exception {
 
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 		List<StreamWindow<Integer>> collected = collector.getCollected();
 
 		WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
index 377bdb5..7f58527 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -1,34 +1,35 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.flink.streaming.api.windowing.windowbuffer;
 
-import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingCountGroupedPreReducerTest {
@@ -37,11 +38,11 @@ public class SlidingCountGroupedPreReducerTest {
 
 	ReduceFunction<Integer> reducer = new SumReducer();
 
-	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+	KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
 
 	@Test
 	public void testPreReduce1() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
 				reducer, serializer, key, 3, 2, 0);
@@ -84,7 +85,7 @@ public class SlidingCountGroupedPreReducerTest {
 
 	@Test
 	public void testPreReduce2() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
 				reducer, serializer, key, 5, 2, 0);
@@ -126,7 +127,7 @@ public class SlidingCountGroupedPreReducerTest {
 
 	@Test
 	public void testPreReduce3() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
 				reducer, serializer, key, 6, 3, 0);
@@ -163,7 +164,7 @@ public class SlidingCountGroupedPreReducerTest {
 
 	@Test
 	public void testPreReduce4() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
 				reducer, serializer, key, 5, 1, 2);
@@ -217,4 +218,18 @@ public class SlidingCountGroupedPreReducerTest {
 
 	}
 
+
+	protected static void checkResults(List<StreamWindow<Integer>> expected,
+			List<StreamWindow<Integer>> actual) {
+
+		for (StreamWindow<Integer> sw : expected) {
+			Collections.sort(sw);
+		}
+
+		for (StreamWindow<Integer> sw : actual) {
+			Collections.sort(sw);
+		}
+
+		assertEquals(expected, actual);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
index 3ce65f1..156b875 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingCountPreReducerTest {
@@ -37,7 +37,7 @@ public class SlidingCountPreReducerTest {
 
 	@Test
 	public void testPreReduce1() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
 				serializer, 3, 2, 0);
@@ -80,7 +80,7 @@ public class SlidingCountPreReducerTest {
 
 	@Test
 	public void testPreReduce2() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
 				serializer, 5, 2, 0);
@@ -122,7 +122,7 @@ public class SlidingCountPreReducerTest {
 
 	@Test
 	public void testPreReduce3() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
 				serializer, 6, 3, 0);
@@ -159,7 +159,7 @@ public class SlidingCountPreReducerTest {
 
 	@Test
 	public void testPreReduce4() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
 				serializer, 5, 1, 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 3f1cba1..68bceda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -31,11 +31,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingTimeGroupedPreReducerTest {
@@ -48,7 +48,7 @@ public class SlidingTimeGroupedPreReducerTest {
 	ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
 
 
-	KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+	KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
 	KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
 
 	@Test
@@ -58,7 +58,7 @@ public class SlidingTimeGroupedPreReducerTest {
 		// replaying the same sequence of elements with a later timestamp and expecting the same
 		// result.
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 
 		SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
 				tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
@@ -190,7 +190,7 @@ public class SlidingTimeGroupedPreReducerTest {
 
 	@Test
 	public void testPreReduce2() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
 				reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
@@ -241,7 +241,7 @@ public class SlidingTimeGroupedPreReducerTest {
 
 	@Test
 	public void testPreReduce3() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
 				reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
@@ -287,7 +287,7 @@ public class SlidingTimeGroupedPreReducerTest {
 
 	@Test
 	public void testPreReduce4() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
 				reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index 0519da7..6a36c57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class SlidingTimePreReducerTest {
@@ -50,7 +50,7 @@ public class SlidingTimePreReducerTest {
 		// replaying the same sequence of elements with a later timestamp and expecting the same
 		// result.
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 
 		SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
 				tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
@@ -145,7 +145,7 @@ public class SlidingTimePreReducerTest {
 
 	@Test
 	public void testPreReduce2() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
 				serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
@@ -195,7 +195,7 @@ public class SlidingTimePreReducerTest {
 
 	@Test
 	public void testPreReduce3() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
 				serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
@@ -240,7 +240,7 @@ public class SlidingTimePreReducerTest {
 
 	@Test
 	public void testPreReduce4() throws Exception {
-		TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
 
 		SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
 				serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index c5107bf..3aee288 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
 
@@ -57,7 +57,7 @@ public class TumblingGroupedPreReducerTest {
 		inputs.add(new Tuple2<Integer, Integer>(1, -1));
 		inputs.add(new Tuple2<Integer, Integer>(1, -2));
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
 		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
@@ -104,7 +104,7 @@ public class TumblingGroupedPreReducerTest {
 		inputs.add(new Tuple2<Integer, Integer>(1, -1));
 		inputs.add(new Tuple2<Integer, Integer>(1, -2));
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
 		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index b8de02e..3e537a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class TumblingPreReducerTest {
@@ -49,7 +47,7 @@ public class TumblingPreReducerTest {
 		inputs.add(new Tuple2<Integer, Integer>(3, -1));
 		inputs.add(new Tuple2<Integer, Integer>(4, -2));
 
-		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
 		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
 
 		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingPreReducer<Tuple2<Integer, Integer>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
index 3f8401d..d8a3696 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
 import org.junit.Test;
 
 public class BarrierBufferIOTest {
@@ -55,7 +54,7 @@ public class BarrierBufferIOTest {
 				if (boe.isBuffer()) {
 					boe.getBuffer().recycle();
 				} else {
-					barrierBuffer.processSuperstep(boe);
+					barrierBuffer.processBarrier(boe);
 				}
 			}
 			// System.out.println("Ran for " + (System.currentTimeMillis() -
@@ -101,14 +100,14 @@ public class BarrierBufferIOTest {
 
 		private int numChannels;
 		private BufferPool[] bufferPools;
-		private int[] currentSupersteps;
+		private int[] currentBarriers;
 		BarrierGenerator[] barrierGens;
 		int currentChannel = 0;
 		long c = 0;
 
 		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
 			this.numChannels = bufferPools.length;
-			this.currentSupersteps = new int[numChannels];
+			this.currentBarriers = new int[numChannels];
 			this.bufferPools = bufferPools;
 			this.barrierGens = barrierGens;
 		}
@@ -132,7 +131,7 @@ public class BarrierBufferIOTest {
 			currentChannel = (currentChannel + 1) % numChannels;
 
 			if (barrierGens[currentChannel].isNextBarrier()) {
-				return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+				return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
 						currentChannel);
 			} else {
 				Buffer buffer = bufferPools[currentChannel].requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 89ec7dc..cb5e046 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
 
 import org.junit.Test;
 
@@ -67,10 +67,10 @@ public class BarrierBufferTest {
 		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
 		input.add(createBuffer(0));
 		input.add(createBuffer(0));
-		input.add(createSuperstep(1, 0));
+		input.add(createBarrier(1, 0));
 		input.add(createBuffer(0));
 		input.add(createBuffer(0));
-		input.add(createSuperstep(2, 0));
+		input.add(createBarrier(2, 0));
 		input.add(createBuffer(0));
 
 		InputGate mockIG = new MockInputGate(1, input);
@@ -82,11 +82,11 @@ public class BarrierBufferTest {
 		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
 		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
 		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
 		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
 		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
 
 		bb.cleanup();
@@ -98,18 +98,18 @@ public class BarrierBufferTest {
 		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
 		input.add(createBuffer(0));
 		input.add(createBuffer(1));
-		input.add(createSuperstep(1, 0));
-		input.add(createSuperstep(2, 0));
+		input.add(createBarrier(1, 0));
+		input.add(createBarrier(2, 0));
 		input.add(createBuffer(0));
-		input.add(createSuperstep(3, 0));
+		input.add(createBarrier(3, 0));
 		input.add(createBuffer(0));
 		input.add(createBuffer(1));
-		input.add(createSuperstep(1, 1));
+		input.add(createBarrier(1, 1));
 		input.add(createBuffer(0));
 		input.add(createBuffer(1));
-		input.add(createSuperstep(2, 1));
-		input.add(createSuperstep(3, 1));
-		input.add(createSuperstep(4, 0));
+		input.add(createBarrier(2, 1));
+		input.add(createBarrier(3, 1));
+		input.add(createBarrier(4, 0));
 		input.add(createBuffer(0));
 		input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
 		
@@ -123,24 +123,24 @@ public class BarrierBufferTest {
 		check(input.get(0), nextBoe = bb.getNextNonBlocked());
 		check(input.get(1), nextBoe = bb.getNextNonBlocked());
 		check(input.get(2), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(7), nextBoe = bb.getNextNonBlocked());
 		check(input.get(8), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(3), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(10), nextBoe = bb.getNextNonBlocked());
 		check(input.get(11), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(4), nextBoe = bb.getNextNonBlocked());
 		check(input.get(5), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(12), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(6), nextBoe = bb.getNextNonBlocked());
 		check(input.get(9), nextBoe = bb.getNextNonBlocked());
 		check(input.get(13), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
+		bb.processBarrier(nextBoe);
 		check(input.get(14), nextBoe = bb.getNextNonBlocked());
 		check(input.get(15), nextBoe = bb.getNextNonBlocked());
 
@@ -206,8 +206,8 @@ public class BarrierBufferTest {
 		}
 	}
 
-	protected static BufferOrEvent createSuperstep(long id, int channel) {
-		return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel);
+	protected static BufferOrEvent createBarrier(long id, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
 	}
 
 	protected static BufferOrEvent createBuffer(int channel) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
deleted file mode 100644
index 528829d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
-import org.apache.flink.streaming.runtime.io.CoRecordReader;
-import org.apache.flink.streaming.runtime.io.BarrierBufferTest.MockInputGate;
-import org.junit.Test;
-
-public class CoRecordReaderTest {
-
-	@Test
-	public void test() throws InterruptedException, IOException {
-
-		List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
-		input1.add(BarrierBufferTest.createBuffer(0));
-		input1.add(BarrierBufferTest.createSuperstep(1, 0));
-		input1.add(BarrierBufferTest.createBuffer(0));
-
-		InputGate ig1 = new MockInputGate(1, input1);
-
-		List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
-		input2.add(BarrierBufferTest.createBuffer(0));
-		input2.add(BarrierBufferTest.createBuffer(0));
-		input2.add(BarrierBufferTest.createSuperstep(1, 0));
-		input2.add(BarrierBufferTest.createBuffer(0));
-
-		InputGate ig2 = new MockInputGate(1, input2);
-
-		CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
-				ig1, ig2);
-		BarrierBuffer b1 = coReader.barrierBuffer1;
-		BarrierBuffer b2 = coReader.barrierBuffer2;
-
-		coReader.addToAvailable(ig1);
-		coReader.addToAvailable(ig2);
-		coReader.addToAvailable(ig2);
-		coReader.addToAvailable(ig1);
-
-		assertEquals(1, coReader.getNextReaderIndexBlocking());
-		b1.getNextNonBlocked();
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-
-		assertEquals(1, coReader.getNextReaderIndexBlocking());
-		b1.getNextNonBlocked();
-		b1.processSuperstep(input1.get(1));
-
-		coReader.addToAvailable(ig1);
-		coReader.addToAvailable(ig2);
-		coReader.addToAvailable(ig2);
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-		b2.processSuperstep(input2.get(2));
-
-		assertEquals(1, coReader.getNextReaderIndexBlocking());
-		b1.getNextNonBlocked();
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
index aa4d24a..a1cea13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,7 +31,7 @@ public class BroadcastPartitionerTest {
 	private BroadcastPartitioner<Tuple> broadcastPartitioner2;
 	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
 	
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
 
 	@Before

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
index b37e43a..2643bba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 public class DistributePartitionerTest {
 	
 	private RebalancePartitioner<Tuple> distributePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
index 94d29ac..05541f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
@@ -21,34 +21,28 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
 
 public class FieldsPartitionerTest {
 
-	private FieldsPartitioner<Tuple> fieldsPartitioner;
-	private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
-			.setObject(new Tuple2<String, Integer>("test", 0));
-	private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
-			.setObject(new Tuple2<String, Integer>("test", 42));
-	private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
+	private FieldsPartitioner<Tuple2<String, Integer>> fieldsPartitioner;
+	private StreamRecord<Tuple2<String, Integer>> streamRecord1 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 0));
+	private StreamRecord<Tuple2<String, Integer>> streamRecord2 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 42));
+	private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd1 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
+	private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd2 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
 
 	@Before
 	public void setPartitioner() {
-		fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() {
+		fieldsPartitioner = new FieldsPartitioner<Tuple2<String, Integer>>(new KeySelector<Tuple2<String, Integer>, String>() {
 
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public String getKey(Tuple value) throws Exception {
+			public String getKey(Tuple2<String, Integer> value) throws Exception {
 				return value.getField(0);
 			}
 		});