You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:48 UTC

[07/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
deleted file mode 100644
index aa2a1bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.AbstractCollection;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.commons.collections.BoundedCollection;
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUnderflowException;
-
-@SuppressWarnings("rawtypes")
-public class NullableCircularBuffer extends AbstractCollection implements Buffer,
-		BoundedCollection, Serializable {
-
-	/** Serialization version */
-	private static final long serialVersionUID = 5603722811189451017L;
-
-	/** Underlying storage array */
-	private transient Object[] elements;
-
-	/** Array index of first (oldest) buffer element */
-	private transient int start = 0;
-
-	/**
-	 * Index mod maxElements of the array position following the last buffer
-	 * element. Buffer elements start at elements[start] and "wrap around"
-	 * elements[maxElements-1], ending at elements[decrement(end)]. For example,
-	 * elements = {c,a,b}, start=1, end=1 corresponds to the buffer [a,b,c].
-	 */
-	private transient int end = 0;
-
-	/** Flag to indicate if the buffer is currently full. */
-	private transient boolean full = false;
-
-	/** Capacity of the buffer */
-	private final int maxElements;
-
-	/**
-	 * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold 32
-	 * elements.
-	 */
-	public NullableCircularBuffer() {
-		this(32);
-	}
-
-	/**
-	 * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold the
-	 * specified number of elements.
-	 *
-	 * @param size
-	 *            the maximum number of elements for this fifo
-	 * @throws IllegalArgumentException
-	 *             if the size is less than 1
-	 */
-	public NullableCircularBuffer(int size) {
-		if (size <= 0) {
-			throw new IllegalArgumentException("The size must be greater than 0");
-		}
-		elements = new Object[size];
-		maxElements = elements.length;
-	}
-
-	/**
-	 * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold all of
-	 * the elements in the specified collection. That collection's elements will
-	 * also be added to the buffer.
-	 *
-	 * @param coll
-	 *            the collection whose elements to add, may not be null
-	 * @throws NullPointerException
-	 *             if the collection is null
-	 */
-	@SuppressWarnings("unchecked")
-	public NullableCircularBuffer(Collection coll) {
-		this(coll.size());
-		addAll(coll);
-	}
-
-	// -----------------------------------------------------------------------
-	/**
-	 * Write the buffer out using a custom routine.
-	 * 
-	 * @param out
-	 *            the output stream
-	 * @throws IOException
-	 */
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.defaultWriteObject();
-		out.writeInt(size());
-		for (Iterator it = iterator(); it.hasNext();) {
-			out.writeObject(it.next());
-		}
-	}
-
-	/**
-	 * Read the buffer in using a custom routine.
-	 * 
-	 * @param in
-	 *            the input stream
-	 * @throws IOException
-	 * @throws ClassNotFoundException
-	 */
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		elements = new Object[maxElements];
-		int size = in.readInt();
-		for (int i = 0; i < size; i++) {
-			elements[i] = in.readObject();
-		}
-		start = 0;
-		full = (size == maxElements);
-		if (full) {
-			end = 0;
-		} else {
-			end = size;
-		}
-	}
-
-	// -----------------------------------------------------------------------
-	/**
-	 * Returns the number of elements stored in the buffer.
-	 *
-	 * @return this buffer's size
-	 */
-	public int size() {
-		int size = 0;
-
-		if (end < start) {
-			size = maxElements - start + end;
-		} else if (end == start) {
-			size = (full ? maxElements : 0);
-		} else {
-			size = end - start;
-		}
-
-		return size;
-	}
-
-	/**
-	 * Returns true if this buffer is empty; false otherwise.
-	 *
-	 * @return true if this buffer is empty
-	 */
-	public boolean isEmpty() {
-		return size() == 0;
-	}
-
-	/**
-	 * Returns true if this collection is full and no new elements can be added.
-	 *
-	 * @return <code>true</code> if the collection is full
-	 */
-	public boolean isFull() {
-		return size() == maxElements;
-	}
-
-	/**
-	 * Gets the maximum size of the collection (the bound).
-	 *
-	 * @return the maximum number of elements the collection can hold
-	 */
-	public int maxSize() {
-		return maxElements;
-	}
-
-	/**
-	 * Clears this buffer.
-	 */
-	public void clear() {
-		full = false;
-		start = 0;
-		end = 0;
-		Arrays.fill(elements, null);
-	}
-
-	/**
-	 * Adds the given element to this buffer.
-	 *
-	 * @param element
-	 *            the element to add
-	 * @return true, always
-	 */
-	public boolean add(Object element) {
-
-		if (isFull()) {
-			remove();
-		}
-
-		elements[end++] = element;
-
-		if (end >= maxElements) {
-			end = 0;
-		}
-
-		if (end == start) {
-			full = true;
-		}
-
-		return true;
-	}
-
-	/**
-	 * Returns the least recently inserted element in this buffer.
-	 *
-	 * @return the least recently inserted element
-	 * @throws BufferUnderflowException
-	 *             if the buffer is empty
-	 */
-	public Object get() {
-		if (isEmpty()) {
-			throw new BufferUnderflowException("The buffer is already empty");
-		}
-
-		return elements[start];
-	}
-
-	/**
-	 * Removes the least recently inserted element from this buffer.
-	 *
-	 * @return the least recently inserted element
-	 * @throws BufferUnderflowException
-	 *             if the buffer is empty
-	 */
-	public Object remove() {
-		if (isEmpty()) {
-			throw new BufferUnderflowException("The buffer is already empty");
-		}
-
-		Object element = elements[start];
-
-		elements[start++] = null;
-
-		if (start >= maxElements) {
-			start = 0;
-		}
-
-		full = false;
-
-		return element;
-	}
-
-	/**
-	 * Increments the internal index.
-	 * 
-	 * @param index
-	 *            the index to increment
-	 * @return the updated index
-	 */
-	private int increment(int index) {
-		index++;
-		if (index >= maxElements) {
-			index = 0;
-		}
-		return index;
-	}
-
-	/**
-	 * Decrements the internal index.
-	 * 
-	 * @param index
-	 *            the index to decrement
-	 * @return the updated index
-	 */
-	private int decrement(int index) {
-		index--;
-		if (index < 0) {
-			index = maxElements - 1;
-		}
-		return index;
-	}
-
-	/**
-	 * Returns an iterator over this buffer's elements.
-	 *
-	 * @return an iterator over this buffer's elements
-	 */
-	public Iterator iterator() {
-		return new Iterator() {
-
-			private int index = start;
-			private int lastReturnedIndex = -1;
-			private boolean isFirst = full;
-
-			public boolean hasNext() {
-				return isFirst || (index != end);
-
-			}
-
-			public Object next() {
-				if (!hasNext()) {
-					throw new NoSuchElementException();
-				}
-				isFirst = false;
-				lastReturnedIndex = index;
-				index = increment(index);
-				return elements[lastReturnedIndex];
-			}
-
-			public void remove() {
-				if (lastReturnedIndex == -1) {
-					throw new IllegalStateException();
-				}
-
-				// First element can be removed quickly
-				if (lastReturnedIndex == start) {
-					NullableCircularBuffer.this.remove();
-					lastReturnedIndex = -1;
-					return;
-				}
-
-				int pos = lastReturnedIndex + 1;
-				if (start < lastReturnedIndex && pos < end) {
-					// shift in one part
-					System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
-				} else {
-					// Other elements require us to shift the subsequent
-					// elements
-					while (pos != end) {
-						if (pos >= maxElements) {
-							elements[pos - 1] = elements[0];
-							pos = 0;
-						} else {
-							elements[decrement(pos)] = elements[pos];
-							pos = increment(pos);
-						}
-					}
-				}
-
-				lastReturnedIndex = -1;
-				end = decrement(end);
-				elements[end] = null;
-				full = false;
-				index = decrement(index);
-			}
-
-		};
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
deleted file mode 100644
index 1c67c9e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Base class for representing operator states that can be repartitioned for
- * state state and load balancing.
- * 
- * @param <T>
- *            The type of the operator state.
- */
-public abstract class PartitionableState<T> extends OperatorState<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	PartitionableState(T initialState) {
-		super(initialState);
-	}
-
-	/**
-	 * Repartitions(divides) the current state into the given number of new
-	 * partitions. The created partitions will be used to redistribute then
-	 * rebuild the state among the parallel instances of the operator. The
-	 * implementation should reflect the partitioning of the input values to
-	 * maintain correct operator behavior.
-	 * 
-	 * </br> </br> It is also assumed that if we would {@link #reBuild} the
-	 * repartitioned state we would basically get the same as before.
-	 * 
-	 * 
-	 * @param numberOfPartitions
-	 *            The desired number of partitions. The method must return an
-	 *            array of that size.
-	 * @return The array containing the state part for each partition.
-	 */
-	public abstract OperatorState<T>[] repartition(int numberOfPartitions);
-
-	/**
-	 * Rebuilds the current state partition from the given parts. Used for
-	 * building the state after a re-balance phase.
-	 * 
-	 * @param parts
-	 *            The state parts that will be used to rebuild the current
-	 *            partition.
-	 * @return The rebuilt operator state.
-	 */
-	public abstract OperatorState<T> reBuild(OperatorState<T>... parts);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 54a7692..f489334 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.StreamReduce;
 import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.junit.Test;
@@ -107,13 +107,13 @@ public class AggregationFunctionTest {
 		ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2,
 				AggregationType.MAX);
 		List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
+				new StreamReduce<Tuple2<Integer, Integer>>(sumFunction), getInputList());
 
 		List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
+				new StreamReduce<Tuple2<Integer, Integer>>(minFunction), getInputList());
 
 		List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
+				new StreamReduce<Tuple2<Integer, Integer>>(maxFunction), getInputList());
 
 		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
 				.getForObject(new Tuple2<Integer, Integer>(1, 1));
@@ -123,15 +123,15 @@ public class AggregationFunctionTest {
 				typeInfo, new ExecutionConfig());
 
 		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+				new StreamGroupedReduce<Tuple2<Integer, Integer>>(sumFunction, keySelector),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, keySelector),
+				new StreamGroupedReduce<Tuple2<Integer, Integer>>(minFunction, keySelector),
 				getInputList());
 
 		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+				new StreamGroupedReduce<Tuple2<Integer, Integer>>(maxFunction, keySelector),
 				getInputList());
 
 		assertEquals(expectedSumList, sumList);
@@ -141,11 +141,11 @@ public class AggregationFunctionTest {
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
 		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+				new StreamReduce<Integer>(sumFunction0), simpleInput));
 		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+				new StreamReduce<Integer>(minFunction0), simpleInput));
 		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
+				new StreamReduce<Integer>(maxFunction0), simpleInput));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 		try {
@@ -229,11 +229,11 @@ public class AggregationFunctionTest {
 		ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX);
 
 		List<MyPojo> sumList = MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(sumFunction), getInputPojoList());
+				new StreamReduce<MyPojo>(sumFunction), getInputPojoList());
 		List<MyPojo> minList = MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(minFunction), getInputPojoList());
+				new StreamReduce<MyPojo>(minFunction), getInputPojoList());
 		List<MyPojo> maxList = MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(maxFunction), getInputPojoList());
+				new StreamReduce<MyPojo>(maxFunction), getInputPojoList());
 
 		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
 		KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
@@ -241,13 +241,13 @@ public class AggregationFunctionTest {
 				typeInfo, config);
 
 		List<MyPojo> groupedSumList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<MyPojo>(sumFunction, keySelector),
+				new StreamGroupedReduce<MyPojo>(sumFunction, keySelector),
 				getInputPojoList());
 		List<MyPojo> groupedMinList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<MyPojo>(minFunction, keySelector),
+				new StreamGroupedReduce<MyPojo>(minFunction, keySelector),
 				getInputPojoList());
 		List<MyPojo> groupedMaxList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<MyPojo>(maxFunction, keySelector),
+				new StreamGroupedReduce<MyPojo>(maxFunction, keySelector),
 				getInputPojoList());
 
 		assertEquals(expectedSumList, sumList);
@@ -257,11 +257,11 @@ public class AggregationFunctionTest {
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
 		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+				new StreamReduce<Integer>(sumFunction0), simpleInput));
 		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+				new StreamReduce<Integer>(minFunction0), simpleInput));
 		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
+				new StreamReduce<Integer>(maxFunction0), simpleInput));
 	}
 
 	@Test
@@ -324,16 +324,16 @@ public class AggregationFunctionTest {
 		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
+				new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionFirst),
 				getInputList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
+				new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionLast),
 				getInputList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
+				new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionLast),
 				getInputList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
+				new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionFirst),
 				getInputList()));
 	}
 
@@ -398,16 +398,16 @@ public class AggregationFunctionTest {
 		minByLastExpected.add(new MyPojo(0, 6));
 
 		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(maxByFunctionFirst),
+				new StreamReduce<MyPojo>(maxByFunctionFirst),
 				getInputPojoList()));
 		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(maxByFunctionLast),
+				new StreamReduce<MyPojo>(maxByFunctionLast),
 				getInputPojoList()));
 		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(minByFunctionLast),
+				new StreamReduce<MyPojo>(minByFunctionLast),
 				getInputPojoList()));
 		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<MyPojo>(minByFunctionFirst),
+				new StreamReduce<MyPojo>(minByFunctionFirst),
 				getInputPojoList()));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index f527de4..4228314 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
@@ -89,7 +89,7 @@ public class CoStreamTest {
 					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
 						return true;
 					}
-				}).setChainingStrategy(StreamInvokable.ChainingStrategy.NEVER).groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+				}).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER).groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
 
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 31bd147..39c3f31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
index 792c5d2..e72f2d9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -27,8 +27,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.streaming.api.function.source.FromElementsFunction;
-import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.util.MockCollector;
 import org.apache.flink.streaming.util.MockSource;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 9a19dde..f163b9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index bc7fe73..fc3e36f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 49b3bf8..32da578 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -21,8 +21,8 @@ import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
+import org.apache.flink.streaming.api.streamtask.MockRecordWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.MockRecordWriterFactory;
 import org.apache.flink.util.Collector;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
deleted file mode 100644
index 969a06b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class CounterInvokableTest {
-
-	@Test
-	public void counterTest() {
-		CounterInvokable<String> invokable = new CounterInvokable<String>();
-
-		List<Long> expected = Arrays.asList(1L, 2L, 3L);
-		List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
-		
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
deleted file mode 100644
index 403dd17..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class FilterTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	static class MyFilter implements FilterFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Integer value) throws Exception {
-			return value % 2 == 0;
-		}
-	}
-
-	@Test 
-	public void test() {
-		FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
-
-		List<Integer> expected = Arrays.asList(2, 4, 6);
-		List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
-		
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
deleted file mode 100644
index 7424e21..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class FlatMapTest {
-
-	public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-			if (value % 2 == 0) {
-				out.collect(value);
-				out.collect(value * value);
-			}
-		}
-	}
-
-	@Test
-	public void flatMapTest() {
-		FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap());
-		
-		List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
-		List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
-		
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
deleted file mode 100644
index 01375d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.MockContext;
-
-import org.junit.Test;
-
-public class GroupedFoldInvokableTest {
-
-	private static class MyFolder implements FoldFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String fold(String accumulator, Integer value) throws Exception {
-			return accumulator + value.toString();
-		}
-
-	}
-
-	@Test
-	public void test() {
-		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
-
-		GroupedFoldInvokable<Integer, String> invokable1 = new GroupedFoldInvokable<Integer, String>(
-				new MyFolder(), new KeySelector<Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Integer value) throws Exception {
-				return value.toString();
-			}
-		}, "100", outType);
-
-		List<String> expected = Arrays.asList("1001","10011", "1002", "10022", "1003");
-		List<String> actual = MockContext.createAndExecute(invokable1,
-				Arrays.asList(1, 1, 2, 2, 3));
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
deleted file mode 100644
index ce47c67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedReduceInvokableTest {
-
-	private static class MyReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-	@Test
-	public void test() {
-		GroupedReduceInvokable<Integer> invokable1 = new GroupedReduceInvokable<Integer>(
-				new MyReducer(), new KeySelector<Integer, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer getKey(Integer value) throws Exception {
-						return value;
-					}
-				});
-
-		List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
-		List<Integer> actual = MockContext.createAndExecute(invokable1,
-				Arrays.asList(1, 1, 2, 2, 3));
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
deleted file mode 100644
index 5390ec9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class MapTest {
-
-	private static class Map implements MapFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			return "+" + (value + 1);
-		}
-	}
-	
-	@Test
-	public void mapInvokableTest() {
-		MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map());
-		
-		List<String> expectedList = Arrays.asList("+2", "+3", "+4");
-		List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
-		
-		assertEquals(expectedList, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
deleted file mode 100644
index 11c44cd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class ProjectTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Test
-	public void test() {
-
-		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
-				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
-						4));
-
-		int[] fields = new int[] { 4, 4, 3 };
-		Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
-
-		@SuppressWarnings("unchecked")
-		ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-				fields,
-				(TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
-						.extractFieldTypes(fields, classes, inType));
-
-		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2));
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2));
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7));
-
-		List<Tuple3<Integer, Integer, String>> expected = new ArrayList<Tuple3<Integer, Integer, String>>();
-		expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
-		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
-		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
-		expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
-
-		assertEquals(expected, MockContext.createAndExecute(invokable, input));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
deleted file mode 100644
index 90a133b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamFoldTest {
-
-	private static class MyFolder implements FoldFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String fold(String accumulator, Integer value) throws Exception {
-			return accumulator + value.toString();
-		}
-	}
-
-	@Test
-	public void test() {
-		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
-		StreamFoldInvokable<Integer, String> invokable1 = new StreamFoldInvokable<Integer, String>(
-				new MyFolder(), "", outType);
-
-		List<String> expected = Arrays.asList("1","11","112","1123","11233");
-		List<String> actual = MockContext.createAndExecute(invokable1,
-				Arrays.asList(1, 1, 2, 3, 3));
-
-		assertEquals(expected, actual);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
deleted file mode 100644
index ae866e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamReduceTest {
-
-	private static class MyReducer implements ReduceFunction<Integer>{
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1+value2;
-		}
-		
-	}
-	
-	@Test
-	public void test() {
-		StreamReduceInvokable<Integer> invokable1 = new StreamReduceInvokable<Integer>(
-				new MyReducer());
-
-		List<Integer> expected = Arrays.asList(1,2,4,7,10);
-		List<Integer> actual = MockContext.createAndExecute(invokable1,
-				Arrays.asList(1, 1, 2, 3, 3));
-
-		assertEquals(expected, actual);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java
deleted file mode 100644
index 195e67c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoFlatMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(String value, Collector<String> coll) {
-			for (int i = 0; i < value.length(); i++) {
-				coll.collect(value.substring(i, i + 1));
-			}
-		}
-
-		@Override
-		public void flatMap2(Integer value, Collector<String> coll) {
-			coll.collect(value.toString());
-		}
-	}
-
-	@Test
-	public void coFlatMapTest() {
-		CoFlatMapInvokable<String, Integer, String> invokable = new CoFlatMapInvokable<String, Integer, String>(
-				new MyCoFlatMap());
-
-		List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
-				"e", "3", "4", "5");
-		List<String> actualList = MockCoContext.createAndExecute(invokable,
-				Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
-
-		assertEquals(expectedList, actualList);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void multipleInputTest() {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
-		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-		
-		try {
-			ds1.forward().merge(ds2);
-			fail();
-		} catch (RuntimeException e) {
-			// expected
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java
deleted file mode 100644
index a531884..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedReduceTest {
-
-	private final static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
-				Tuple3<String, String, String> value2) {
-			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
-		}
-
-		@Override
-		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
-				Tuple2<Integer, Integer> value2) {
-			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-		@Override
-		public String map1(Tuple3<String, String, String> value) {
-			return value.f1;
-		}
-
-		@Override
-		public String map2(Tuple2<Integer, Integer> value) {
-			return value.f1.toString();
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void coGroupedReduceTest() {
-		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
-		KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple3<String, String, String> value) throws Exception {
-				return value.f0;
-			}
-		};
-
-		KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-				return value.f0;
-			}
-		};
-
-		KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple3<String, String, String> value) throws Exception {
-				return value.f2;
-			}
-		};
-
-		CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), keySelector0, keySelector1);
-
-		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
-				"7");
-
-		List<String> actualList = MockCoContext.createAndExecute(invokable,
-				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-
-		invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), keySelector2, keySelector1);
-
-		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
-
-		actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
-				Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java
deleted file mode 100644
index 6b84440..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(Double value) {
-			return value.toString();
-		}
-
-		@Override
-		public String map2(Integer value) {
-			return value.toString();
-		}
-	}
-
-	@Test
-	public void coMapTest() {
-		CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
-
-		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
-		List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
-		
-		assertEquals(expectedList, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java
deleted file mode 100644
index 7f23fba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoStreamReduceTest {
-
-	public static class MyCoReduceFunction implements
-			CoReduceFunction<Integer, String, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce1(Integer value1, Integer value2) {
-			return value1 * value2;
-		}
-
-		@Override
-		public String reduce2(String value1, String value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public Integer map1(Integer value) {
-			return value;
-		}
-
-		@Override
-		public Integer map2(String value) {
-			return Integer.parseInt(value);
-		}
-
-	}
-
-	@Test
-	public void coStreamReduceTest() {
-
-		CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
-				new MyCoReduceFunction());
-
-		List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-		List<Integer> result = MockCoContext.createAndExecute(coReduce,
-				Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
-
-		assertEquals(expected1, result);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java
deleted file mode 100644
index a4f6120..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoWindowTest {
-
-	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@SuppressWarnings("unused")
-		@Override
-		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
-				throws Exception {
-			Integer count1 = 0;
-			for (Integer i : first) {
-				count1++;
-			}
-			Integer count2 = 0;
-			for (Integer i : second) {
-				count2++;
-			}
-			out.collect(count1);
-			out.collect(count2);
-
-		}
-
-	}
-
-	public static final class MyCoGroup2 implements
-			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coWindow(List<Tuple2<Integer, Integer>> first,
-				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
-
-			Set<Integer> firstElements = new HashSet<Integer>();
-			for (Tuple2<Integer, Integer> value : first) {
-				firstElements.add(value.f1);
-			}
-			for (Tuple2<Integer, Integer> value : second) {
-				if (firstElements.contains(value.f1)) {
-					out.collect(value.f1);
-				}
-			}
-
-		}
-
-	}
-
-	private static final class MyTS1 implements Timestamp<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Integer value) {
-			return value;
-		}
-
-	}
-
-	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Tuple2<Integer, Integer> value) {
-			return value.f0;
-		}
-
-	}
-
-	@Test
-	public void coWindowGroupReduceTest2() throws Exception {
-
-		CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
-				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
-				new TimestampWrapper<Integer>(new MyTS1(), 1));
-
-		// Windowsize 2, slide 1
-		// 1,2|2,3|3,4|4,5
-
-		List<Integer> input11 = new ArrayList<Integer>();
-		input11.add(1);
-		input11.add(1);
-		input11.add(2);
-		input11.add(3);
-		input11.add(3);
-
-		List<Integer> input12 = new ArrayList<Integer>();
-		input12.add(1);
-		input12.add(2);
-		input12.add(3);
-		input12.add(3);
-		input12.add(5);
-
-		// Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
-		// expected output: 3,2|3,3|2,2|0,1
-
-		List<Integer> expected1 = new ArrayList<Integer>();
-		expected1.add(3);
-		expected1.add(2);
-		expected1.add(3);
-		expected1.add(3);
-		expected1.add(2);
-		expected1.add(2);
-		expected1.add(0);
-		expected1.add(1);
-
-		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
-		assertEquals(expected1, actual1);
-
-		CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
-				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
-						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
-
-		// WindowSize 2, slide 3
-		// 1,2|4,5|7,8|
-
-		List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
-		input21.add(new Tuple2<Integer, Integer>(1, 1));
-		input21.add(new Tuple2<Integer, Integer>(1, 2));
-		input21.add(new Tuple2<Integer, Integer>(2, 3));
-		input21.add(new Tuple2<Integer, Integer>(3, 4));
-		input21.add(new Tuple2<Integer, Integer>(3, 5));
-		input21.add(new Tuple2<Integer, Integer>(4, 6));
-		input21.add(new Tuple2<Integer, Integer>(4, 7));
-		input21.add(new Tuple2<Integer, Integer>(5, 8));
-
-		List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
-		input22.add(new Tuple2<Integer, Integer>(1, 1));
-		input22.add(new Tuple2<Integer, Integer>(2, 0));
-		input22.add(new Tuple2<Integer, Integer>(2, 2));
-		input22.add(new Tuple2<Integer, Integer>(3, 9));
-		input22.add(new Tuple2<Integer, Integer>(3, 4));
-		input22.add(new Tuple2<Integer, Integer>(4, 10));
-		input22.add(new Tuple2<Integer, Integer>(5, 8));
-		input22.add(new Tuple2<Integer, Integer>(5, 7));
-
-		List<Integer> expected2 = new ArrayList<Integer>();
-		expected2.add(1);
-		expected2.add(2);
-		expected2.add(8);
-		expected2.add(7);
-
-		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
-		assertEquals(expected2, actual2);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java
deleted file mode 100644
index b58245a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class SelfConnectionTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final int MEMORY_SIZE = 32;
-
-	private static List<String> expected;
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void sameDataStreamTest() {
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		KeySelector keySelector = new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		};
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple2<Integer, Integer>> dataStream =
-				src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector)
-						.map(new MapFunction<Tuple2<Integer, Integer>, String>() {
-
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public String map(Tuple2<Integer, Integer> value) throws Exception {
-								return value.toString();
-							}
-						})
-						.addSink(resultSink);
-
-
-		try {
-			env.execute();
-
-			expected = new ArrayList<String>();
-
-			expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)"));
-
-			List<String> result = resultSink.getResult();
-
-			Collections.sort(expected);
-			Collections.sort(result);
-
-			assertEquals(expected, result);
-		} catch (Exception e) {
-			fail();
-			e.printStackTrace();
-		}
-	}
-
-	/**
-	 * We connect two different data streams in a chain to a CoMap.
-	 */
-	@Test
-	public void differentDataStreamSameChain() {
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		DataStream<String> stringMap = src.map(new MapFunction<Integer, String>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map(Integer value) throws Exception {
-				return "x " + value;
-			}
-		}).setChainingStrategy(StreamInvokable.ChainingStrategy.ALWAYS);
-
-		stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(String value) {
-				return value;
-			}
-
-			@Override
-			public String map2(Integer value) {
-				return String.valueOf(value + 1);
-			}
-		}).addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
-		expected = new ArrayList<String>();
-
-		expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
-		List<String> result = resultSink.getResult();
-
-		Collections.sort(expected);
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-
-	/**
-	 * We connect two different data streams in different chains to a CoMap.
-	 * (This is not actually self-connect.)
-	 */
-	@Test
-	public void differentDataStreamDifferentChain() {
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5).setChainingStrategy(StreamInvokable.ChainingStrategy.NEVER);
-
-		DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Integer value, Collector<String> out) throws Exception {
-				out.collect("x " + value);
-			}
-		}).groupBy(new KeySelector<String, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(String value) throws Exception {
-				return value.length();
-			}
-		});
-
-		DataStream<Long> longMap = src.map(new MapFunction<Integer, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long map(Integer value) throws Exception {
-				return Long.valueOf(value + 1);
-			}
-		}).groupBy(new KeySelector<Long, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long getKey(Long value) throws Exception {
-				return value;
-			}
-		});
-
-
-		stringMap.connect(longMap).map(new CoMapFunction<String, Long, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(String value) {
-				return value;
-			}
-
-			@Override
-			public String map2(Long value) {
-				return value.toString();
-			}
-		}).addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
-		expected = new ArrayList<String>();
-
-		expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
-		List<String> result = resultSink.getResult();
-
-		Collections.sort(expected);
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-}