You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:40 UTC

[04/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
new file mode 100644
index 0000000..c05f281
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class FieldFromTupleTest {
+
+	private String[] testStrings;
+
+	@Before
+	public void init() {
+		testStrings = new String[Tuple.MAX_ARITY];
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			testStrings[i] = Integer.toString(i);
+		}
+	}
+
+	@Test
+	public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
+		// extract single fields
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			Tuple current = (Tuple) CLASSES[i].newInstance();
+			for (int j = 0; j < i; j++) {
+				current.setField(testStrings[j], j);
+			}
+			for (int j = 0; j < i; j++) {
+				assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
+			}
+		}
+	}
+
+	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+			Tuple24.class, Tuple25.class };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
new file mode 100644
index 0000000..7a9a716
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromArray;
+import org.junit.Test;
+
+public class FieldsFromArrayTest {
+
+	String[] testStringArray = { "0", "1", "2", "3", "4" };
+	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+	int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+	@Test
+	public void testStringArray() {
+		// check single field extraction
+		for (int i = 0; i < testStringArray.length; i++) {
+			String[] tmp = { testStringArray[i] };
+			arrayEqualityCheck(tmp,
+					new FieldsFromArray<String>(String.class, i).extract(testStringArray));
+		}
+
+		// check reverse order
+		String[] reverseOrder = new String[testStringArray.length];
+		for (int i = 0; i < testStringArray.length; i++) {
+			reverseOrder[i] = testStringArray[testStringArray.length - i - 1];
+		}
+		arrayEqualityCheck(reverseOrder,
+				new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray));
+
+		// check picking fields and reorder
+		String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] };
+		arrayEqualityCheck(crazyOrder,
+				new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray));
+	}
+
+	@Test
+	public void testIntegerArray() {
+		// check single field extraction
+		for (int i = 0; i < testIntegerArray.length; i++) {
+			Integer[] tmp = { testIntegerArray[i] };
+			arrayEqualityCheck(tmp,
+					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray));
+		}
+
+		// check reverse order
+		Integer[] reverseOrder = new Integer[testIntegerArray.length];
+		for (int i = 0; i < testIntegerArray.length; i++) {
+			reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1];
+		}
+		arrayEqualityCheck(reverseOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0)
+						.extract(testIntegerArray));
+
+		// check picking fields and reorder
+		Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
+		arrayEqualityCheck(crazyOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray));
+
+	}
+
+	@Test
+	public void testIntArray() {
+		for (int i = 0; i < testIntArray.length; i++) {
+			Integer[] tmp = { testIntArray[i] };
+			arrayEqualityCheck(tmp,
+					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray));
+		}
+
+		// check reverse order
+		Integer[] reverseOrder = new Integer[testIntArray.length];
+		for (int i = 0; i < testIntArray.length; i++) {
+			reverseOrder[i] = testIntArray[testIntArray.length - i - 1];
+		}
+		arrayEqualityCheck(reverseOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray));
+
+		// check picking fields and reorder
+		Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] };
+		arrayEqualityCheck(crazyOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray));
+
+	}
+
+	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+		assertEquals("The result arrays must have the same length", array1.length, array2.length);
+		for (int i = 0; i < array1.length; i++) {
+			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
new file mode 100644
index 0000000..025ed8a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromTuple;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FieldsFromTupleTest {
+
+	private double[] testDouble;
+
+	@Before
+	public void init() {
+		testDouble = new double[Tuple.MAX_ARITY];
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			testDouble[i] = i;
+		}
+	}
+
+	@Test
+	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
+		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			currentTuple.setField(testDouble[i], i);
+		}
+
+		double[] expected = { testDouble[5], testDouble[3], testDouble[6], testDouble[7],
+				testDouble[0] };
+		arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
+
+		double[] expected2 = { testDouble[0], testDouble[Tuple.MAX_ARITY - 1] };
+		arrayEqualityCheck(expected2,
+				new FieldsFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
+
+		double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], testDouble[0] };
+		arrayEqualityCheck(expected3,
+				new FieldsFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
+
+		double[] expected4 = { testDouble[13], testDouble[4], testDouble[5], testDouble[4],
+				testDouble[2], testDouble[8], testDouble[6], testDouble[2], testDouble[8],
+				testDouble[3], testDouble[5], testDouble[2], testDouble[16], testDouble[4],
+				testDouble[3], testDouble[2], testDouble[6], testDouble[4], testDouble[7],
+				testDouble[4], testDouble[2], testDouble[8], testDouble[7], testDouble[2] };
+		arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
+				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
+	}
+
+	private void arrayEqualityCheck(double[] array1, double[] array2) {
+		assertEquals("The result arrays must have the same length", array1.length, array2.length);
+		for (int i = 0; i < array1.length; i++) {
+			assertEquals("Unequal fields at position " + i, array1[i], array2[i], 0d);
+		}
+	}
+
+	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+			Tuple24.class, Tuple25.class };
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index c116c01..d00dc67 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -18,9 +18,7 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,17 +27,12 @@ import java.util.List;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
@@ -184,15 +177,4 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
 
 		assertEquals(expected, result);
 	}
-
-	private static class IntegerTimestamp implements Timestamp<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Integer value) {
-			return value;
-		}
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java
deleted file mode 100644
index c464b47..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.GroupedWindowBuffer;
-import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedStreamDiscretizerTest {
-
-	KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<Integer, String> value) throws Exception {
-			return value.f1;
-		}
-	};
-
-	/**
-	 * Test for not active distributed triggers with single field
-	 */
-	@Test
-	public void groupedDiscretizerTest() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		Set<StreamWindow<Integer>> expected = new HashSet<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(2, 2));
-		expected.add(StreamWindow.fromElements(1, 3));
-		expected.add(StreamWindow.fromElements(5, 11));
-		expected.add(StreamWindow.fromElements(4, 10));
-		expected.add(StreamWindow.fromElements(11));
-
-		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) {
-				return value % 2;
-			}
-		};
-
-		CloneableTriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
-		CloneableEvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
-
-		GroupedStreamDiscretizer<Integer> discretizer = new GroupedStreamDiscretizer<Integer>(
-				keySelector, trigger, eviction);
-
-		StreamWindowBuffer<Integer> buffer = new GroupedWindowBuffer<Integer>(
-				new BasicWindowBuffer<Integer>(), keySelector);
-
-		List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer,
-				inputs);
-		List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-
-		assertEquals(expected, new HashSet<StreamWindow<Integer>>(result));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
deleted file mode 100644
index 2c06c00..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeITCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests that {@link ParallelMerge} does not swallow records of the
- * last window.
- */
-public class ParallelMergeITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-	protected final String input = "To be, or not to be,--that is the question:--" +
-									"Whether 'tis nobler in the mind to suffer";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", input);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		List<String> resultLines = new ArrayList<>();
-		readAllResultLines(resultLines, resultPath);
-
-		// check that result lines are not swallowed, as every element is expected to be in the
-		// last time window we either get the right output or no output at all
-		if (resultLines.isEmpty()){
-			Assert.fail();
-		}
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> text = env.fromElements(input);
-
-		DataStream<Tuple2<String, Integer>> counts =
-				text.flatMap(new Tokenizer())
-						.window(Time.of(1000, TimeUnit.MILLISECONDS))
-						.keyBy(0)
-						.sum(1)
-						.flatten();
-
-		counts.writeAsText(resultPath);
-
-		try {
-			env.execute();
-		} catch (RuntimeException e){
-			// might happen at closing the active window
-			// do nothing
-		}
-	}
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(Tuple2.of(token, 1));
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
deleted file mode 100644
index f111890..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class ParallelMergeTest {
-
-	@Test
-	public void nonGroupedTest() throws Exception {
-
-		ReduceFunction<Integer> reducer = new ReduceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer reduce(Integer a, Integer b) throws Exception {
-				return a + b;
-			}
-		};
-
-		TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>();
-		TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output);
-		List<StreamWindow<Integer>> result = output.getCollected();
-
-		ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer);
-		merger.numberOfDiscretizers = 2;
-
-		merger.flatMap1(createTestWindow(1), collector);
-		merger.flatMap1(createTestWindow(1), collector);
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
-		assertTrue(result.isEmpty());
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
-		assertEquals(StreamWindow.fromElements(2), result.get(0));
-
-		merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), collector);
-		merger.flatMap1(createTestWindow(2), collector);
-		merger.flatMap1(createTestWindow(2), collector);
-		merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), collector);
-		assertEquals(1, result.size());
-		merger.flatMap1(createTestWindow(2), collector);
-		assertEquals(StreamWindow.fromElements(3), result.get(1));
-
-		// check error handling
-		merger.flatMap1(createTestWindow(3), collector);
-		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector);
-		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector);
-
-		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector);
-		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector);
-		merger.flatMap1(createTestWindow(4), collector);
-		try {
-			merger.flatMap1(createTestWindow(4), collector);
-			fail();
-		} catch (RuntimeException e) {
-			// Do nothing
-		}
-
-		ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer);
-		merger2.numberOfDiscretizers = 2;
-		merger2.flatMap1(createTestWindow(0), collector);
-		merger2.flatMap1(createTestWindow(1), collector);
-		merger2.flatMap1(createTestWindow(1), collector);
-		merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
-		try {
-			merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
-			fail();
-		} catch (RuntimeException e) {
-			// Do nothing
-		}
-
-	}
-
-	@Test
-	public void groupedTest() throws Exception {
-
-		TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>();
-		TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output);
-		List<StreamWindow<Integer>> result = output.getCollected();
-
-		ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>();
-		merger.numberOfDiscretizers = 2;
-
-		merger.flatMap1(createTestWindow(1), collector);
-		merger.flatMap1(createTestWindow(1), collector);
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
-		assertTrue(result.isEmpty());
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
-		assertEquals(StreamWindow.fromElements(1, 1), result.get(0));
-	}
-
-	private StreamWindow<Integer> createTestWindow(Integer id) {
-		StreamWindow<Integer> ret = new StreamWindow<Integer>(id);
-		ret.add(1);
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java
deleted file mode 100644
index 24251f1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizerTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamDiscretizerTest {
-
-	
-	@Test
-	public void testDiscretizer() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
-		expected.add(StreamWindow.fromElements(3, 4, 5));
-		expected.add(StreamWindow.fromElements(5));
-		expected.add(StreamWindow.fromElements(10));
-		expected.add(StreamWindow.fromElements(10, 11, 11));
-
-		Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-		};
-
-		TriggerPolicy<Integer> trigger = new TimeTriggerPolicy<Integer>(2L,
-				new TimestampWrapper<Integer>(myTimeStamp, 3));
-
-		EvictionPolicy<Integer> eviction = new TimeEvictionPolicy<Integer>(4L,
-				new TimestampWrapper<Integer>(myTimeStamp, 1));
-		
-		
-
-		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
-		StreamWindowBuffer<Integer> buffer = new StreamWindowBuffer<Integer>(new BasicWindowBuffer<Integer>());
-
-		List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer, inputs);
-		List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-		
-		assertEquals(expected, result);
-	}
-
-	@Test
-	public void testDiscretizer2() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(2, 3));
-		expected.add(StreamWindow.fromElements(4));
-
-		TriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
-
-		EvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
-
-		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
-		StreamWindowBuffer<Integer> buffer = new StreamWindowBuffer<Integer>(new BasicWindowBuffer<Integer>());
-
-		List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer, inputs);
-		List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-		assertEquals(expected, result);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
deleted file mode 100644
index dc6d0d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowFlattenerTest {
-
-	@Test
-	public void test() {
-		OneInputStreamOperator<StreamWindow<Integer>, Integer> flattener = new WindowFlattener<Integer>();
-
-		StreamWindow<Integer> w1 = StreamWindow.fromElements(1, 2, 3);
-		StreamWindow<Integer> w2 = new StreamWindow<Integer>();
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(w1);
-		input.add(w2);
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.addAll(w1);
-		expected.addAll(w2);
-
-		List<Integer> output = MockContext.createAndExecute(flattener, input);
-
-		assertEquals(expected, output);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
deleted file mode 100644
index 3b54069..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowFolderTest {
-
-	@Test
-	public void test() {
-		OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
-				new FoldFunction<Integer, String>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String fold(String accumulator, Integer value) throws Exception {
-						return accumulator + value.toString();
-					}
-				}, "");
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(StreamWindow.fromElements(1, 2, 3));
-		input.add(new StreamWindow<Integer>());
-		input.add(StreamWindow.fromElements(-1));
-
-		List<StreamWindow<String>> expected = new ArrayList<StreamWindow<String>>();
-		expected.add(StreamWindow.fromElements("123"));
-		expected.add(new StreamWindow<String>());
-		expected.add(StreamWindow.fromElements("-1"));
-
-		List<StreamWindow<String>> output = MockContext.createAndExecute(windowReducer, input);
-
-		assertEquals(expected, output);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
deleted file mode 100644
index 9836a99..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowMapperTest {
-
-	@Test
-	public void test() {
-		OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMapper = new WindowMapper<Integer, Integer>(
-				new WindowMapFunction<Integer, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void mapWindow(Iterable<Integer> values, Collector<Integer> out)
-							throws Exception {
-						for (Integer v : values) {
-							out.collect(v);
-						}
-					}
-				});
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(StreamWindow.fromElements(1, 2, 3));
-		input.add(new StreamWindow<Integer>());
-
-		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMapper, input);
-
-		assertEquals(input, output);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
deleted file mode 100644
index 43e3785..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowMergerTest {
-
-	@Test
-	public void test() throws Exception {
-		OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMerger = new WindowMerger<Integer>();
-
-		StreamWindow<Integer> w1 = new StreamWindow<Integer>();
-		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
-		StreamWindow<Integer> w3 = StreamWindow.fromElements(-1, 2, 3, 4);
-		StreamWindow<Integer> w4_1 = new StreamWindow<Integer>(1, 2);
-		StreamWindow<Integer> w4_2 = new StreamWindow<Integer>(1, 2);
-		w4_1.add(1);
-		w4_2.add(2);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(w1);
-		expected.add(w2);
-		expected.add(w3);
-		expected.add(StreamWindow.fromElements(1, 2));
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(w1);
-		input.add(w4_1);
-		input.addAll(StreamWindow.split(w2, 2));
-		input.addAll(StreamWindow.partitionBy(w3, new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value % 2;
-			}
-		}, false));
-		input.add(w4_2);
-
-		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMerger, input);
-
-		assertEquals(expected.size(), expected.size());
-		for (int i = 0; i < output.size(); i++) {
-			assertEquals(new HashSet<Integer>(expected.get(i)), new HashSet<Integer>(output.get(i)));
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
deleted file mode 100644
index 7521a2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowPartitionerTest {
-
-	@Test
-	public void test() throws Exception {
-		OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> splitPartitioner = new WindowPartitioner<Integer>(
-				2);
-
-		OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> gbPartitioner = new WindowPartitioner<Integer>(
-				new MyKey());
-
-		StreamWindow<Integer> w1 = new StreamWindow<Integer>();
-		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
-
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.addAll(StreamWindow.split(w1,2));
-		expected1.addAll(StreamWindow.split(w2,2));
-
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.addAll(StreamWindow.partitionBy(w1,new MyKey(),false));
-		expected2.addAll(StreamWindow.partitionBy(w2,new MyKey(),false));
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(w1);
-		input.add(w2);
-
-		List<StreamWindow<Integer>> output1 = MockContext.createAndExecute(splitPartitioner, input);
-		List<StreamWindow<Integer>> output2 = MockContext.createAndExecute(gbPartitioner, input);
-
-		assertEquals(expected1, output1);
-		assertEquals(expected2, output2);
-
-	}
-
-	private static class MyKey implements KeySelector<Integer, Object> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object getKey(Integer value) throws Exception {
-			return value / 2;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
deleted file mode 100644
index b78a5ba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowReducerTest {
-
-	@Test
-	public void test() {
-		OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowReducer = new WindowReducer<Integer>(
-				new ReduceFunction<Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						return value1 + value2;
-					}
-				});
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(StreamWindow.fromElements(1, 2, 3));
-		input.add(new StreamWindow<Integer>());
-		input.add(StreamWindow.fromElements(-1));
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(6));
-		expected.add(new StreamWindow<Integer>());
-		expected.add(StreamWindow.fromElements(-1));
-
-		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowReducer, input);
-
-		assertEquals(expected, output);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
deleted file mode 100644
index 4611966..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowingITCase extends StreamingMultipleProgramsTestBase {
-
-	@SuppressWarnings("serial")
-	public static class ModKey implements KeySelector<Integer, Integer> {
-		private int m;
-
-		public ModKey(int m) {
-			this.m = m;
-		}
-
-		@Override
-		public Integer getKey(Integer value) throws Exception {
-			return value % m;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static class IdentityWindowMap implements
-			WindowMapFunction<Integer, StreamWindow<Integer>> {
-
-		@Override
-		public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
-				throws Exception {
-
-			StreamWindow<Integer> window = new StreamWindow<Integer>();
-
-			for (Integer value : values) {
-				window.add(value);
-			}
-			out.collect(window);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void test() throws Exception {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		KeySelector<Integer, ?> key = new ModKey(2);
-
-		Timestamp<Integer> ts = new IntegerTimestamp();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-		env.disableOperatorChaining();
-
-		DataStream<Integer> source = env.fromCollection(inputs);
-
-		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink1());
-
-		source.window(Time.of(4, ts, 1)).keyBy(new ModKey(2))
-				.mapWindow(new IdentityWindowMap())
-				.flatten()
-				.addSink(new TestSink2()).name("TESTSIUNK2");
-
-		source.keyBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink4());
-
-		source.keyBy(new ModKey(3)).window(Count.of(2)).keyBy(new ModKey(2))
-				.mapWindow(new IdentityWindowMap())
-				.flatten()
-				.addSink(new TestSink5());
-
-		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
-				.addSink(new TestSink3());
-
-		source.keyBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
-				.addSink(new TestSink6());
-
-		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap())
-				.flatten()
-				.addSink(new TestSink7());
-
-		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).keyBy(new ModKey(2)).sum(0)
-				.getDiscretizedStream()
-				.addSink(new TestSink8());
-
-		try {
-			source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.window(FullStream.window()).getDiscretizedStream();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
-			fail();
-		} catch (Exception e) {
-		}
-
-		source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
-
-		source.window(FullStream.window()).every(Count.of(4)).keyBy(key).sum(0)
-				.getDiscretizedStream()
-				.addSink(new TestSink12());
-
-		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				for (int i = 1; i <= 10; i++) {
-					ctx.collect(i);
-				}
-			}
-
-			@Override
-			public void cancel() {
-			}
-		});
-
-		DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			private int i = 1;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				i = 1 + getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			@Override
-			public void cancel() {
-			}
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				for (;i < 11; i += 2) {
-					ctx.collect(i);
-				}
-
-			}
-		});
-
-		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
-
-		source3.window(Time.of(5, ts, 1)).keyBy(new ModKey(2)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink10());
-
-		source
-				.map(new MapFunction<Integer, Integer>() {
-					@Override
-					public Integer map(Integer value) throws Exception {
-						return value;
-					}
-				})
-				.every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new TestSink13());
-
-		env.execute();
-
-		// sum ( Time of 3 slide 2 )
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(5));
-		expected1.add(StreamWindow.fromElements(11));
-		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(10));
-		expected1.add(StreamWindow.fromElements(32));
-
-		validateOutput(expected1, TestSink1.windows);
-
-		// Tumbling Time of 4 grouped by mod 2
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2, 4));
-		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(5));
-		expected2.add(StreamWindow.fromElements(10));
-		expected2.add(StreamWindow.fromElements(11, 11));
-
-		validateOutput(expected2, TestSink2.windows);
-
-		// groupby mod 2 sum ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
-		expected3.add(StreamWindow.fromElements(4));
-		expected3.add(StreamWindow.fromElements(5));
-		expected3.add(StreamWindow.fromElements(22));
-		expected3.add(StreamWindow.fromElements(8));
-		expected3.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected3, TestSink4.windows);
-
-		// groupby mod3 Tumbling Count of 2 grouped by mod 2
-		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
-		expected4.add(StreamWindow.fromElements(2, 2));
-		expected4.add(StreamWindow.fromElements(1));
-		expected4.add(StreamWindow.fromElements(4));
-		expected4.add(StreamWindow.fromElements(5, 11));
-		expected4.add(StreamWindow.fromElements(10));
-		expected4.add(StreamWindow.fromElements(11));
-		expected4.add(StreamWindow.fromElements(3));
-
-		validateOutput(expected4, TestSink5.windows);
-
-		// min ( Time of 2 slide 3 )
-		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(1));
-		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected5, TestSink3.windows);
-
-		// groupby mod 2 max ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
-		expected6.add(StreamWindow.fromElements(3));
-		expected6.add(StreamWindow.fromElements(5));
-		expected6.add(StreamWindow.fromElements(11));
-		expected6.add(StreamWindow.fromElements(4));
-		expected6.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected6, TestSink6.windows);
-
-		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-		expected7.add(StreamWindow.fromElements(10));
-		expected7.add(StreamWindow.fromElements(10, 11, 11));
-
-		validateOutput(expected7, TestSink7.windows);
-
-		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
-		expected8.add(StreamWindow.fromElements(4, 8));
-		expected8.add(StreamWindow.fromElements(4, 5));
-		expected8.add(StreamWindow.fromElements(10, 22));
-
-		for (List<Integer> sw : TestSink8.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected8, TestSink8.windows);
-
-		List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
-		expected9.add(StreamWindow.fromElements(6));
-		expected9.add(StreamWindow.fromElements(14));
-		expected9.add(StreamWindow.fromElements(22));
-		expected9.add(StreamWindow.fromElements(30));
-		expected9.add(StreamWindow.fromElements(38));
-
-		validateOutput(expected9, TestSink9.windows);
-
-		List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
-		expected10.add(StreamWindow.fromElements(6, 9));
-		expected10.add(StreamWindow.fromElements(16, 24));
-
-		for (List<Integer> sw : TestSink10.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected10, TestSink10.windows);
-
-		List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
-		expected11.add(StreamWindow.fromElements(8));
-		expected11.add(StreamWindow.fromElements(38));
-		expected11.add(StreamWindow.fromElements(49));
-
-		for (List<Integer> sw : TestSink11.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected11, TestSink11.windows);
-
-		List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
-		expected12.add(StreamWindow.fromElements(4, 4));
-		expected12.add(StreamWindow.fromElements(18, 20));
-		expected12.add(StreamWindow.fromElements(18, 31));
-
-		for (List<Integer> sw : TestSink12.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected12, TestSink12.windows);
-
-		List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
-		expected13.add(StreamWindow.fromElements(17));
-		expected13.add(StreamWindow.fromElements(27));
-		expected13.add(StreamWindow.fromElements(49));
-
-		for (List<Integer> sw : TestSink13.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected13, TestSink13.windows);
-
-	}
-
-	public static <R> void validateOutput(List<R> expected, List<R> actual) {
-		assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	private static class IntegerTimestamp implements Timestamp<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Integer value) {
-			return value;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
deleted file mode 100644
index c3efc7b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.StreamWindowSerializer;
-import org.junit.Test;
-
-public class StreamWindowTest {
-
-	@Test
-	public void creationTest() {
-
-		StreamWindow<Integer> window1 = new StreamWindow<Integer>();
-		assertTrue(window1.isEmpty());
-		assertTrue(window1.windowID != 0);
-
-		window1.add(10);
-		assertEquals(1, window1.size());
-
-		StreamWindow<Integer> window2 = new StreamWindow<Integer>(window1);
-
-		assertTrue(window1.windowID == window2.windowID);
-		assertEquals(1, window2.size());
-
-		StreamWindow<Integer> window3 = new StreamWindow<Integer>(100);
-		assertEquals(100, window3.windowID);
-
-		StreamWindow<Integer> window4 = new StreamWindow<Integer>();
-		assertFalse(window4.windowID == window1.windowID);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void mergeTest() throws IOException {
-		StreamWindow<Integer> window1 = new StreamWindow<Integer>().setNumberOfParts(3);
-		StreamWindow<Integer> window2 = new StreamWindow<Integer>(window1.windowID, 3);
-		StreamWindow<Integer> window3 = new StreamWindow<Integer>(window1.windowID, 3);
-
-		window1.add(1);
-		window2.add(2);
-		window3.add(3);
-
-		Set<Integer> values = new HashSet<Integer>();
-		values.add(1);
-		values.add(2);
-		values.add(3);
-
-		StreamWindow<Integer> merged = StreamWindow.merge(window1, window2, window3);
-
-		assertEquals(3, merged.size());
-		assertEquals(window1.windowID, merged.windowID);
-		assertEquals(values, new HashSet<Integer>(merged));
-
-		try {
-			StreamWindow.merge(window1, new StreamWindow<Integer>());
-			fail();
-		} catch (RuntimeException e) {
-			// good
-		}
-
-		List<StreamWindow<Integer>> wList = StreamWindow.split(merged,3);
-
-		StreamWindow<Integer> merged2 = StreamWindow.merge(wList);
-
-		assertEquals(3, merged2.size());
-		assertEquals(window1.windowID, merged2.windowID);
-		assertEquals(values, new HashSet<Integer>(merged2));
-
-	}
-
-	@Test
-	public void serializerTest() throws IOException {
-
-		StreamWindow<Integer> streamWindow = new StreamWindow<Integer>();
-		streamWindow.add(1);
-		streamWindow.add(2);
-		streamWindow.add(3);
-
-		TypeSerializer<StreamWindow<Integer>> ts = new StreamWindowSerializer<Integer>(
-				BasicTypeInfo.INT_TYPE_INFO, null);
-
-		TestOutputView ow = new TestOutputView();
-
-		ts.serialize(streamWindow, ow);
-
-		TestInputView iw = ow.getInputView();
-
-		assertEquals(streamWindow, ts.deserialize(iw));
-
-	}
-
-	@Test
-	public void partitionTest() {
-		StreamWindow<Integer> streamWindow = new StreamWindow<Integer>();
-		streamWindow.add(1);
-		streamWindow.add(2);
-		streamWindow.add(3);
-		streamWindow.add(4);
-		streamWindow.add(5);
-		streamWindow.add(6);
-
-		List<StreamWindow<Integer>> split = StreamWindow.split(streamWindow,2);
-		assertEquals(2, split.size());
-		assertEquals(StreamWindow.fromElements(1, 2, 3), split.get(0));
-		assertEquals(StreamWindow.fromElements(4, 5, 6), split.get(1));
-
-		List<StreamWindow<Integer>> split2 = StreamWindow.split(streamWindow,6);
-		assertEquals(6, split2.size());
-		assertEquals(StreamWindow.fromElements(1), split2.get(0));
-		assertEquals(StreamWindow.fromElements(2), split2.get(1));
-		assertEquals(StreamWindow.fromElements(3), split2.get(2));
-		assertEquals(StreamWindow.fromElements(4), split2.get(3));
-		assertEquals(StreamWindow.fromElements(5), split2.get(4));
-		assertEquals(StreamWindow.fromElements(6), split2.get(5));
-
-		List<StreamWindow<Integer>> split3 = StreamWindow.split(streamWindow,10);
-		assertEquals(6, split3.size());
-		assertEquals(StreamWindow.fromElements(1), split3.get(0));
-		assertEquals(StreamWindow.fromElements(2), split3.get(1));
-		assertEquals(StreamWindow.fromElements(3), split3.get(2));
-		assertEquals(StreamWindow.fromElements(4), split3.get(3));
-		assertEquals(StreamWindow.fromElements(5), split3.get(4));
-		assertEquals(StreamWindow.fromElements(6), split3.get(5));
-
-	}
-
-	private class TestOutputView extends DataOutputStream implements DataOutputView {
-
-		public TestOutputView() {
-			super(new ByteArrayOutputStream(4096));
-		}
-
-		public TestInputView getInputView() {
-			ByteArrayOutputStream baos = (ByteArrayOutputStream) out;
-			return new TestInputView(baos.toByteArray());
-		}
-
-		@Override
-		public void skipBytesToWrite(int numBytes) throws IOException {
-			for (int i = 0; i < numBytes; i++) {
-				write(0);
-			}
-		}
-
-		@Override
-		public void write(DataInputView source, int numBytes) throws IOException {
-			byte[] buffer = new byte[numBytes];
-			source.readFully(buffer);
-			write(buffer);
-		}
-	}
-
-	private class TestInputView extends DataInputStream implements DataInputView {
-
-		public TestInputView(byte[] data) {
-			super(new ByteArrayInputStream(data));
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			while (numBytes > 0) {
-				int skipped = skipBytes(numBytes);
-				numBytes -= skipped;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
deleted file mode 100644
index 2e98a8f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoField;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.*;
-
-public class StreamWindowTypeInfoTest extends TestLogger {
-
-	public static class TestClass{}
-
-	@Test
-	public void testStreamWindowTypeInfoEquality() {
-		StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
-		StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
-
-		assertEquals(tpeInfo1, tpeInfo2);
-		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
-	}
-
-	@Test
-	public void testStreamWindowTypeInfoInequality() {
-		StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
-		StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new PojoTypeInfo<>(TestClass.class, new ArrayList<PojoField>()));
-
-		assertNotEquals(tpeInfo1, tpeInfo2);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
deleted file mode 100644
index 17d3974..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.ArrayFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ArrayFromTupleTest {
-
-	private String[] testStrings;
-
-	@Before
-	public void init() {
-		testStrings = new String[Tuple.MAX_ARITY];
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			testStrings[i] = Integer.toString(i);
-		}
-	}
-
-	@Test
-	public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			Tuple currentTuple = (Tuple) CLASSES[i].newInstance();
-			String[] currentArray = new String[i + 1];
-			for (int j = 0; j <= i; j++) {
-				currentTuple.setField(testStrings[j], j);
-				currentArray[j] = testStrings[j];
-			}
-			arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple));
-		}
-	}
-
-	@Test
-	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
-		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			currentTuple.setField(testStrings[i], i);
-		}
-
-		String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
-				testStrings[0] };
-		arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
-		String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] };
-		arrayEqualityCheck(expected2,
-				new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
-		String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] };
-		arrayEqualityCheck(expected3,
-				new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
-		String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4],
-				testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8],
-				testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4],
-				testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7],
-				testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
-		arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
-				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
-		}
-	}
-
-	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
-			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
-			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
-			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
-			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
-			Tuple24.class, Tuple25.class };
-}