Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:42 UTC

[26/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
deleted file mode 100644
index 3b098c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ConcatenatedExtractTest {
-
-	private String[] testStringArray1 = { "1", "2", "3" };
-	private int[] testIntArray1 = { 1, 2, 3 };
-	private String[] testStringArray2 = { "4", "5", "6" };
-	private int[] testIntArray2 = { 4, 5, 6 };
-	private String[] testStringArray3 = { "7", "8", "9" };
-	private int[] testIntArray3 = { 7, 8, 9 };
-	private Tuple2<String[], int[]>[] testTuple2Array;
-	private Tuple2<String[], int[]> testTuple2;
-	private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void setupData() {
-		testTuple2Array = new Tuple2[2];
-		testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
-		testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
-
-		testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
-
-		testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
-				testTuple2Array);
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void test1() {
-		Extractor ext = new ConcatenatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
-				.add(new FieldsFromArray(Integer.class, 2, 1, 0));
-		int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
-		assertEquals(new Integer(expected[0]), ((Integer[]) ext.extract(testData))[0]);
-		assertEquals(new Integer(expected[1]), ((Integer[]) ext.extract(testData))[1]);
-		assertEquals(new Integer(expected[2]), ((Integer[]) ext.extract(testData))[2]);
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void test2() {
-		Extractor ext = new ConcatenatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
-				new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
-				.add(new FieldFromArray(0)) // Tuple2<String[],int[]>
-				.add(new ArrayFromTuple(0)) // Object[] (Containing String[])
-				.add(new FieldFromArray(0)) // String[]
-				.add(new FieldFromArray(1)); // String
-
-		String expected2 = testStringArray2[1];
-		assertEquals(expected2, ext.extract(testData));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
deleted file mode 100644
index d274f4e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class FieldFromArrayTest {
-
-	String[] testStringArray = { "0", "1", "2", "3", "4" };
-	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
-	int[] testIntArray = { 20, 21, 22, 23, 24 };
-
-	@Test
-	public void testStringArray() {
-		for (int i = 0; i < this.testStringArray.length; i++) {
-			assertEquals(this.testStringArray[i],
-					new FieldFromArray<String>(i).extract(testStringArray));
-		}
-	}
-
-	@Test
-	public void testIntegerArray() {
-		for (int i = 0; i < this.testIntegerArray.length; i++) {
-			assertEquals(this.testIntegerArray[i],
-					new FieldFromArray<Integer>(i).extract(testIntegerArray));
-		}
-	}
-
-	@Test
-	public void testIntArray() {
-		for (int i = 0; i < this.testIntArray.length; i++) {
-			assertEquals(new Integer(this.testIntArray[i]),
-					new FieldFromArray<Integer>(i).extract(testIntArray));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
deleted file mode 100644
index c05f281..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTupleTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class FieldFromTupleTest {
-
-	private String[] testStrings;
-
-	@Before
-	public void init() {
-		testStrings = new String[Tuple.MAX_ARITY];
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			testStrings[i] = Integer.toString(i);
-		}
-	}
-
-	@Test
-	public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
-		// extract single fields
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			Tuple current = (Tuple) CLASSES[i].newInstance();
-			for (int j = 0; j < i; j++) {
-				current.setField(testStrings[j], j);
-			}
-			for (int j = 0; j < i; j++) {
-				assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
-			}
-		}
-	}
-
-	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
-			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
-			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
-			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
-			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
-			Tuple24.class, Tuple25.class };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
deleted file mode 100644
index 7a9a716..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArrayTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromArray;
-import org.junit.Test;
-
-public class FieldsFromArrayTest {
-
-	String[] testStringArray = { "0", "1", "2", "3", "4" };
-	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
-	int[] testIntArray = { 20, 21, 22, 23, 24 };
-
-	@Test
-	public void testStringArray() {
-		// check single field extraction
-		for (int i = 0; i < testStringArray.length; i++) {
-			String[] tmp = { testStringArray[i] };
-			arrayEqualityCheck(tmp,
-					new FieldsFromArray<String>(String.class, i).extract(testStringArray));
-		}
-
-		// check reverse order
-		String[] reverseOrder = new String[testStringArray.length];
-		for (int i = 0; i < testStringArray.length; i++) {
-			reverseOrder[i] = testStringArray[testStringArray.length - i - 1];
-		}
-		arrayEqualityCheck(reverseOrder,
-				new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray));
-
-		// check picking fields and reorder
-		String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] };
-		arrayEqualityCheck(crazyOrder,
-				new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray));
-	}
-
-	@Test
-	public void testIntegerArray() {
-		// check single field extraction
-		for (int i = 0; i < testIntegerArray.length; i++) {
-			Integer[] tmp = { testIntegerArray[i] };
-			arrayEqualityCheck(tmp,
-					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray));
-		}
-
-		// check reverse order
-		Integer[] reverseOrder = new Integer[testIntegerArray.length];
-		for (int i = 0; i < testIntegerArray.length; i++) {
-			reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1];
-		}
-		arrayEqualityCheck(reverseOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0)
-						.extract(testIntegerArray));
-
-		// check picking fields and reorder
-		Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
-		arrayEqualityCheck(crazyOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray));
-
-	}
-
-	@Test
-	public void testIntArray() {
-		for (int i = 0; i < testIntArray.length; i++) {
-			Integer[] tmp = { testIntArray[i] };
-			arrayEqualityCheck(tmp,
-					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray));
-		}
-
-		// check reverse order
-		Integer[] reverseOrder = new Integer[testIntArray.length];
-		for (int i = 0; i < testIntArray.length; i++) {
-			reverseOrder[i] = testIntArray[testIntArray.length - i - 1];
-		}
-		arrayEqualityCheck(reverseOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray));
-
-		// check picking fields and reorder
-		Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] };
-		arrayEqualityCheck(crazyOrder,
-				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray));
-
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
deleted file mode 100644
index 025ed8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTupleTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.FieldsFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsFromTupleTest {
-
-	private double[] testDouble;
-
-	@Before
-	public void init() {
-		testDouble = new double[Tuple.MAX_ARITY];
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			testDouble[i] = i;
-		}
-	}
-
-	@Test
-	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
-		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			currentTuple.setField(testDouble[i], i);
-		}
-
-		double[] expected = { testDouble[5], testDouble[3], testDouble[6], testDouble[7],
-				testDouble[0] };
-		arrayEqualityCheck(expected, new FieldsFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
-		double[] expected2 = { testDouble[0], testDouble[Tuple.MAX_ARITY - 1] };
-		arrayEqualityCheck(expected2,
-				new FieldsFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
-		double[] expected3 = { testDouble[Tuple.MAX_ARITY - 1], testDouble[0] };
-		arrayEqualityCheck(expected3,
-				new FieldsFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
-		double[] expected4 = { testDouble[13], testDouble[4], testDouble[5], testDouble[4],
-				testDouble[2], testDouble[8], testDouble[6], testDouble[2], testDouble[8],
-				testDouble[3], testDouble[5], testDouble[2], testDouble[16], testDouble[4],
-				testDouble[3], testDouble[2], testDouble[6], testDouble[4], testDouble[7],
-				testDouble[4], testDouble[2], testDouble[8], testDouble[7], testDouble[2] };
-		arrayEqualityCheck(expected4, new FieldsFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
-				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
-	}
-
-	private void arrayEqualityCheck(double[] array1, double[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i], 0d);
-		}
-	}
-
-	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
-			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
-			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
-			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
-			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
-			Tuple24.class, Tuple25.class };
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
deleted file mode 100644
index 8038cfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class SlotAllocationTest {
-	
-	@Test
-	public void test() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
-			@Override
-			public boolean filter(Long value) { return false; }
-		};
-
-		env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
-				.disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
-				.startNewChain().print();
-
-		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
-		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-
-		assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
-		assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
-		assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
-		assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
deleted file mode 100644
index c316604..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.graph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.ConnectedStreams;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.util.EvenOddOutputSelector;
-import org.apache.flink.streaming.util.NoOpIntMap;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link StreamGraphGenerator}. This only tests correct translation of split/select,
- * union, and partitioning, since the other translation routines are already covered by
- * operation-specific tests, for example {@link org.apache.flink.streaming.api.IterateTest}
- * for iterations.
- */
-public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
-
-	/**
-	 * This tests whether virtual Transformations behave correctly.
-	 *
-	 * <p>
-	 * Verifies that partitioning, output selectors, and selected names are correctly set in the
-	 * StreamGraph when they are intermixed.
-	 */
-	@Test
-	public void testVirtualTransformations() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10);
-
-		DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
-
-		// verify that only the partitioning that was set last is used
-		DataStream<Integer> broadcastMap = rebalanceMap
-				.forward()
-				.global()
-				.broadcast()
-				.map(new NoOpIntMap());
-
-		broadcastMap.addSink(new NoOpSink<Integer>());
-
-		// verify that partitioning is preserved across union and split/select
-		EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
-		EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
-		EvenOddOutputSelector selector3 = new EvenOddOutputSelector();
-
-		DataStream<Integer> map1Operator = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map1 = map1Operator
-				.broadcast()
-				.split(selector1)
-				.select("even");
-
-		DataStream<Integer> map2Operator = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map2 = map2Operator
-				.split(selector2)
-				.select("odd")
-				.global();
-
-		DataStream<Integer> map3Operator = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map3 = map3Operator
-				.global()
-				.split(selector3)
-				.select("even")
-				.shuffle();
-
-
-		SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
-				.map(new NoOpIntMap());
-
-		unionedMap.addSink(new NoOpSink<Integer>());
-
-		StreamGraph graph = env.getStreamGraph();
-
-		// rebalanceMap
-		assertTrue(graph.getStreamNode(rebalanceMap.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);
-
-		// verify that only last partitioning takes precedence
-		assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
-
-		// verify that partitioning in unions is preserved and that it works across split/select
-		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
-		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
-
-		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
-		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("odd"));
-		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
-
-		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
-		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
-		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
-	}
-
-	/**
-	 * This tests whether virtual Transformations behave correctly.
-	 *
-	 * Checks whether output selectors and partitioning work correctly when applied to a union.
-	 */
-	@Test
-	public void testVirtualTransformations2() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10);
-
-		DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
-
-		DataStream<Integer> map1 = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map2 = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map3 = rebalanceMap
-				.map(new NoOpIntMap());
-
-		EvenOddOutputSelector selector = new EvenOddOutputSelector();
-
-		SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
-				.broadcast()
-				.split(selector)
-				.select("foo")
-				.map(new NoOpIntMap());
-
-		unionedMap.addSink(new NoOpSink<Integer>());
-
-		StreamGraph graph = env.getStreamGraph();
-
-		// verify that the properties are correctly set on all input operators
-		assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
-		assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
-
-		assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
-		assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
-
-		assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
-		assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
-
-	}
-
-	/**
-	 * Tests whether an {@link OutputTypeConfigurable} implementation gets called with the correct
-	 * output type. In this test case, the output type must be BasicTypeInfo.INT_TYPE_INFO.
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10);
-
-		OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithOneInput();
-
-		DataStream<Integer> result = source.transform(
-			"Single input and output type configurable operation",
-			BasicTypeInfo.INT_TYPE_INFO,
-			outputTypeConfigurableOperation);
-
-		result.addSink(new NoOpSink<Integer>());
-
-		StreamGraph graph = env.getStreamGraph();
-
-		assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
-	}
-
-	@Test
-	public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source1 = env.fromElements(1, 10);
-		DataStream<Integer> source2 = env.fromElements(2, 11);
-
-		ConnectedStreams<Integer, Integer> connectedSource = source1.connect(source2);
-
-		OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs();
-
-		DataStream<Integer> result = connectedSource.transform(
-				"Two input and output type configurable operation",
-				BasicTypeInfo.INT_TYPE_INFO,
-				outputTypeConfigurableOperation);
-
-		result.addSink(new NoOpSink<Integer>());
-
-		StreamGraph graph = env.getStreamGraph();
-
-		assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
-	}
-
-	private static class OutputTypeConfigurableOperationWithTwoInputs
-			extends AbstractStreamOperator<Integer>
-			implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
-
-		TypeInformation<Integer> tpeInformation;
-
-		public TypeInformation<Integer> getTypeInformation() {
-			return tpeInformation;
-		}
-
-		@Override
-		public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
-			tpeInformation = outTypeInfo;
-		}
-
-		@Override
-		public void processElement1(StreamRecord element) throws Exception {
-			output.collect(element);
-		}
-
-		@Override
-		public void processElement2(StreamRecord element) throws Exception {
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark1(Watermark mark) throws Exception {}
-
-		@Override
-		public void processWatermark2(Watermark mark) throws Exception {}
-
-		@Override
-		public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {}
-	}
-
-	private static class OutputTypeConfigurableOperationWithOneInput
-			extends AbstractStreamOperator<Integer>
-			implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {
-
-		TypeInformation<Integer> tpeInformation;
-
-		public TypeInformation<Integer> getTypeInformation() {
-			return tpeInformation;
-		}
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-
-		}
-
-		@Override
-		public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
-			tpeInformation = outTypeInfo;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
deleted file mode 100644
index e806428..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.graph;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingJobGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGeneratorTest.class);
-	
-	@Test
-	public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
-		final long seed = System.currentTimeMillis();
-		LOG.info("Test seed: {}", new Long(seed));
-		final Random r = new Random(seed);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		StreamGraph streamingJob = new StreamGraph(env);
-		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
-		
-		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
-		int dop = 1 + r.nextInt(10);
-		
-		ExecutionConfig config = streamingJob.getExecutionConfig();
-		if(closureCleanerEnabled) {
-			config.enableClosureCleaner();
-		} else {
-			config.disableClosureCleaner();
-		}
-		if(forceAvroEnabled) {
-			config.enableForceAvro();
-		} else {
-			config.disableForceAvro();
-		}
-		if(forceKryoEnabled) {
-			config.enableForceKryo();
-		} else {
-			config.disableForceKryo();
-		}
-		if(objectReuseEnabled) {
-			config.enableObjectReuse();
-		} else {
-			config.disableObjectReuse();
-		}
-		if(sysoutLoggingEnabled) {
-			config.enableSysoutLogging();
-		} else {
-			config.disableSysoutLogging();
-		}
-		config.setParallelism(dop);
-		
-		JobGraph jobGraph = compiler.createJobGraph("test");
-		ExecutionConfig executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
-				jobGraph.getJobConfiguration(),
-				ExecutionConfig.CONFIG_KEY,
-				Thread.currentThread().getContextClassLoader());
-		
-		Assert.assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
-		Assert.assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
-		Assert.assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
-		Assert.assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
-		Assert.assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
-		Assert.assertEquals(dop, executionConfig.getParallelism());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
deleted file mode 100644
index dc8024c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamCounter}. These test that:
- *
- * <ul>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamCounterTest {
-
-	@Test
-	public void testCount() throws Exception {
-		StreamCounter<String> operator = new StreamCounter<String>();
-
-		OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<String>("eins", initialTime + 1));
-		testHarness.processElement(new StreamRecord<String>("zwei", initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<String>("drei", initialTime + 3));
-
-		expectedOutput.add(new StreamRecord<Long>(1L, initialTime + 1));
-		expectedOutput.add(new StreamRecord<Long>(2L, initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<Long>(3L, initialTime + 3));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
deleted file mode 100644
index 047aad8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamFilter}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamFilterTest {
-
-	static class MyFilter implements FilterFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Integer value) throws Exception {
-			return value % 2 == 0;
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testFilter() throws Exception {
-		StreamFilter<Integer> operator = new StreamFilter<Integer>(new MyFilter());
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
-		testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
-		testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
-		testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
-		testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
-
-		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
-		expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		StreamFilter<String> operator = new StreamFilter<String>(new TestOpenCloseFilterFunction());
-
-		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<String>("fooHello", initialTime));
-		testHarness.processElement(new StreamRecord<String>("bar", initialTime));
-
-		testHarness.close();
-
-		Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseFilterFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test; otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseFilterFunction extends RichFilterFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public boolean filter(String value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value.startsWith("foo");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
deleted file mode 100644
index e4e29c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamFlatMap}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamFlatMapTest {
-
-	public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-			if (value % 2 == 0) {
-				out.collect(value);
-				out.collect(value * value);
-			}
-		}
-	}
-
-	@Test
-	public void testFlatMap() throws Exception {
-		StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new MyFlatMap());
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
-		testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
-		testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
-		testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
-		testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
-		testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
-
-		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
-		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
-		expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
-		expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
-		expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
-		expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
-		expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		StreamFlatMap<String, String> operator = new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());
-
-		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
-
-		testHarness.close();
-
-		Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseFlatMapFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			out.collect(value);
-		}
-	}
-}
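
The removed test pins down a simple element-level contract for StreamFlatMap: odd inputs
are dropped, and every even input is emitted twice, first unchanged and then squared,
each time stamped with the input element's timestamp. A standalone plain-Java sketch of
that contract (the class name is illustrative and not part of this commit):

	import java.util.ArrayList;
	import java.util.List;

	import org.apache.flink.util.Collector;

	public class FlatMapContractSketch {
		public static void main(String[] args) {
			final List<Integer> collected = new ArrayList<Integer>();
			Collector<Integer> out = new Collector<Integer>() {
				@Override
				public void collect(Integer record) {
					collected.add(record);
				}

				@Override
				public void close() {}
			};

			// Same logic as MyFlatMap above: keep even values, also emit the square.
			for (int value = 1; value <= 8; value++) {
				if (value % 2 == 0) {
					out.collect(value);
					out.collect(value * value);
				}
			}

			System.out.println(collected); // [2, 4, 4, 16, 6, 36, 8, 64]
		}
	}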

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
deleted file mode 100644
index f6e7e6b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.RichFoldFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link StreamGroupedFold}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-@SuppressWarnings("serial")
-public class StreamGroupedFoldTest {
-
-	private static class MyFolder implements FoldFunction<Integer, String> {
-
-		@Override
-		public String fold(String accumulator, Integer value) throws Exception {
-			return accumulator + value.toString();
-		}
-	}
-
-	@Test
-	public void testGroupedFold() throws Exception {
-
-		KeySelector<Integer, String> keySelector = new KeySelector<Integer, String>() {
-			
-			@Override
-			public String getKey(Integer value) {
-				return value.toString();
-			}
-		};
-		
-		StreamGroupedFold<Integer, String, String> operator = new StreamGroupedFold<>(new MyFolder(), "100");
-		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
-		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
-		testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
-		testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
-
-		expectedOutput.add(new StreamRecord<>("1001", initialTime + 1));
-		expectedOutput.add(new StreamRecord<>("10011", initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<>("1002", initialTime + 3));
-		expectedOutput.add(new StreamRecord<>("10022", initialTime + 4));
-		expectedOutput.add(new StreamRecord<>("1003", initialTime + 5));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
-			@Override
-			public Integer getKey(Integer value) {
-				return value;
-			}
-		};
-		
-		StreamGroupedFold<Integer, String, Integer> operator = new StreamGroupedFold<>(
-				new TestOpenCloseFoldFunction(), "init");
-		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
-		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(1, initialTime));
-		testHarness.processElement(new StreamRecord<>(2, initialTime));
-
-		testHarness.close();
-
-		assertTrue("RichFunction methods were not called.", TestOpenCloseFoldFunction.closeCalled);
-		assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseFoldFunction extends RichFoldFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public String fold(String acc, Integer in) throws Exception {
-			if (!openCalled) {
-				fail("Open was not called before run.");
-			}
-			return acc + in;
-		}
-	}
-}
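
The expected records above follow from the per-key fold semantics: each distinct key
(here value.toString()) keeps its own accumulator, seeded with the initial value "100",
and every arriving element appends its string form. A plain-Java sketch of just that
state logic, with no Flink runtime involved (the class name is illustrative):

	import java.util.HashMap;
	import java.util.Map;

	public class GroupedFoldSketch {
		public static void main(String[] args) {
			Map<String, String> accumulators = new HashMap<String, String>();
			int[] input = {1, 1, 2, 2, 3};
			for (int value : input) {
				String key = Integer.toString(value);  // the KeySelector from the test
				String acc = accumulators.containsKey(key) ? accumulators.get(key) : "100";
				acc = acc + value;                      // MyFolder.fold
				accumulators.put(key, acc);
				System.out.println(acc);                // 1001, 10011, 1002, 10022, 1003
			}
		}
	}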

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
deleted file mode 100644
index 6cb46c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamGroupedReduce}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamGroupedReduceTest {
-
-	@Test
-	public void testGroupedReduce() throws Exception {
-
-		KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
-		
-		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
-		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<>(1, initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<>(2, initialTime + 3));
-		testHarness.processElement(new StreamRecord<>(2, initialTime + 4));
-		testHarness.processElement(new StreamRecord<>(3, initialTime + 5));
-
-		expectedOutput.add(new StreamRecord<>(1, initialTime + 1));
-		expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<>(2, initialTime + 3));
-		expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
-		expectedOutput.add(new StreamRecord<>(3, initialTime + 5));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-
-		KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
-		
-		StreamGroupedReduce<Integer> operator =
-				new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
-		testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(1, initialTime));
-		testHarness.processElement(new StreamRecord<>(2, initialTime));
-
-		testHarness.close();
-
-		Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseReduceFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseReduceFunction extends RichReduceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public Integer reduce(Integer in1, Integer in2) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return in1 + in2;
-		}
-	}
-
-	// Utilities
-
-	private static class MyReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-	private static class IntegerKeySelector implements KeySelector<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer getKey(Integer value) throws Exception {
-			return value;
-		}
-	}
-
-	private static TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-}
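
Analogously, the reduce test's expectations follow from per-key reduce semantics: a
key's first element is forwarded unchanged, and every later element is combined with the
stored per-key value before being emitted. A minimal plain-Java sketch (the class name
is illustrative):

	import java.util.HashMap;
	import java.util.Map;

	public class GroupedReduceSketch {
		public static void main(String[] args) {
			Map<Integer, Integer> state = new HashMap<Integer, Integer>();
			int[] input = {1, 1, 2, 2, 3};
			for (int value : input) {
				Integer previous = state.get(value);  // the key is the value itself
				int reduced = previous == null ? value : previous + value;  // MyReducer
				state.put(value, reduced);
				System.out.println(reduced);          // 1, 2, 2, 4, 3
			}
		}
	}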

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
deleted file mode 100644
index f0113d1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamMap}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamMapTest {
-
-	private static class Map implements MapFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			return "+" + (value + 1);
-		}
-	}
-	
-	@Test
-	public void testMap() throws Exception {
-		StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new Map());
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
-
-		expectedOutput.add(new StreamRecord<String>("+2", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("+3", initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("+4", initialTime + 3));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		StreamMap<String, String> operator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
-
-		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
-
-		testHarness.close();
-
-		Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseMapFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public String map(String value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value;
-		}
-	}
-}
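
All of these operator tests share one harness pattern: wrap the operator, open() it,
feed StreamRecords and Watermarks, and compare getOutput() against the expected queue.
A condensed, self-contained sketch of that pattern for StreamMap, using only the harness
calls that appear in the diffs above (the class name is illustrative):

	import org.apache.flink.api.common.functions.MapFunction;
	import org.apache.flink.streaming.api.operators.StreamMap;
	import org.apache.flink.streaming.api.watermark.Watermark;
	import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
	import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

	public class HarnessPatternSketch {
		public static void main(String[] args) throws Exception {
			StreamMap<Integer, String> operator = new StreamMap<Integer, String>(
					new MapFunction<Integer, String>() {
						private static final long serialVersionUID = 1L;

						@Override
						public String map(Integer value) {
							return "+" + (value + 1);
						}
					});

			OneInputStreamOperatorTestHarness<Integer, String> harness =
					new OneInputStreamOperatorTestHarness<Integer, String>(operator);

			harness.open();                                            // triggers RichFunction#open
			harness.processElement(new StreamRecord<Integer>(1, 42L)); // element with timestamp 42
			harness.processWatermark(new Watermark(42L));              // forwarded unchanged
			harness.close();                                           // triggers RichFunction#close

			// The queue now holds the record "+2" at timestamp 42, then the watermark.
			System.out.println(harness.getOutput());
		}
	}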

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
deleted file mode 100644
index 14abd18..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamProject}. These test that:
- *
- * <ul>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamProjectTest extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void testProject() throws Exception {
-
-		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
-				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
-
-		int[] fields = new int[]{4, 4, 3};
-
-		TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
-				new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
-						.createSerializer(new ExecutionConfig());
-		@SuppressWarnings("unchecked")
-		StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
-				new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-						fields, serializer);
-
-		OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness = new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4), initialTime + 1));
-		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2), initialTime + 2));
-		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2), initialTime + 3));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7), initialTime + 4));
-
-		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(4, 4, "b"), initialTime + 1));
-		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 2));
-		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 3));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(7, 7, "a"), initialTime + 4));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	// tests using projection from the API without explicitly specifying the types
-	private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
-	private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
-
-	@Test
-	public void APIWithoutTypesTest() {
-
-		for (Long i = 1L; i < 11L; i++) {
-			expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple3<Long, Character, Double> map(Long value) throws Exception {
-				return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
-			}
-		})
-			.project(0, 2)
-			.addSink(new SinkFunction<Tuple>() {
-				private static final long serialVersionUID = 1L;
-
-				@Override
-				@SuppressWarnings("unchecked")
-				public void invoke(Tuple value) throws Exception {
-					actual.add( (Tuple2<Long,Double>) value);
-				}
-			});
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			fail(e.getMessage());
-		}
-
-		assertEquals(expected, actual);
-	}
-}
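
The projection in testProject copies input fields {4, 4, 3} of the Tuple5, in that
order, into the output Tuple3. A plain-Java sketch of that field copy, assuming only
Tuple's generic getField/setField accessors (the class name is illustrative):

	import org.apache.flink.api.java.tuple.Tuple3;
	import org.apache.flink.api.java.tuple.Tuple5;

	public class ProjectionSketch {
		public static void main(String[] args) {
			Tuple5<Integer, String, Integer, String, Integer> in =
					new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4);

			int[] fields = {4, 4, 3}; // same projection as the test above
			Tuple3<Integer, Integer, String> out = new Tuple3<Integer, Integer, String>();
			for (int i = 0; i < fields.length; i++) {
				out.setField(in.getField(fields[i]), i);
			}

			System.out.println(out); // (4,4,b) -- the first expected record above
		}
	}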

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
deleted file mode 100644
index 39e85e9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.api.operators.co;
-//
-//import static org.junit.Assert.assertEquals;
-//
-//import java.util.Arrays;
-//import java.util.List;
-//
-//import org.apache.flink.api.java.functions.KeySelector;
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.api.java.tuple.Tuple3;
-//import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-//import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
-//import org.apache.flink.streaming.util.MockCoContext;
-//import org.junit.Test;
-//
-//public class CoGroupedReduceTest {
-//
-//	private final static class MyCoReduceFunction implements
-//			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
-//				Tuple3<String, String, String> value2) {
-//			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
-//		}
-//
-//		@Override
-//		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
-//				Tuple2<Integer, Integer> value2) {
-//			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-//		}
-//
-//		@Override
-//		public String map1(Tuple3<String, String, String> value) {
-//			return value.f1;
-//		}
-//
-//		@Override
-//		public String map2(Tuple2<Integer, Integer> value) {
-//			return value.f1.toString();
-//		}
-//	}
-//
-//	@SuppressWarnings("unchecked")
-//	@Test
-//	public void coGroupedReduceTest() {
-//		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-//		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-//		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-//		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-//		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-//		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-//		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-//		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-//
-//		KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
-//
-//			private static final long serialVersionUID = 1L;
-//
-//			@Override
-//			public String getKey(Tuple3<String, String, String> value) throws Exception {
-//				return value.f0;
-//			}
-//		};
-//
-//		KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-//
-//			private static final long serialVersionUID = 1L;
-//
-//			@Override
-//			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-//				return value.f0;
-//			}
-//		};
-//
-//		KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
-//
-//			private static final long serialVersionUID = 1L;
-//
-//			@Override
-//			public String getKey(Tuple3<String, String, String> value) throws Exception {
-//				return value.f2;
-//			}
-//		};
-//
-//		CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-//				new MyCoReduceFunction(), keySelector0, keySelector1);
-//
-//		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
-//				"7");
-//
-//		List<String> actualList = MockCoContext.createAndExecute(invokable,
-//				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-//
-//		assertEquals(expected, actualList);
-//
-//		invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-//				new MyCoReduceFunction(), keySelector2, keySelector1);
-//
-//		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
-//
-//		actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
-//				Arrays.asList(int1, int2, int3, int4, int5));
-//
-//		assertEquals(expected, actualList);
-//	}
-//}
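
The commented-out expectations encode the grouped co-reduce contract: per key (field
f0), the first element of a stream is mapped directly, and each later element is reduced
with the stored per-key value before being mapped. A plain-Java sketch of that logic for
the second (integer) input, reproducing the "1", "2", "3", "5", "7" subsequence above;
the interleaving with the first input is MockCoContext's concern and is not modeled here
(the class name is illustrative):

	import java.util.Arrays;
	import java.util.HashMap;
	import java.util.List;
	import java.util.Map;

	import org.apache.flink.api.java.tuple.Tuple2;

	public class GroupedCoReduceSketch {
		public static void main(String[] args) {
			List<Tuple2<Integer, Integer>> input = Arrays.asList(
					new Tuple2<Integer, Integer>(2, 1), new Tuple2<Integer, Integer>(1, 2),
					new Tuple2<Integer, Integer>(0, 3), new Tuple2<Integer, Integer>(2, 4),
					new Tuple2<Integer, Integer>(1, 5));

			Map<Integer, Tuple2<Integer, Integer>> state = new HashMap<Integer, Tuple2<Integer, Integer>>();
			for (Tuple2<Integer, Integer> value : input) {
				Tuple2<Integer, Integer> previous = state.get(value.f0);
				Tuple2<Integer, Integer> reduced = previous == null
						? value
						: new Tuple2<Integer, Integer>(value.f0, previous.f1 + value.f1); // reduce2
				state.put(value.f0, reduced);
				System.out.println(reduced.f1); // map2: prints 1, 2, 3, 5, 7
			}
		}
	}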