You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:39 UTC

[03/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java
deleted file mode 100644
index e99de38..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtractTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ConcatenatedExtractTest {
-
-	private String[] testStringArray1 = { "1", "2", "3" };
-	private int[] testIntArray1 = { 1, 2, 3 };
-	private String[] testStringArray2 = { "4", "5", "6" };
-	private int[] testIntArray2 = { 4, 5, 6 };
-	private String[] testStringArray3 = { "7", "8", "9" };
-	private int[] testIntArray3 = { 7, 8, 9 };
-	private Tuple2<String[], int[]>[] testTuple2Array;
-	private Tuple2<String[], int[]> testTuple2;
-	private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void setupData() {
-		testTuple2Array = new Tuple2[2];
-		testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
-		testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
-
-		testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
-
-		testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
-				testTuple2Array);
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void test1() {
-		Extractor ext = new ConcatenatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
-				.add(new FieldsFromArray(Integer.class, 2, 1, 0));
-		int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
-		assertEquals(new Integer(expected[0]), ((Integer[]) ext.extract(testData))[0]);
-		assertEquals(new Integer(expected[1]), ((Integer[]) ext.extract(testData))[1]);
-		assertEquals(new Integer(expected[2]), ((Integer[]) ext.extract(testData))[2]);
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void test2() {
-		Extractor ext = new ConcatenatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
-				new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
-				.add(new FieldFromArray(0)) // Tuple2<String[],int[]>
-				.add(new ArrayFromTuple(0)) // Object[] (Containing String[])
-				.add(new FieldFromArray(0)) // String[]
-				.add(new FieldFromArray(1)); // String
-
-		String expected2 = testStringArray2[1];
-		assertEquals(expected2, ext.extract(testData));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
deleted file mode 100644
index 2d4dbcf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromArray;
-import org.junit.Test;
-
-public class FieldFromArrayTest {
-
-	String[] testStringArray = { "0", "1", "2", "3", "4" };
-	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
-	int[] testIntArray = { 20, 21, 22, 23, 24 };
-
-	@Test
-	public void testStringArray() {
-		for (int i = 0; i < this.testStringArray.length; i++) {
-			assertEquals(this.testStringArray[i],
-					new FieldFromArray<String>(i).extract(testStringArray));
-		}
-	}
-
-	@Test
-	public void testIntegerArray() {
-		for (int i = 0; i < this.testIntegerArray.length; i++) {
-			assertEquals(this.testIntegerArray[i],
-					new FieldFromArray<String>(i).extract(testIntegerArray));
-		}
-	}
-
-	@Test
-	public void testIntArray() {
-		for (int i = 0; i < this.testIntArray.length; i++) {
-			assertEquals(new Integer(this.testIntArray[i]),
-					new FieldFromArray<Integer>(i).extract(testIntArray));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
deleted file mode 100644
index 528611a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldFromTupleTest {
-
-	private String[] testStrings;
-
-	@Before
-	public void init() {
-		testStrings = new String[Tuple.MAX_ARITY];
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			testStrings[i] = Integer.toString(i);
-		}
-	}
-
-	@Test
-	public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
-		// extract single fields
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			Tuple current = (Tuple) CLASSES[i].newInstance();
-			for (int j = 0; j < i; j++) {
-				current.setField(testStrings[j], j);
-			}
-			for (int j = 0; j < i; j++) {
-				assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
-			}
-		}
-	}
-
-	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
-			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
-			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
-			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
-			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
-			Tuple24.class, Tuple25.class };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
deleted file mode 100644
index 3139aa5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromArray;
-import org.junit.Test;
-
-public class FieldsFromArrayTest {
-
-	String[] testStringArray = { "0", "1", "2", "3", "4" };
-	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
-	int[] testIntArray = { 20, 21, 22, 23, 24 };
-
-	@Test
-	public void testStringArray() {
-		// check single field extraction
-		for (int i = 0; i < testStringArray.length; i++) {
-			String[] tmp = { testStringArray[i] };
-			arrayEqualityCheck(tmp,
-					new FieldsFromArray<String>(String.class, i).extract(testStringArray));
-		}
-
-		// check reverse order
-		String[] reverseOrder = new String[testStringArray.length];
-		for (int i = 0; i < testStringArray.length; i++) {
-			reverseOrder[i] = testStringArray[testStringArray.length - i - 1];
-		}
-		arrayEqualityCheck(reverseOrder,
-				new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray));
-
-		// check picking fields and reorder
-		String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] };
-		arrayEqualityCheck(crazyOrder,
-				new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray));
-	}
-
-	@Test
-	public void testIntegerArray() {
-		// check single field extraction
-		for (int i = 0; i < testIntegerArray.length; i++) {
-			Integer[] tmp = { testIntegerArray[i] };
-			arrayEqualityCheck(tmp,
-					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray));
-		}
-
-		// check reverse order
-		Integer[] reverseOrder = new Integer[testIntegerArray.length];
-		for (int i = 0; i < testIntegerArray.length; i++) {
-			reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1];
-		}
-		arrayEqualityCheck(reverseOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0)
-						.extract(testIntegerArray));
-
-		// check picking fields and reorder
-		Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
-		arrayEqualityCheck(crazyOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray));
-
-	}
-
-	@Test
-	public void testIntArray() {
-		for (int i = 0; i < testIntArray.length; i++) {
-			Integer[] tmp = { testIntArray[i] };
-			arrayEqualityCheck(tmp,
-					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray));
-		}
-
-		// check reverse order
-		Integer[] reverseOrder = new Integer[testIntArray.length];
-		for (int i = 0; i < testIntArray.length; i++) {
-			reverseOrder[i] = testIntArray[testIntArray.length - i - 1];
-		}
-		arrayEqualityCheck(reverseOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray));
-
-		// check picking fields and reorder
-		Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] };
-		arrayEqualityCheck(crazyOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray));
-
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
deleted file mode 100644
index 0379fe0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTupleTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsFromTupleTest {
-
-	private double[] testDouble;
-
-	@Before
-	public void init() {
-		testDouble = new double[Tuple.MAX_ARITY];
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			testDouble[i] = i;
-		}
-	}
-
-	@Test
-	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
-		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			currentTuple.setField(testDouble[i], i);
-		}
-
-		double[] expected = { testDouble[5], testDouble[3], testDouble[6], testDouble[7],
-				testDouble[0] };
-		arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
-		double[] expected2 = { testDouble[0], testDouble[Tuple.MAX_ARITY - 1] };
-		arrayEqualityCheck(expected2,
-				new FieldsFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
-		double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], testDouble[0] };
-		arrayEqualityCheck(expected3,
-				new FieldsFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
-		double[] expected4 = { testDouble[13], testDouble[4], testDouble[5], testDouble[4],
-				testDouble[2], testDouble[8], testDouble[6], testDouble[2], testDouble[8],
-				testDouble[3], testDouble[5], testDouble[2], testDouble[16], testDouble[4],
-				testDouble[3], testDouble[2], testDouble[6], testDouble[4], testDouble[7],
-				testDouble[4], testDouble[2], testDouble[8], testDouble[7], testDouble[2] };
-		arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
-				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
-	}
-
-	private void arrayEqualityCheck(double[] array1, double[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i], 0d);
-		}
-	}
-
-	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
-			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
-			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
-			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
-			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
-			Tuple24.class, Tuple25.class };
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
deleted file mode 100644
index 8a7a011..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class CountEvictionPolicyTest {
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void testCountEvictionPolicy() {
-		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-		int counter;
-
-		// The count policy should not care about the triggered parameter
-		// Therefore its value switches after each use in this test.
-		boolean triggered = false;
-		// the size of the buffer should not matter as well!
-
-		// Test count of different sizes (0..9)
-		for (int i = 0; i < 10; i++) {
-			EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, i);
-			counter = 0;
-
-			// Test first i steps (should not evict)
-			for (int j = 0; j < i; j++) {
-				counter++;
-				assertEquals("Evictionpolicy with count of " + i + " evicted tuples at add nr. "
-						+ counter + ". It should not evict for the first " + i + " adds.", 0,
-						evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
-								tuples.get(Math.abs((i - j)) % 10)));
-			}
-
-			// Test the next three evictions
-			for (int j = 0; j < 3; j++) {
-				// The first add should evict now
-				counter++;
-				assertEquals("Evictionpolicy with count of " + i
-						+ " did not evict correct number of tuples at the expected pos " + counter
-						+ ".", i, evictionPolicy.notifyEviction(tuples.get(j),
-						(triggered = !triggered), tuples.get(Math.abs((i - j)) % 10)));
-
-				// the next i-1 adds should not evict
-				for (int k = 0; k < i - 1; k++) {
-					counter++;
-					assertEquals("Evictionpolicy with count of " + i
-							+ " evicted tuples at add nr. " + counter, 0,
-							evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
-									tuples.get(Math.abs((i - j)) % 10)));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testCountEvictionPolicyStartValuesAndEvictionAmount() {
-
-		// The count policy should not care about the triggered parameter
-		// Therefore its value switches after each use in this test.
-		boolean triggered = false;
-		// the size of the buffer should not matter as well!
-
-		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
-		// Text different eviction amounts (0..3)
-		for (int x = 0; x < 4; x++) {
-
-			// Test count of different sizes (0..9)
-			for (int i = 0; i < 10; i++) {
-
-				int counter = 0;
-
-				// Test different start values (-5..5)
-				for (int j = -5; i < 6; i++) {
-					EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, x, j);
-					// Add tuples without eviction
-					for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
-						counter++;
-						assertEquals("Evictionpolicy with count of " + i
-								+ " did not evict correct number of tuples at the expected pos "
-								+ counter + ".", 0, evictionPolicy.notifyEviction(
-								tuples.get(Math.abs(j)), (triggered = !triggered),
-								tuples.get(Math.abs((i - j)) % 10)));
-					}
-					// Expect eviction
-					counter++;
-					assertEquals("Evictionpolicy with count of " + i
-							+ " did not evict correct number of tuples at the expected pos "
-							+ counter + ".", x, evictionPolicy.notifyEviction(
-							tuples.get(Math.abs(j)), (triggered = !triggered),
-							tuples.get(Math.abs((i - j)) % 10)));
-				}
-			}
-		}
-	}
-
-	@Test
-	public void equalityTest() {
-		assertEquals(new CountEvictionPolicy<Integer>(5, 5, 5), new CountEvictionPolicy<Integer>(5,
-				5, 5));
-
-		assertEquals(new CountEvictionPolicy<Integer>(5, 5), new CountEvictionPolicy<Integer>(5, 5));
-		assertEquals(new CountEvictionPolicy<Integer>(5), new CountEvictionPolicy<Integer>(5));
-
-		assertNotEquals(new CountEvictionPolicy<Integer>(4, 5, 5),
-				new CountEvictionPolicy<Integer>(5, 5, 5));
-		assertNotEquals(new CountEvictionPolicy<Integer>(5, 5, 5),
-				new CountEvictionPolicy<Integer>(5, 4, 5));
-
-		assertNotEquals(new CountEvictionPolicy<Integer>(5, 5, 5),
-				new CountEvictionPolicy<Integer>(5, 5, 4));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
deleted file mode 100644
index ce5ae3b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.junit.Test;
-
-public class CountTriggerPolicyTest {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testCountTriggerPolicy() {
-
-		List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-		int counter;
-
-		// Test count of different sizes (0..9)
-		for (int i = 0; i < 10; i++) {
-			TriggerPolicy triggerPolicy = Count.of(i).toTrigger();
-			counter = 0;
-
-			// Test first i steps (should not trigger)
-			for (int j = 0; j < i; j++) {
-				counter++;
-				assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. " + counter
-						+ ". It should not trigger for the first " + i + " adds.",
-						triggerPolicy.notifyTrigger(tuples.get(j)));
-			}
-
-			// Test the next three triggers
-			for (int j = 0; j < 3; j++) {
-				// The first add should trigger now
-				counter++;
-				assertTrue("Triggerpolicy with count of " + i
-						+ " did not trigger at the expected pos " + counter + ".",
-						triggerPolicy.notifyTrigger(tuples.get(j)));
-
-				// the next i-1 adds should not trigger
-				for (int k = 0; k < i - 1; k++) {
-					counter++;
-					assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. "
-							+ counter, triggerPolicy.notifyTrigger(tuples.get(k)));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testCountTriggerPolicyStartValues() {
-
-		List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
-		// Test count of different sizes (0..9)
-		for (int i = 0; i < 10; i++) {
-
-			// Test different start values (-5..5)
-			for (int j = -5; i < 6; i++) {
-				TriggerPolicy triggerPolicy = new CountTriggerPolicy(i, j);
-				// Add tuples without trigger
-				for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
-					assertFalse("Triggerpolicy with count of " + i + " and start value of " + j
-							+ " triggered at add nr. " + (k + 1),
-							triggerPolicy.notifyTrigger(tuples.get(k % 10)));
-				}
-				// Expect trigger
-				assertTrue("Triggerpolicy with count of " + i + "and start value of " + j
-						+ " did not trigger at the expected position.",
-						triggerPolicy.notifyTrigger(tuples.get(0)));
-			}
-		}
-	}
-
-	@Test
-	public void equalityTest() {
-		assertEquals(new CountTriggerPolicy<Integer>(5, 5), new CountTriggerPolicy<Integer>(5, 5));
-
-		assertEquals(new CountTriggerPolicy<Integer>(5, 5), new CountTriggerPolicy<Integer>(5, 5));
-		assertEquals(new CountTriggerPolicy<Integer>(5), new CountTriggerPolicy<Integer>(5));
-
-		assertNotEquals(new CountTriggerPolicy<Integer>(4, 5),
-				new CountTriggerPolicy<Integer>(5, 5));
-		assertNotEquals(new CountTriggerPolicy<Integer>(5, 5),
-				new CountTriggerPolicy<Integer>(5, 4));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
deleted file mode 100644
index 9ec4644..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
-
-public class DeltaPolicyTest {
-
-	//Dummy serializer, this is not used because the tests are done locally
-	private final static TypeSerializer<Tuple2<Integer, Integer>> SERIALIZER = null;
-
-	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-	@Test
-	public void testDelta() {
-		DeltaPolicy deltaPolicy = new DeltaPolicy(new DeltaFunction<Tuple2<Integer, Integer>>() {
-			@Override
-			public double getDelta(Tuple2<Integer, Integer> oldDataPoint,
-					Tuple2<Integer, Integer> newDataPoint) {
-				return (double) newDataPoint.f0 - oldDataPoint.f0;
-			}
-		}, new Tuple2(0, 0), 2, SERIALIZER);
-
-		List<Tuple2> tuples = Arrays.asList(new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0),
-				new Tuple2(6, 0));
-
-		assertFalse(deltaPolicy.notifyTrigger(tuples.get(0)));
-		assertEquals(0, deltaPolicy.notifyEviction(tuples.get(0), false, 0));
-
-		assertFalse(deltaPolicy.notifyTrigger(tuples.get(1)));
-		assertEquals(0, deltaPolicy.notifyEviction(tuples.get(1), false, 1));
-
-		assertTrue(deltaPolicy.notifyTrigger(tuples.get(2)));
-		assertEquals(1, deltaPolicy.notifyEviction(tuples.get(2), true, 2));
-
-		assertTrue(deltaPolicy.notifyTrigger(tuples.get(3)));
-		assertEquals(2, deltaPolicy.notifyEviction(tuples.get(3), true, 2));
-	}
-
-	@Test
-	public void testEquality() {
-
-		DeltaFunction<Tuple2<Integer, Integer>> df = new DeltaFunction<Tuple2<Integer, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public double getDelta(Tuple2<Integer, Integer> oldDataPoint,
-					Tuple2<Integer, Integer> newDataPoint) {
-				return (double) newDataPoint.f0 - oldDataPoint.f0;
-			}
-		};
-
-		assertEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
-				0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
-				0, 0), 2, SERIALIZER));
-
-		assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
-				0, 1), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df,
-				new Tuple2<Integer, Integer>(0, 0), 2, SERIALIZER));
-
-		assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
-				0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
-				0, 0), 3, SERIALIZER));
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java
deleted file mode 100644
index 3214aa7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicyTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.junit.Test;
-
-public class MultiEvictionPolicyTest {
-
-	private final List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
-	private final CloneableEvictionPolicy<Integer> evictionPolicy1 = new CountEvictionPolicy<Integer>(
-			7, 3);
-	private final CloneableEvictionPolicy<Integer> evictionPolicy2 = new CountEvictionPolicy<Integer>(
-			3, 1);
-	private final CloneableEvictionPolicy<Integer> evictionPolicy3 = new CountEvictionPolicy<Integer>(
-			5, 2);
-
-	private final CloneableEvictionPolicy<Integer> activeEvictionPolicy1 = new ActiveCloneableEvictionPolicyWrapper<Integer>(
-			evictionPolicy1);
-	private final CloneableEvictionPolicy<Integer> activeEvictionPolicy2 = new ActiveCloneableEvictionPolicyWrapper<Integer>(
-			evictionPolicy2);
-	private final CloneableEvictionPolicy<Integer> activeEvictionPolicy3 = new ActiveCloneableEvictionPolicyWrapper<Integer>(
-			evictionPolicy3);
-
-	// From policies specified above the expected output is:
-	// 1.: 0000000300
-	// 2.: 0001111111
-	// 3.: 0000020202
-	private final Integer[] maxResult = { 0, 0, 0, 1, 1, 2, 1, 3, 1, 2 };
-	private final Integer[] minResult = { 0, 0, 0, 0, 0, 0, 0, 1, 0, 0 };
-	private final Integer[] sumResult = { 0, 0, 0, 1, 1, 3, 1, 6, 1, 3 };
-	private final Integer[] priorityResult = { 0, 0, 0, 1, 1, 1, 1, 3, 1, 1 };
-
-	/*
-	 * Test cases for not active policies
-	 */
-
-	@Test
-	public void notActiveEvictionMAXStrategyTest() {
-		runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MAX, maxResult);
-	}
-
-	@Test
-	public void notActiveEvictionMINStrategyTest() {
-		runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MIN, minResult);
-	}
-
-	@Test
-	public void notActiveEvictionSUMStrategyTest() {
-		runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.SUM, sumResult);
-	}
-
-	@Test
-	public void notActiveEvictionPRIORITYStrategyTest() {
-		runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.PRIORITY, priorityResult);
-	}
-
-	/*
-	 * Test cases for active policies
-	 */
-
-	@Test
-	public void activeEvictionMAXStrategyTest() {
-		runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MAX, maxResult);
-	}
-
-	@Test
-	public void activeEvictionMINStrategyTest() {
-		runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.MIN, minResult);
-	}
-
-	@Test
-	public void activeEvictionSUMStrategyTest() {
-		runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.SUM, sumResult);
-	}
-
-	@Test
-	public void activeEvictionPRIORITYStrategyTest() {
-		runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy.PRIORITY, priorityResult);
-	}
-
-	/**
-	 * Helper method: It runs the test with the given input using the not active
-	 * policies and applies the strategy defined in the parameter.
-	 * 
-	 * @param strategy
-	 *            the eviction strategy to be used
-	 * @param expectedResult
-	 *            the result we expect
-	 */
-	private void runNotActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy strategy,
-			Integer[] expectedResult) {
-		@SuppressWarnings("unchecked")
-		MultiEvictionPolicy<Integer> multiEviction = new MultiEvictionPolicy<Integer>(strategy,
-				evictionPolicy1.clone(), evictionPolicy2.clone(), evictionPolicy3.clone());
-
-		List<Integer> result = new LinkedList<Integer>();
-
-		int buffersize = 0;
-		for (Integer tuple : tuples) {
-			// The buffer size should not matter, but we keep it for the case of
-			// later policy changes.
-			// The trigger does not matter. Always set it to false.
-			int eviction = multiEviction.notifyEviction(tuple, false, buffersize);
-			buffersize -= eviction;
-			result.add(eviction);
-
-			if (buffersize < 0) {
-				buffersize = 0;
-			}
-
-			buffersize++;
-		}
-
-		arrayEqualityCheck(expectedResult, result.toArray());
-	}
-
-	/**
-	 * Helper method: It runs the test with the given input using the active
-	 * policies and applies the strategy defined in the parameter.
-	 * 
-	 * @param strategy
-	 *            the eviction strategy to be used
-	 * @param expectedResult
-	 *            the result we expect
-	 */
-	private void runActiveEvictionTest(MultiEvictionPolicy.EvictionStrategy strategy,
-			Integer[] expectedResult) {
-		@SuppressWarnings("unchecked")
-		MultiEvictionPolicy<Integer> multiEviction = new MultiEvictionPolicy<Integer>(strategy,
-				activeEvictionPolicy1.clone(), activeEvictionPolicy2.clone(),
-				activeEvictionPolicy3.clone());
-
-		List<Integer> result = new LinkedList<Integer>();
-
-		int buffersize = 0;
-		for (Integer tuple : tuples) {
-			// The buffer size should not matter, but we keep it for the case of
-			// later policy changes.
-			// The trigger does not matter. Always set it to false.
-			int eviction = multiEviction.notifyEvictionWithFakeElement(tuple, buffersize);
-			buffersize -= eviction;
-			result.add(eviction);
-
-			if (buffersize < 0) {
-				buffersize = 0;
-			}
-
-			buffersize++;
-		}
-
-		arrayEqualityCheck(expectedResult, result.toArray());
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals(
-				"The result arrays must have the same length. (Expected: " + Arrays.asList(array1)
-						+ "; Actual: " + Arrays.asList(array2) + ")", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i + "(Expected: " + Arrays.asList(array1)
-					+ "; Actual: " + Arrays.asList(array2) + ")", array1[i], array2[i]);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
deleted file mode 100644
index 4448b59..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class MultiTriggerPolicyTest {
-
-	/**
-	 * This constant defines the timeout for the test of the start ups of the
-	 * active trigger policy Threads.
-	 */
-	private static final int TIMEOUT = 120000;
-
-	// Use this to increase the timeout to be as long as possible.
-	// private static final int TIMEOUT=Integer.MAX_VALUE;
-
-	/**
-	 * This test covers all regular notify call. It takes no fake elements into
-	 * account.
-	 */
-	@Test
-	public void testWithoutActivePolicies() {
-		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
-				16);
-
-		TriggerPolicy<Integer> firstPolicy = new CountTriggerPolicy<Integer>(3);
-		TriggerPolicy<Integer> secondPolicy = new CountTriggerPolicy<Integer>(5);
-		TriggerPolicy<Integer> thirdPolicy = new CountTriggerPolicy<Integer>(8);
-		@SuppressWarnings("unchecked")
-		TriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
-				secondPolicy, thirdPolicy);
-
-		// From above policies the expected output is (first element is 0):
-		// first: 3, 6, 9, 12, 15,...
-		// second: 5, 10, 15,...
-		// third: 8, 16, 24,...
-		// combination: 3,5,6,8,9,10,12,15,16
-		List<Integer> expectedResult = Arrays.asList(3, 5, 6, 8, 9, 10, 12, 15, 16);
-		List<Integer> actualResult = new LinkedList<Integer>();
-
-		for (int i = 0; i < tuples.size(); i++) {
-			if (multiTrigger.notifyTrigger(tuples.get(i))) {
-				actualResult.add(i);
-			}
-		}
-
-		// check equal sizes
-		assertTrue("The expected result list and the actual result list must have the same size,"
-				+ " but they are different. (expected: " + expectedResult.size() + "; actual: "
-				+ actualResult.size() + "). Actual result is: " + actualResult
-				+ " Expected result is: " + expectedResult,
-				expectedResult.size() == actualResult.size());
-
-		// check equal elements within result list/expected list
-		for (int i = 0; i < expectedResult.size(); i++) {
-			assertTrue("The actual and the expected result does not match at position " + i
-					+ ". (expected: " + expectedResult.get(i) + "; actual: " + actualResult.get(i)
-					+ "). Actual result is: " + actualResult + " Expected result is: "
-					+ expectedResult, expectedResult.get(i) == actualResult.get(i));
-		}
-	}
-
-	/**
-	 * This test covers the pre-notify calls to active policies. I takes no
-	 * regular notify into account.
-	 */
-	@Test
-	public void testWithActivePolicies() {
-
-		// create some test data
-		Integer[] times = { 1, 3, 20, 26 };
-
-		// create a timestamp
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		// create policy
-		TimeTriggerPolicy<Integer> firstPolicy = new TimeTriggerPolicy<Integer>(5,
-				new TimestampWrapper<Integer>(timeStamp, 0));
-		TimeTriggerPolicy<Integer> secondPolicy = new TimeTriggerPolicy<Integer>(10,
-				new TimestampWrapper<Integer>(timeStamp, 0));
-		TimeTriggerPolicy<Integer> thirdPolicy = new TimeTriggerPolicy<Integer>(22,
-				new TimestampWrapper<Integer>(timeStamp, 0));
-		@SuppressWarnings("unchecked")
-		MultiTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
-				secondPolicy, thirdPolicy);
-
-		// expected result
-		// Long[][] result1 = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
-		// Long[][] result2 = { {}, {}, { 9L, 19L }, { } };
-		// Long[][] result3 = { {}, {}, { }, { 21L } };
-		Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L, 9L, 19L }, { 24L, 21L } };
-
-		// call policy
-		for (int i = 0; i < times.length; i++) {
-			arrayEqualityCheck(result[i], multiTrigger.preNotifyTrigger(times[i]));
-			multiTrigger.notifyTrigger(times[i]);
-		}
-	}
-
-	/**
-	 * This test verifies, that nestet active trigger runnables are started
-	 * correctly.
-	 */
-	@Test
-	public void testActiveTriggerRunnables() throws InterruptedException {
-		TriggerPolicy<Integer> firstPolicy = new ActiveTriggerWithRunnable(1);
-		TriggerPolicy<Integer> secondPolicy = new ActiveTriggerWithRunnable(2);
-		TriggerPolicy<Integer> thirdPolicy = new ActiveTriggerWithRunnable(3);
-		@SuppressWarnings("unchecked")
-		ActiveTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
-				secondPolicy, thirdPolicy);
-
-		MyCallbackClass cb = new MyCallbackClass(3);
-		Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
-		new Thread(runnable).start();
-
-		assertTrue("Even after " + TIMEOUT + "ms not all active policy runnables were started.",
-				cb.check(TIMEOUT, 1, 2, 3));
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals(
-				"The result arrays must have the same length. (Expected: " + Arrays.asList(array1)
-						+ "; Actual: " + Arrays.asList(array2) + ")", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i + "(Expected: " + Arrays.asList(array1)
-					+ "; Actual: " + Arrays.asList(array2) + ")", array1[i], array2[i]);
-		}
-	}
-
-	/**
-	 * This helper class is used to simulate active triggers which produce own
-	 * runnables.
-	 */
-	@SuppressWarnings("serial")
-	private class ActiveTriggerWithRunnable implements ActiveTriggerPolicy<Integer> {
-
-		private final int id;
-
-		public ActiveTriggerWithRunnable(int id) {
-			this.id = id;
-		}
-
-		@Override
-		public boolean notifyTrigger(Integer datapoint) {
-			// This method is not uses for any test case
-			return false;
-		}
-
-		@Override
-		public Object[] preNotifyTrigger(Integer datapoint) {
-			// This method is not used for any test case
-			return null;
-		}
-
-		@Override
-		public Runnable createActiveTriggerRunnable(final ActiveTriggerCallback callback) {
-			return new Runnable() {
-				@Override
-				public void run() {
-					callback.sendFakeElement(id);
-				}
-			};
-		}
-	}
-
-	/**
-	 * This callback class is used to checked whether all nested policy runnable
-	 * started up.
-	 */
-	private class MyCallbackClass implements ActiveTriggerCallback {
-
-		private final Set<Integer> received = Sets
-				.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
-
-		private final CountDownLatch sync;
-
-		public MyCallbackClass(int numberOfExpectedElements) {
-			checkArgument(numberOfExpectedElements >= 0);
-			this.sync = new CountDownLatch(numberOfExpectedElements);
-		}
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-			received.add((Integer) datapoint);
-
-			sync.countDown();
-		}
-
-		public boolean check(int timeout, int... expectedIds) throws InterruptedException {
-			// Wait for all elements
-			sync.await(timeout, TimeUnit.MILLISECONDS);
-
-			// Check received all expected ids
-			assertEquals(expectedIds.length, received.size());
-
-			for (int id : expectedIds) {
-				if (!received.contains(id)) {
-					return false;
-				}
-			}
-
-			return true;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
deleted file mode 100644
index fda9cd3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
-import org.junit.Test;
-
-public class PunctuationPolicyTest {
-
-	// This value should not effect the policy. It is changed at each call to
-	// verify this.
-	private boolean triggered = false;
-
-	@Test
-	public void PunctuationTriggerTestWithoutExtraction() {
-		PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>(
-				new TestObject(0));
-		assertTrue("The present punctuation was not detected. (POS 1)",
-				policy.notifyTrigger(new TestObject(0)));
-		assertFalse("There was a punctuation detected which wasn't present. (POS 2)",
-				policy.notifyTrigger(new TestObject(1)));
-		policy.toString();
-	}
-
-	@Test
-	public void PunctuationTriggerTestWithExtraction() {
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>(
-				new TestObject(0), new FieldFromTuple(0));
-		assertTrue("The present punctuation was not detected. (POS 3)",
-				policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(0),
-						new TestObject(1))));
-		assertFalse("There was a punctuation detected which wasn't present. (POS 4)",
-				policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(1),
-						new TestObject(0))));
-	}
-
-	@Test
-	public void PunctuationEvictionTestWithoutExtraction() {
-		// The current buffer size should not effect the test. It's therefore
-		// always 0 here.
-
-		PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>(
-				new TestObject(0));
-		assertEquals(
-				"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 5)",
-				0, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0));
-		for (int i = 0; i < 10; i++) {
-			for (int j = 0; j < i; j++) {
-				assertEquals("There was a punctuation detected which wasn't present. (POS 6)", 0,
-						policy.notifyEviction(new TestObject(1), (triggered = !triggered), 0));
-			}
-			assertEquals(
-					"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 7)",
-					i + 1, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0));
-		}
-	}
-
-	@Test
-	public void PunctuationEvictionTestWithExtraction() {
-		// The current buffer size should not effect the test. It's therefore
-		// always 0 here.
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>(
-				new TestObject(0), new FieldFromTuple(0));
-		assertEquals(
-				"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)",
-				0, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0),
-						new TestObject(1)), (triggered = !triggered), 0));
-		for (int i = 0; i < 10; i++) {
-			for (int j = 0; j < i; j++) {
-				assertEquals("There was a punctuation detected which wasn't present. (POS 9)", 0,
-						policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(1),
-								new TestObject(0)), (triggered = !triggered), 0));
-			}
-			assertEquals(
-					"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)",
-					i + 1, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0),
-							new TestObject(1)), (triggered = !triggered), 0));
-		}
-	}
-
-	@Test
-	public void testEquals() {
-		Extractor<Integer, Integer> extractor = new Extractor<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer extract(Integer in) {
-				return in;
-			}
-		};
-
-		assertEquals(new PunctuationPolicy<Integer, Integer>(4),
-				new PunctuationPolicy<Integer, Integer>(4));
-		assertNotEquals(new PunctuationPolicy<Integer, Integer>(4),
-				new PunctuationPolicy<Integer, Integer>(5));
-
-		assertNotEquals(new PunctuationPolicy<Integer, Integer>(4, extractor),
-				new PunctuationPolicy<Integer, Integer>(4));
-
-		assertEquals(new PunctuationPolicy<Integer, Integer>(4, extractor),
-				new PunctuationPolicy<Integer, Integer>(4, extractor));
-
-		assertNotEquals(new PunctuationPolicy<Integer, Integer>(4),
-				new PunctuationPolicy<Integer, Integer>(4, extractor));
-
-	}
-
-	private class TestObject {
-
-		private int id;
-
-		public TestObject(int id) {
-			this.id = id;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (o instanceof TestObject && ((TestObject) o).getId() == this.id) {
-				return true;
-			} else {
-				return false;
-			}
-		}
-
-		public int getId() {
-			return id;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
deleted file mode 100644
index 08a5c32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-public class TimeEvictionPolicyTest {
-
-	@Test
-	public void timeEvictionTest() {
-		// create some test data
-		Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30, 31, 33, 36, 40, 41, 42, 43, 44,
-				45, 47, 55 };
-		Integer[] numToDelete = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 0, 0, 0, 3 };
-
-		// create a timestamp
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		// test different granularity
-		for (long granularity = 0; granularity < 40; granularity++) {
-			// create policy
-			TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity,
-					new TimestampWrapper<Integer>(timeStamp, 0));
-
-			// The trigger status should not effect the policy. Therefore, it's
-			// value is changed after each usage.
-			boolean triggered = false;
-
-			// The eviction should work similar with both, fake and real
-			// elements. Which kind is used is changed on every 3rd element in
-			// this test.
-			int fakeAndRealCounter = 0;
-			boolean fake = false;
-
-			// test by adding values
-			LinkedList<Integer> buffer = new LinkedList<Integer>();
-			for (int i = 0; i < times.length; i++) {
-
-				// check if the current element should be a fake
-				fakeAndRealCounter++;
-				if (fakeAndRealCounter > 2) {
-					fake = !fake;
-					fakeAndRealCounter = 0;
-				}
-
-				int result;
-
-				if (fake) {
-					// Notify eviction with fake element
-					result = policy.notifyEvictionWithFakeElement(times[i], buffer.size());
-				} else {
-					// Notify eviction with real element
-					result = policy.notifyEviction(times[i], (triggered = !triggered),
-							buffer.size());
-				}
-
-				// handle correctness of eviction
-				for (; result > 0 && !buffer.isEmpty(); result--) {
-					if (buffer.getFirst() <= times[i] - granularity) {
-						buffer.removeFirst();
-					} else {
-						fail("The policy wanted to evict time " + buffer.getFirst()
-								+ " while the current time was " + times[i]
-								+ "and the granularity was " + granularity);
-					}
-				}
-
-				// test that all required evictions have been done
-				if (!buffer.isEmpty()) {
-					assertTrue("The policy did not evict " + buffer.getFirst()
-							+ " while the current time was " + times[i]
-							+ " and the granularity was " + granularity,
-							(buffer.getFirst() >= times[i] - granularity));
-				}
-
-				// test influence of other evictions
-				for (int j = numToDelete[i % numToDelete.length]; j > 0; j--) {
-					if (!buffer.isEmpty()) {
-						buffer.removeFirst();
-					}
-				}
-
-				// add current element to buffer if it is no fake
-				if (!fake) {
-					buffer.add(times[i]);
-				}
-
-			}
-		}
-	}
-
-	@Test
-	public void equalsTest() {
-
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp2 = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		assertEquals(
-				new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)),
-				new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)));
-
-		assertNotEquals(new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
-				0)), new TimeEvictionPolicy<Integer>(5,
-				new TimestampWrapper<Integer>(timeStamp2, 0)));
-
-		assertNotEquals(new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
-				0)),
-				new TimeEvictionPolicy<Integer>(2, new TimestampWrapper<Integer>(timeStamp, 0)));
-
-		assertNotEquals(new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
-				0)),
-				new TimeEvictionPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 3)));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
deleted file mode 100644
index 5b26854..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
-
-public class TimeTriggerPolicyTest {
-
-	@Test
-	public void timeTriggerRegularNotifyTest() {
-		// create some test data
-		Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30 };
-
-		// create a timestamp
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		// test different granularity
-		for (long granularity = 0; granularity < 31; granularity++) {
-			// create policy
-
-			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
-					new TimestampWrapper<Integer>(timeStamp, 0));
-
-			// remember window border
-			long currentTime = 0;
-
-			// test by adding values
-			for (int i = 0; i < times.length; i++) {
-				boolean result = policy.notifyTrigger(times[i]);
-				// start time is included, but end time is excluded: >=
-				if (times[i] >= currentTime + granularity) {
-					if (granularity != 0) {
-						currentTime = times[i] - ((times[i] - currentTime) % granularity);
-					}
-					assertTrue("The policy did not trigger at pos " + i + " (current time border: "
-							+ currentTime + "; current granularity: " + granularity
-							+ "; data point time: " + times[i] + ")", result);
-				} else {
-					assertFalse("The policy triggered wrong at pos " + i
-							+ " (current time border: " + currentTime + "; current granularity: "
-							+ granularity + "; data point time: " + times[i] + ")", result);
-				}
-			}
-		}
-
-	}
-
-	@Test
-	public void equalsTest() {
-
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp2 = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		assertEquals(
-				new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)),
-				new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 0)));
-
-		assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
-				0)),
-				new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp2, 0)));
-
-		assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
-				0)), new TimeTriggerPolicy<Integer>(2, new TimestampWrapper<Integer>(timeStamp, 0)));
-
-		assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
-				0)), new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 3)));
-
-		assertEquals(SystemTimestamp.getWrapper(), SystemTimestamp.getWrapper());
-	}
-
-	@Test
-	public void timeTriggerPreNotifyTest() {
-		// create some test data
-		Integer[] times = { 1, 3, 20, 26 };
-
-		// create a timestamp
-		@SuppressWarnings("serial")
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		// create policy
-		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
-				new TimestampWrapper<Integer>(timeStamp, 0));
-
-		// expected result
-		Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
-
-		// call policy
-		for (int i = 0; i < times.length; i++) {
-			arrayEqualityCheck(result[i], policy.preNotifyTrigger(times[i]));
-			policy.notifyTrigger(times[i]);
-		}
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
deleted file mode 100644
index a3a7d73..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
-public class TumblingEvictionPolicyTest {
-
-	@Test
-	public void testTumblingEviction() {
-		EvictionPolicy<Integer> policy = new TumblingEvictionPolicy<Integer>();
-
-		int counter = 0;
-
-		for (int i = 0; i < 10; i++) {
-			for (int j = 0; j < i; j++) {
-				assertEquals(0, policy.notifyEviction(0, false, counter++));
-			}
-			assertEquals(counter, policy.notifyEviction(0, true, counter));
-			counter = 1;
-		}
-
-		assertEquals(new TumblingEvictionPolicy<Integer>(), new TumblingEvictionPolicy<Integer>());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
deleted file mode 100644
index 6bc0e30..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Test;
-
-public class BasicWindowBufferTest {
-
-	@Test
-	public void testEmitWindow() throws Exception {
-
-		TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-		List<StreamWindow<Integer>> collected = collector.getCollected();
-
-		WindowBuffer<Integer> wb = new BasicWindowBuffer<Integer>();
-
-		wb.store(2);
-		wb.store(10);
-
-		wb.emitWindow(collector);
-
-		assertEquals(1, collected.size());
-		assertEquals(StreamWindow.fromElements(2, 10), collected.get(0));
-
-		wb.store(4);
-		wb.evict(2);
-
-		wb.emitWindow(collector);
-
-		assertEquals(2, collected.size());
-		assertEquals(StreamWindow.fromElements(4), collected.get(1));
-
-		wb.evict(1);
-
-		wb.emitWindow(collector);
-		assertEquals(2, collected.size());
-	}
-
-	public static class TestOutput<T> implements Output<StreamRecord<T>> {
-
-		private final List<T> collected = new ArrayList<T>();
-
-		@Override
-		public void collect(StreamRecord<T> record) {
-			collected.add(record.getValue());
-		}
-
-		@Override
-		public void close() {
-		}
-
-		public List<T> getCollected() {
-			return collected;
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
deleted file mode 100644
index 8430499..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-
-import org.junit.Test;
-
-public class JumpingCountGroupedPreReducerTest {
-
-	TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor
-			.getForObject(new Tuple2<Integer, Integer>(1, 1));
-	TypeSerializer<Tuple2<Integer, Integer>> serializer = type.createSerializer(null);
-
-	KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(
-			new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, type), type, null);
-
-	Reducer reducer = new Reducer();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testEmitWindow() throws Exception {
-
-		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
-		inputs.add(new Tuple2<Integer, Integer>(1, 1));
-		inputs.add(new Tuple2<Integer, Integer>(0, 0));
-		inputs.add(new Tuple2<Integer, Integer>(1, -1));
-		inputs.add(new Tuple2<Integer, Integer>(1, -2));
-		inputs.add(new Tuple2<Integer, Integer>(100, -200));
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
-		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
-				reducer, key, serializer, 1);
-
-		wb.store(serializer.copy(inputs.get(4)));
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.emitWindow(collector);
-
-		assertEquals(1, collected.size());
-
-		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
-				new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
-
-		wb.store(serializer.copy(inputs.get(4)));
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-
-		// Nothing should happen here
-		wb.evict(3);
-
-		wb.store(serializer.copy(inputs.get(3)));
-
-		wb.emitWindow(collector);
-
-		assertEquals(2, collected.size());
-
-		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, -2),
-				new Tuple2<Integer, Integer>(0, 0)), collected.get(1));
-
-		// Test whether function is mutating inputs or not
-		assertEquals(2, reducer.allInputs.size());
-		assertEquals(reducer.allInputs.get(0), inputs.get(2));
-		assertEquals(reducer.allInputs.get(1), inputs.get(3));
-
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testEmitWindow2() throws Exception {
-
-		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
-		inputs.add(new Tuple2<Integer, Integer>(1, 1));
-		inputs.add(new Tuple2<Integer, Integer>(0, 0));
-		inputs.add(new Tuple2<Integer, Integer>(1, -1));
-		inputs.add(new Tuple2<Integer, Integer>(1, -2));
-		inputs.add(new Tuple2<Integer, Integer>(100, -200));
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
-		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
-				reducer, key, serializer, 1).sequentialID();
-
-		wb.store(serializer.copy(inputs.get(4)));
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.emitWindow(collector);
-
-		assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
-
-		wb.store(serializer.copy(inputs.get(4)));
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-		wb.emitWindow(collector);
-
-		assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
-
-
-	}
-
-	private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
-		assertEquals(new HashSet<T>(first), new HashSet<T>(second));
-	}
-
-	@SuppressWarnings("serial")
-	private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
-		public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
-											   Tuple2<Integer, Integer> value2) throws Exception {
-			allInputs.add(value2);
-			value1.f0 = value1.f0 + value2.f0;
-			value1.f1 = value1.f1 + value2.f1;
-			return value1;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
deleted file mode 100644
index 2279264..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class JumpingCountPreReducerTest {
-
-	TypeSerializer<Tuple2<Integer, Integer>> serializer = TypeExtractor.getForObject(
-			new Tuple2<Integer, Integer>(1, 1)).createSerializer(null);
-
-	Reducer reducer = new Reducer();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testEmitWindow() throws Exception {
-
-		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
-		inputs.add(new Tuple2<Integer, Integer>(1, 1));
-		inputs.add(new Tuple2<Integer, Integer>(2, 0));
-		inputs.add(new Tuple2<Integer, Integer>(3, -1));
-		inputs.add(new Tuple2<Integer, Integer>(4, -2));
-		inputs.add(new Tuple2<Integer, Integer>(5, -3));
-
-		TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
-		WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>(
-				reducer, serializer, 2);
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-		wb.store(serializer.copy(inputs.get(3)));
-		wb.store(serializer.copy(inputs.get(4)));
-
-		wb.emitWindow(collector);
-
-		assertEquals(1, collected.size());
-		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(12, -6)),
-				collected.get(0));
-
-		wb.store(serializer.copy(inputs.get(0)));
-		wb.store(serializer.copy(inputs.get(1)));
-		wb.store(serializer.copy(inputs.get(2)));
-
-		// Nothing should happen here
-		wb.evict(3);
-
-		wb.store(serializer.copy(inputs.get(3)));
-
-		wb.emitWindow(collector);
-
-		assertEquals(2, collected.size());
-		assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(7, -3)),
-				collected.get(1));
-
-		// Test whether function is mutating inputs or not
-		assertEquals(3, reducer.allInputs.size());
-		assertEquals(reducer.allInputs.get(0), inputs.get(3));
-		assertEquals(reducer.allInputs.get(1), inputs.get(4));
-		assertEquals(reducer.allInputs.get(2), inputs.get(3));
-	}
-
-	@SuppressWarnings("serial")
-	private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
-		public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
-											   Tuple2<Integer, Integer> value2) throws Exception {
-			allInputs.add(value2);
-			value1.f0 = value1.f0 + value2.f0;
-			value1.f1 = value1.f1 + value2.f1;
-			return value1;
-		}
-
-	}
-
-}