Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:44 UTC

[28/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
deleted file mode 100644
index c23a4f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.ConnectedStreams;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class DataStreamTest extends StreamingMultipleProgramsTestBase {
-
-
-	/**
-	 * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testNaming() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Long> dataStream1 = env.generateSequence(0, 0).name("testSource1")
-				.map(new MapFunction<Long, Long>() {
-					@Override
-					public Long map(Long value) throws Exception {
-						return null;
-					}
-				}).name("testMap");
-
-		DataStream<Long> dataStream2 = env.generateSequence(0, 0).name("testSource2")
-				.map(new MapFunction<Long, Long>() {
-					@Override
-					public Long map(Long value) throws Exception {
-						return null;
-					}
-				}).name("testMap");
-
-		DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
-				.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void flatMap1(Long value, Collector<Long> out) throws Exception {
-					}
-
-					@Override
-					public void flatMap2(Long value, Collector<Long> out) throws Exception {
-					}
-				}).name("testCoFlatMap")
-				.windowAll(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(CountTrigger.of(10)))
-				.fold(0L, new FoldFunction<Long, Long>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Long fold(Long accumulator, Long value) throws Exception {
-						return null;
-					}
-				})
-				.name("testWindowFold")
-				.print();
-
-		//test functionality through the operator names in the execution plan
-		String plan = env.getExecutionPlan();
-
-		assertTrue(plan.contains("testSource1"));
-		assertTrue(plan.contains("testSource2"));
-		assertTrue(plan.contains("testMap"));
-		assertTrue(plan.contains("testMap"));
-		assertTrue(plan.contains("testCoFlatMap"));
-		assertTrue(plan.contains("testWindowFold"));
-	}
-
-	/**
-	 * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in
-	 * different and correct topologies. Does the same for the {@link ConnectedStreams}.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPartitioning() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
-		DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);
-
-		//Testing DataStream grouping
-		DataStream<Tuple2<Long, Long>> group1 = src1.keyBy(0);
-		DataStream<Tuple2<Long, Long>> group2 = src1.keyBy(1, 0);
-		DataStream<Tuple2<Long, Long>> group3 = src1.keyBy("f0");
-		DataStream<Tuple2<Long, Long>> group4 = src1.keyBy(new FirstSelector());
-
-		int id1 = createDownStreamId(group1);
-		int id2 = createDownStreamId(group2);
-		int id3 = createDownStreamId(group3);
-		int id4 = createDownStreamId(group4);
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id1)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id2)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id3)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id4)));
-
-		assertTrue(isKeyed(group1));
-		assertTrue(isKeyed(group2));
-		assertTrue(isKeyed(group3));
-		assertTrue(isKeyed(group4));
-
-		//Testing DataStream partitioning
-		DataStream<Tuple2<Long, Long>> partition1 = src1.partitionByHash(0);
-		DataStream<Tuple2<Long, Long>> partition2 = src1.partitionByHash(1, 0);
-		DataStream<Tuple2<Long, Long>> partition3 = src1.partitionByHash("f0");
-		DataStream<Tuple2<Long, Long>> partition4 = src1.partitionByHash(new FirstSelector());
-
-		int pid1 = createDownStreamId(partition1);
-		int pid2 = createDownStreamId(partition2);
-		int pid3 = createDownStreamId(partition3);
-		int pid4 = createDownStreamId(partition4);
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid1)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid2)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid3)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid4)));
-
-		assertFalse(isKeyed(partition1));
-		assertFalse(isKeyed(partition3));
-		assertFalse(isKeyed(partition2));
-		assertFalse(isKeyed(partition4));
-
-		// Testing DataStream custom partitioning
-		Partitioner<Long> longPartitioner = new Partitioner<Long>() {
-			@Override
-			public int partition(Long key, int numPartitions) {
-				return 100;
-			}
-		};
-
-		DataStream<Tuple2<Long, Long>> customPartition1 = src1.partitionCustom(longPartitioner, 0);
-		DataStream<Tuple2<Long, Long>> customPartition3 = src1.partitionCustom(longPartitioner, "f0");
-		DataStream<Tuple2<Long, Long>> customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
-
-		int cid1 = createDownStreamId(customPartition1);
-		int cid2 = createDownStreamId(customPartition3);
-		int cid3 = createDownStreamId(customPartition4);
-
-		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid1)));
-		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid2)));
-		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid3)));
-
-		assertFalse(isKeyed(customPartition1));
-		assertFalse(isKeyed(customPartition3));
-		assertFalse(isKeyed(customPartition4));
-
-		//Testing ConnectedStreams grouping
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup1 = connected.keyBy(0, 0);
-		Integer downStreamId1 = createDownStreamId(connectedGroup1);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
-		Integer downStreamId2 = createDownStreamId(connectedGroup2);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup3 = connected.keyBy("f0", "f0");
-		Integer downStreamId3 = createDownStreamId(connectedGroup3);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
-		Integer downStreamId4 = createDownStreamId(connectedGroup4);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
-		Integer downStreamId5 = createDownStreamId(connectedGroup5);
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId1)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId2)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId2)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId3)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId3)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId4)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId4)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId5)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId5)));
-
-		assertTrue(isKeyed(connectedGroup1));
-		assertTrue(isKeyed(connectedGroup2));
-		assertTrue(isKeyed(connectedGroup3));
-		assertTrue(isKeyed(connectedGroup4));
-		assertTrue(isKeyed(connectedGroup5));
-
-		//Testing ConnectedStreams partitioning
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition1 = connected.partitionByHash(0, 0);
-		Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
-		Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition3 = connected.partitionByHash("f0", "f0");
-		Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
-		Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
-
-		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
-		Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
-				connectDownStreamId1)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
-				connectDownStreamId1)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
-				connectDownStreamId2)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
-				connectDownStreamId2)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
-				connectDownStreamId3)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
-				connectDownStreamId3)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
-				connectDownStreamId4)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
-				connectDownStreamId4)));
-
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
-				connectDownStreamId5)));
-		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
-				connectDownStreamId5)));
-
-		assertFalse(isKeyed(connectedPartition1));
-		assertFalse(isKeyed(connectedPartition2));
-		assertFalse(isKeyed(connectedPartition3));
-		assertFalse(isKeyed(connectedPartition4));
-		assertFalse(isKeyed(connectedPartition5));
-	}
-
-	/**
-	 * Tests that operator parallelism is set as expected and is not retroactively
-	 * changed by a later {@link StreamExecutionEnvironment#setParallelism(int)} call.
-	 */
-	@Test
-	public void testParallelism() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<>(0L, 0L));
-		env.setParallelism(10);
-
-		SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
-			@Override
-			public Long map(Tuple2<Long, Long> value) throws Exception {
-				return null;
-			}
-		}).name("MyMap");
-
-		DataStream<Long> windowed = map
-				.windowAll(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(CountTrigger.of(10)))
-				.fold(0L, new FoldFunction<Long, Long>() {
-					@Override
-					public Long fold(Long accumulator, Long value) throws Exception {
-						return null;
-					}
-				});
-
-		windowed.addSink(new NoOpSink<Long>());
-
-		DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void invoke(Long value) throws Exception {
-			}
-		});
-
-		assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
-		assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
-		assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
-		assertEquals(10,
-				env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
-
-		env.setParallelism(7);
-
-		// Some parts, such as windowing, rely on the fact that previous operators have a parallelism
-		// set when instantiating the Discretizer. This would break if we dynamically changed
-		// the parallelism of operations when changing the setting on the Execution Environment.
-		assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
-		assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
-		assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
-		assertEquals(10, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
-
-		try {
-			src.setParallelism(3);
-			fail();
-		} catch (IllegalArgumentException success) {
-			// do nothing
-		}
-
-		DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
-		parallelSource.addSink(new NoOpSink<Long>());
-		assertEquals(7, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
-
-		parallelSource.setParallelism(3);
-		assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
-
-		map.setParallelism(2);
-		assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
-
-		sink.setParallelism(4);
-		assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
-	}
-
-	@Test
-	public void testTypeInfo() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Long> src1 = env.generateSequence(0, 0);
-		assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
-
-		DataStream<Tuple2<Integer, String>> map = src1.map(new MapFunction<Long, Tuple2<Integer, String>>() {
-			@Override
-			public Tuple2<Integer, String> map(Long value) throws Exception {
-				return null;
-			}
-		});
-
-		assertEquals(TypeExtractor.getForObject(new Tuple2<>(0, "")), map.getType());
-
-		DataStream<String> window = map
-				.windowAll(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(CountTrigger.of(5)))
-				.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
-					@Override
-					public void apply(GlobalWindow window,
-							Iterable<Tuple2<Integer, String>> values,
-							Collector<String> out) throws Exception {
-
-					}
-				});
-
-		assertEquals(TypeExtractor.getForClass(String.class), window.getType());
-
-		DataStream<CustomPOJO> flatten = window
-				.windowAll(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(CountTrigger.of(5)))
-				.fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
-						return null;
-					}
-				});
-
-		assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
-	}
-
-	@Test
-	public void operatorTest() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Long> src = env.generateSequence(0, 0);
-
-		MapFunction<Long, Integer> mapFunction = new MapFunction<Long, Integer>() {
-			@Override
-			public Integer map(Long value) throws Exception {
-				return null;
-			}
-		};
-		DataStream<Integer> map = src.map(mapFunction);
-		map.addSink(new NoOpSink<Integer>());
-		assertEquals(mapFunction, getFunctionForDataStream(map));
-
-
-		FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Long value, Collector<Integer> out) throws Exception {
-			}
-		};
-		DataStream<Integer> flatMap = src.flatMap(flatMapFunction);
-		flatMap.addSink(new NoOpSink<Integer>());
-		assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));
-
-		FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() {
-			@Override
-			public boolean filter(Integer value) throws Exception {
-				return false;
-			}
-		};
-
-		DataStream<Integer> unionFilter = map.union(flatMap)
-				.filter(filterFunction);
-
-		unionFilter.addSink(new NoOpSink<Integer>());
-
-		assertEquals(filterFunction, getFunctionForDataStream(unionFilter));
-
-		try {
-			env.getStreamGraph().getStreamEdge(map.getId(), unionFilter.getId());
-		} catch (RuntimeException e) {
-			fail(e.getMessage());
-		}
-
-		try {
-			env.getStreamGraph().getStreamEdge(flatMap.getId(), unionFilter.getId());
-		} catch (RuntimeException e) {
-			fail(e.getMessage());
-		}
-
-		OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
-			@Override
-			public Iterable<String> select(Integer value) {
-				return null;
-			}
-		};
-
-		SplitStream<Integer> split = unionFilter.split(outputSelector);
-		split.select("dummy").addSink(new NoOpSink<Integer>());
-		List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
-		assertEquals(1, outputSelectors.size());
-		assertEquals(outputSelector, outputSelectors.get(0));
-
-		DataStream<Integer> select = split.select("a");
-		DataStreamSink<Integer> sink = select.print();
-
-		StreamEdge splitEdge = env.getStreamGraph().getStreamEdge(unionFilter.getId(), sink.getTransformation().getId());
-		assertEquals("a", splitEdge.getSelectedNames().get(0));
-
-		ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
-		CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(Integer value) {
-				return null;
-			}
-
-			@Override
-			public String map2(Integer value) {
-				return null;
-			}
-		};
-		DataStream<String> coMap = connect.map(coMapper);
-		coMap.addSink(new NoOpSink<String>());
-		assertEquals(coMapper, getFunctionForDataStream(coMap));
-
-		try {
-			env.getStreamGraph().getStreamEdge(map.getId(), coMap.getId());
-		} catch (RuntimeException e) {
-			fail(e.getMessage());
-		}
-
-		try {
-			env.getStreamGraph().getStreamEdge(flatMap.getId(), coMap.getId());
-		} catch (RuntimeException e) {
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void sinkKeyTest() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSink<Long> sink = env.generateSequence(1, 100).print();
-		assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getStatePartitioner() == null);
-		assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof ForwardPartitioner);
-
-		KeySelector<Long, Long> key1 = new KeySelector<Long, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long getKey(Long value) throws Exception {
-				return (long) 0;
-			}
-		};
-
-		DataStreamSink<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
-
-		assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
-		assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
-		assertEquals(key1, env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
-		assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
-
-		KeySelector<Long, Long> key2 = new KeySelector<Long, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long getKey(Long value) throws Exception {
-				return (long) 0;
-			}
-		};
-
-		DataStreamSink<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
-
-		assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner() != null);
-		assertEquals(key2, env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner());
-		assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
-	}
-
-	@Test
-	public void testChannelSelectors() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Long> src = env.generateSequence(0, 0);
-
-		DataStream<Long> broadcast = src.broadcast();
-		DataStreamSink<Long> broadcastSink = broadcast.print();
-		StreamPartitioner<?> broadcastPartitioner =
-				env.getStreamGraph().getStreamEdge(src.getId(),
-						broadcastSink.getTransformation().getId()).getPartitioner();
-		assertTrue(broadcastPartitioner instanceof BroadcastPartitioner);
-
-		DataStream<Long> shuffle = src.shuffle();
-		DataStreamSink<Long> shuffleSink = shuffle.print();
-		StreamPartitioner<?> shufflePartitioner =
-				env.getStreamGraph().getStreamEdge(src.getId(),
-						shuffleSink.getTransformation().getId()).getPartitioner();
-		assertTrue(shufflePartitioner instanceof ShufflePartitioner);
-
-		DataStream<Long> forward = src.forward();
-		DataStreamSink<Long> forwardSink = forward.print();
-		StreamPartitioner<?> forwardPartitioner =
-				env.getStreamGraph().getStreamEdge(src.getId(),
-						forwardSink.getTransformation().getId()).getPartitioner();
-		assertTrue(forwardPartitioner instanceof ForwardPartitioner);
-
-		DataStream<Long> rebalance = src.rebalance();
-		DataStreamSink<Long> rebalanceSink = rebalance.print();
-		StreamPartitioner<?> rebalancePartitioner =
-				env.getStreamGraph().getStreamEdge(src.getId(),
-						rebalanceSink.getTransformation().getId()).getPartitioner();
-		assertTrue(rebalancePartitioner instanceof RebalancePartitioner);
-
-		DataStream<Long> global = src.global();
-		DataStreamSink<Long> globalSink = global.print();
-		StreamPartitioner<?> globalPartitioner =
-				env.getStreamGraph().getStreamEdge(src.getId(),
-						globalSink.getTransformation().getId()).getPartitioner();
-		assertTrue(globalPartitioner instanceof GlobalPartitioner);
-	}
-
-	/////////////////////////////////////////////////////////////
-	// Utilities
-	/////////////////////////////////////////////////////////////
-
-	private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
-		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
-		StreamGraph streamGraph = env.getStreamGraph();
-		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
-	}
-
-	private static Function getFunctionForDataStream(DataStream<?> dataStream) {
-		AbstractUdfStreamOperator<?, ?> operator =
-				(AbstractUdfStreamOperator<?, ?>) getOperatorForDataStream(dataStream);
-		return operator.getUserFunction();
-	}
-
-	private static Integer createDownStreamId(DataStream<?> dataStream) {
-		return dataStream.print().getTransformation().getId();
-	}
-
-	private static boolean isKeyed(DataStream<?> dataStream) {
-		return dataStream instanceof KeyedStream;
-	}
-
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	private static Integer createDownStreamId(ConnectedStreams dataStream) {
-		SingleOutputStreamOperator<?, ?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Object map1(Tuple2<Long, Long> value) {
-				return null;
-			}
-
-			@Override
-			public Object map2(Tuple2<Long, Long> value) {
-				return null;
-			}
-		});
-		coMap.addSink(new NoOpSink());
-		return coMap.getId();
-	}
-
-	private static boolean isKeyed(ConnectedStreams<?, ?> dataStream) {
-		return (dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream);
-	}
-
-	private static boolean isPartitioned(StreamEdge edge) {
-		return edge.getPartitioner() instanceof HashPartitioner;
-	}
-
-	private static boolean isCustomPartitioned(StreamEdge edge) {
-		return edge.getPartitioner() instanceof CustomPartitionerWrapper;
-	}
-
-	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long getKey(Tuple2<Long, Long> value) throws Exception {
-			return value.f0;
-		}
-	}
-
-	public static class CustomPOJO {
-		private String s;
-		private int i;
-
-		public CustomPOJO() {
-		}
-
-		public void setS(String s) {
-			this.s = s;
-		}
-
-		public void setI(int i) {
-			this.i = i;
-		}
-
-		public String getS() {
-			return s;
-		}
-
-		public int getI() {
-			return i;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
deleted file mode 100644
index bd97e84..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.util.EvenOddOutputSelector;
-import org.apache.flink.streaming.util.NoOpIntMap;
-import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings({ "unchecked", "unused", "serial" })
-public class IterateTest extends StreamingMultipleProgramsTestBase {
-
-	private static boolean[] iterated;
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testIncorrectParallelism() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10);
-
-		IterativeStream<Integer> iter1 = source.iterate();
-		SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
-		iter1.closeWith(map1).print();
-	}
-
-	@Test
-	public void testDoubleClosing() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// introduce dummy mapper to get to correct parallelism
-		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
-		IterativeStream<Integer> iter1 = source.iterate();
-
-		iter1.closeWith(iter1.map(NoOpIntMap));
-		iter1.closeWith(iter1.map(NoOpIntMap));
-	}
-
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDifferingParallelism() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// introduce dummy mapper to get to correct parallelism
-		DataStream<Integer> source = env.fromElements(1, 10)
-				.map(NoOpIntMap);
-
-		IterativeStream<Integer> iter1 = source.iterate();
-
-
-		iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
-
-	}
-
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testCoDifferingParallelism() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// introduce dummy mapper to get to correct parallelism
-		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
-		ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
-				Integer.class);
-
-
-		coIter.closeWith(coIter.map(NoOpIntCoMap).setParallelism(DEFAULT_PARALLELISM / 2));
-
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testClosingFromOutOfLoop() throws Exception {
-
-		// this test verifies that we cannot close an iteration with a DataStream that does not
-		// have the iteration in its predecessors
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// introduce dummy mapper to get to correct parallelism
-		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
-		IterativeStream<Integer> iter1 = source.iterate();
-		IterativeStream<Integer> iter2 = source.iterate();
-
-
-		iter2.closeWith(iter1.map(NoOpIntMap));
-
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testCoIterClosingFromOutOfLoop() throws Exception {
-
-		// this test verifies that we cannot close an iteration with a DataStream that does not
-		// have the iteration in its predecessors
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// introduce dummy mapper to get to correct parallelism
-		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
-		IterativeStream<Integer> iter1 = source.iterate();
-		ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
-				Integer.class);
-
-
-		coIter.closeWith(iter1.map(NoOpIntMap));
-
-	}
-
-	@Test(expected = IllegalStateException.class)
-	public void testExecutionWithEmptyIteration() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
-
-		IterativeStream<Integer> iter1 = source.iterate();
-
-		iter1.map(NoOpIntMap).print();
-
-		env.execute();
-	}
-
-	@Test
-	public void testImmutabilityWithCoiteration() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
-
-		IterativeStream<Integer> iter1 = source.iterate();
-		// Calling withFeedbackType should create a new iteration
-		ConnectedIterativeStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
-
-		iter1.closeWith(iter1.map(NoOpIntMap)).print();
-		iter2.closeWith(iter2.map(NoOpCoMap)).print();
-
-		StreamGraph graph = env.getStreamGraph();
-
-		assertEquals(2, graph.getIterationSourceSinkPairs().size());
-
-		for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
-			assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
-		}
-	}
-
-	@Test
-	public void testmultipleHeadsTailsSimple() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
-				.shuffle()
-				.map(NoOpIntMap).name("ParallelizeMapShuffle");
-		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
-				.map(NoOpIntMap).name("ParallelizeMapRebalance");
-
-		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
-
-		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
-		DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
-		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
-		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
-
-		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
-				.map(NoOpIntMap).name("EvenOddSourceMap")
-				.split(new EvenOddOutputSelector());
-
-		iter1.closeWith(source3.select("even").union(
-				head1.rebalance().map(NoOpIntMap).broadcast(), head2.shuffle()));
-
-		StreamGraph graph = env.getStreamGraph();
-
-		JobGraph jg = graph.getJobGraph();
-
-		assertEquals(1, graph.getIterationSourceSinkPairs().size());
-
-		Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
-		StreamNode itSource = sourceSinkPair.f0;
-		StreamNode itSink = sourceSinkPair.f1;
-
-		assertEquals(4, itSource.getOutEdges().size());
-		assertEquals(3, itSink.getInEdges().size());
-
-		assertEquals(itSource.getParallelism(), itSink.getParallelism());
-
-		for (StreamEdge edge : itSource.getOutEdges()) {
-			if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
-				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-			} else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
-				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-			}
-		}
-		for (StreamEdge edge : itSink.getInEdges()) {
-			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapShuffle")) {
-				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
-			}
-
-			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapForward")) {
-				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-			}
-
-			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
-				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-				assertTrue(edge.getSelectedNames().contains("even"));
-			}
-		}
-
-		// Test co-location
-
-		JobVertex itSource1 = null;
-		JobVertex itSink1 = null;
-
-		for (JobVertex vertex : jg.getVertices()) {
-			if (vertex.getName().contains("IterationSource")) {
-				itSource1 = vertex;
-			} else if (vertex.getName().contains("IterationSink")) {
-
-				itSink1 = vertex;
-
-			}
-		}
-
-		assertTrue(itSource1.getCoLocationGroup() != null);
-		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
-	}
-
-	@Test
-	public void testmultipleHeadsTailsWithTailPartitioning() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
-				.shuffle()
-				.map(NoOpIntMap);
-
-		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
-				.map(NoOpIntMap);
-
-		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
-
-		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
-		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
-				"shuffle");
-		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2)
-				.addSink(new ReceiveCheckNoOpSink<Integer>());
-		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
-
-		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
-				.map(NoOpIntMap)
-				.name("split")
-				.split(new EvenOddOutputSelector());
-
-		iter1.closeWith(
-				source3.select("even").union(
-						head1.map(NoOpIntMap).broadcast().name("bc"),
-						head2.map(NoOpIntMap).shuffle()));
-
-		StreamGraph graph = env.getStreamGraph();
-
-		JobGraph jg = graph.getJobGraph();
-
-		assertEquals(1, graph.getIterationSourceSinkPairs().size());
-
-		Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
-		StreamNode itSource = sourceSinkPair.f0;
-		StreamNode itSink = sourceSinkPair.f1;
-
-		assertEquals(4, itSource.getOutEdges().size());
-		assertEquals(3, itSink.getInEdges().size());
-
-
-		assertEquals(itSource.getParallelism(), itSink.getParallelism());
-
-		for (StreamEdge edge : itSource.getOutEdges()) {
-			if (edge.getTargetVertex().getOperatorName().equals("map1")) {
-				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-				assertEquals(4, edge.getTargetVertex().getParallelism());
-			} else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
-				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-				assertEquals(2, edge.getTargetVertex().getParallelism());
-			}
-		}
-		for (StreamEdge edge : itSink.getInEdges()) {
-			String tailName = edge.getSourceVertex().getOperatorName();
-			if (tailName.equals("split")) {
-				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-				assertTrue(edge.getSelectedNames().contains("even"));
-			} else if (tailName.equals("bc")) {
-				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
-			} else if (tailName.equals("shuffle")) {
-				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
-			}
-		}
-
-		// Test co-location
-
-		JobVertex itSource1 = null;
-		JobVertex itSink1 = null;
-
-		for (JobVertex vertex : jg.getVertices()) {
-			if (vertex.getName().contains("IterationSource")) {
-				itSource1 = vertex;
-			} else if (vertex.getName().contains("IterationSink")) {
-				itSink1 = vertex;
-			}
-		}
-
-		assertTrue(itSource1.getCoLocationGroup() != null);
-		assertTrue(itSink1.getCoLocationGroup() != null);
-
-		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testSimpleIteration() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		iterated = new boolean[DEFAULT_PARALLELISM];
-
-		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
-				.map(NoOpBoolMap).name("ParallelizeMap");
-
-		IterativeStream<Boolean> iteration = source.iterate(3000);
-
-		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
-
-		iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
-
-		iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
-
-		env.execute();
-
-		for (boolean iter : iterated) {
-			assertTrue(iter);
-		}
-
-	}
-
-	@Test
-	public void testCoIteration() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<String> otherSource = env.fromElements("1000", "2000")
-				.map(NoOpStrMap).name("ParallelizeMap");
-
-
-		ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
-				.map(NoOpIntMap).name("ParallelizeMap")
-				.iterate(2000)
-				.withFeedbackType("String");
-
-		try {
-			coIt.keyBy(1, 2);
-			fail();
-		} catch (InvalidProgramException e) {
-			// this is expected
-		}
-
-		DataStream<String> head = coIt
-				.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
-
-					private static final long serialVersionUID = 1L;
-					boolean seenFromSource = false;
-
-					@Override
-					public void flatMap1(Integer value, Collector<String> out) throws Exception {
-						out.collect(((Integer) (value + 1)).toString());
-					}
-
-					@Override
-					public void flatMap2(String value, Collector<String> out) throws Exception {
-						Integer intVal = Integer.valueOf(value);
-						if (intVal < 2) {
-							out.collect(((Integer) (intVal + 1)).toString());
-						}
-						if (intVal == 1000 || intVal == 2000) {
-							seenFromSource = true;
-						}
-					}
-
-					@Override
-					public void close() {
-						assertTrue(seenFromSource);
-					}
-				});
-
-		coIt.map(new CoMapFunction<Integer, String, String>() {
-
-			@Override
-			public String map1(Integer value) throws Exception {
-				return value.toString();
-			}
-
-			@Override
-			public String map2(String value) throws Exception {
-				return value;
-			}
-		}).addSink(new ReceiveCheckNoOpSink<String>());
-
-		coIt.closeWith(head.broadcast().union(otherSource));
-
-		head.addSink(new TestSink()).setParallelism(1);
-
-		assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
-
-		env.execute();
-
-		Collections.sort(TestSink.collected);
-		assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
-	}
-
-	@Test
-	public void testGroupByFeedback() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(DEFAULT_PARALLELISM - 1);
-
-		KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value % 3;
-			}
-		};
-
-		DataStream<Integer> source = env.fromElements(1, 2, 3)
-				.map(NoOpIntMap).name("ParallelizeMap");
-
-		IterativeStream<Integer> it = source.keyBy(key).iterate(3000);
-
-		DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
-
-			int received = 0;
-			int key = -1;
-
-			@Override
-			public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-				received++;
-				if (key == -1) {
-					key = value % 3;
-				} else {
-					assertEquals(key, value % 3);
-				}
-				if (value > 0) {
-					out.collect(value - 1);
-				}
-			}
-
-			@Override
-			public void close() {
-				assertTrue(received > 1);
-			}
-		});
-
-		it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
-
-		env.execute();
-	}
-
-	@SuppressWarnings("deprecation")
-	@Test
-	public void testWithCheckPointing() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		env.enableCheckpointing();
-
-		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
-				.map(NoOpBoolMap).name("ParallelizeMap");
-
-
-		IterativeStream<Boolean> iteration = source.iterate(3000);
-
-		iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
-
-		try {
-			env.execute();
-
-			// this statement should never be reached
-			fail();
-		} catch (UnsupportedOperationException e) {
-			// expected behaviour
-		}
-
-		// Test force checkpointing
-
-		try {
-			env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
-			env.execute();
-
-			// this statement should never be reached
-			fail();
-		} catch (UnsupportedOperationException e) {
-			// expected behaviour
-		}
-
-		env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
-		env.getStreamGraph().getJobGraph();
-	}
-
-	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
-		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
-			int indx = getRuntimeContext().getIndexOfThisSubtask();
-			if (value) {
-				iterated[indx] = true;
-			} else {
-				out.collect(true);
-			}
-		}
-	}
-
-	public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
-
-		public String map1(Integer value) throws Exception {
-			return value.toString();
-		}
-
-		public String map2(String value) throws Exception {
-			return value;
-		}
-	};
-
-	public static MapFunction<Integer, Integer> NoOpIntMap = new NoOpIntMap();
-
-	public static MapFunction<String, String> NoOpStrMap = new MapFunction<String, String>() {
-
-		public String map(String value) throws Exception {
-			return value;
-		}
-
-	};
-
-	public static CoMapFunction<Integer, Integer, Integer> NoOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() {
-
-		public Integer map1(Integer value) throws Exception {
-			return value;
-		}
-
-		public Integer map2(Integer value) throws Exception {
-			return value;
-		}
-
-	};
-
-	public static MapFunction<Boolean, Boolean> NoOpBoolMap = new MapFunction<Boolean, Boolean>() {
-
-		public Boolean map(Boolean value) throws Exception {
-			return value;
-		}
-
-	};
-
-	public static class TestSink implements SinkFunction<String> {
-
-		private static final long serialVersionUID = 1L;
-		public static List<String> collected = new ArrayList<String>();
-
-		@Override
-		public void invoke(String value) throws Exception {
-			collected.add(value);
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
deleted file mode 100644
index 8525d37..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {
-
-	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testOnMergedDataStream() throws Exception {
-		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
-		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		env.setBufferTimeout(1);
-
-		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
-		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
-
-		d1 = d1.union(d2);
-
-		d1.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = 8354166915727490130L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value > 4) {
-					s.add(">");
-				} else {
-					s.add("<");
-				}
-				return s;
-			}
-		}).select(">").addSink(splitterResultSink1);
-
-		d1.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = -6822487543355994807L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 3 == 0) {
-					s.add("yes");
-				} else {
-					s.add("no");
-				}
-				return s;
-			}
-		}).select("yes").addSink(splitterResultSink2);
-		env.execute();
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
-		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
-		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
-	}
-
-	@Test
-	public void testOnSingleDataStream() throws Exception {
-		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
-		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		env.setBufferTimeout(1);
-
-		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
-		ds.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = 2524335410904414121L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 2 == 0) {
-					s.add("even");
-				} else {
-					s.add("odd");
-				}
-				return s;
-			}
-		}).select("even").addSink(splitterResultSink1);
-
-		ds.split(new OutputSelector<Integer>() {
-
-			private static final long serialVersionUID = -511693919586034092L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 4 == 0) {
-					s.add("yes");
-				} else {
-					s.add("no");
-				}
-				return s;
-			}
-		}).select("yes").addSink(splitterResultSink2);
-		env.execute();
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
-		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
-		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
deleted file mode 100644
index a6c6936..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.NoOpIntMap;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-
-import org.junit.Test;
-
-/**
- * IT case that tests the different stream partitioning schemes.
- */
-public class PartitionerTest extends StreamingMultipleProgramsTestBase {
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testForwardFailsLowToHighParallelism() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> src = env.fromElements(1, 2, 3);
-
-		// this fails because forward requires matching parallelism, but here it goes from 1 to 3
-		src.forward().map(new NoOpIntMap());
-
-		env.execute();
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testForwardFailsHighToLowParallelism() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// the exchange from the parallelism-1 source to this map is a rebalance, which is allowed
-		DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
-
-		// this fails because forward requires matching parallelism, but here it goes from 3 to 1
-		src.forward().map(new NoOpIntMap()).setParallelism(1);
-
-		env.execute();
-	}
-
-	@Test
-	public void partitionerTest() {
-
-		TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
-				new TestListResultSink<Tuple2<Integer, String>>();
-		TestListResultSink<Tuple2<Integer, String>> customPartitionResultSink =
-				new TestListResultSink<Tuple2<Integer, String>>();
-		TestListResultSink<Tuple2<Integer, String>> broadcastPartitionResultSink =
-				new TestListResultSink<Tuple2<Integer, String>>();
-		TestListResultSink<Tuple2<Integer, String>> forwardPartitionResultSink =
-				new TestListResultSink<Tuple2<Integer, String>>();
-		TestListResultSink<Tuple2<Integer, String>> rebalancePartitionResultSink =
-				new TestListResultSink<Tuple2<Integer, String>>();
-		TestListResultSink<Tuple2<Integer, String>> globalPartitionResultSink =
-				new TestListResultSink<Tuple2<Integer, String>>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataStream<Tuple1<String>> src = env.fromElements(
-				new Tuple1<String>("a"),
-				new Tuple1<String>("b"),
-				new Tuple1<String>("b"),
-				new Tuple1<String>("a"),
-				new Tuple1<String>("a"),
-				new Tuple1<String>("c"),
-				new Tuple1<String>("a")
-		);
-
-		// partition by hash
-		src
-				.partitionByHash(0)
-				.map(new SubtaskIndexAssigner())
-				.addSink(hashPartitionResultSink);
-
-		// partition custom
-		DataStream<Tuple2<Integer, String>> partitionCustom = src
-				.partitionCustom(new Partitioner<String>() {
-					@Override
-					public int partition(String key, int numPartitions) {
-						if (key.equals("c")) {
-							return 2;
-						} else {
-							return 0;
-						}
-					}
-				}, 0)
-				.map(new SubtaskIndexAssigner());
-
-		partitionCustom.addSink(customPartitionResultSink);
-
-		// partition broadcast
-		src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
-
-		// partition rebalance
-		src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
-
-		// partition forward: the intermediate map rebalances away from the
-		// parallelism-1 source, so forward then preserves that round-robin distribution
-		src.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
-			private static final long serialVersionUID = 1L;
-			@Override
-			public Tuple1<String> map(Tuple1<String> value) throws Exception {
-				return value;
-			}
-		})
-				.forward()
-				.map(new SubtaskIndexAssigner())
-				.addSink(forwardPartitionResultSink);
-
-		// partition global
-		src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
-
-		try {
-			env.execute();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-
-		List<Tuple2<Integer, String>> hashPartitionResult = hashPartitionResultSink.getResult();
-		List<Tuple2<Integer, String>> customPartitionResult = customPartitionResultSink.getResult();
-		List<Tuple2<Integer, String>> broadcastPartitionResult = broadcastPartitionResultSink.getResult();
-		List<Tuple2<Integer, String>> forwardPartitionResult = forwardPartitionResultSink.getResult();
-		List<Tuple2<Integer, String>> rebalancePartitionResult = rebalancePartitionResultSink.getResult();
-		List<Tuple2<Integer, String>> globalPartitionResult = globalPartitionResultSink.getResult();
-
-		verifyHashPartitioning(hashPartitionResult);
-		verifyCustomPartitioning(customPartitionResult);
-		verifyBroadcastPartitioning(broadcastPartitionResult);
-		verifyRebalancePartitioning(forwardPartitionResult);
-		verifyRebalancePartitioning(rebalancePartitionResult);
-		verifyGlobalPartitioning(globalPartitionResult);
-	}
-
-	private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult) {
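-		// all records with the same key must land on the same subtask, whichever one that is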
-		HashMap<String, Integer> verifier = new HashMap<String, Integer>();
-		for (Tuple2<Integer, String> elem : hashPartitionResult) {
-			Integer subtaskIndex = verifier.get(elem.f1);
-			if (subtaskIndex == null) {
-				verifier.put(elem.f1, elem.f0);
-			} else if (subtaskIndex != elem.f0) {
-				fail();
-			}
-		}
-	}
-
-	private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult) {
-		for (Tuple2<Integer, String> stringWithSubtask : customPartitionResult) {
-			if (stringWithSubtask.f1.equals("c")) {
-				assertEquals(new Integer(2), stringWithSubtask.f0);
-			} else {
-				assertEquals(new Integer(0), stringWithSubtask.f0);
-			}
-		}
-	}
-
-	private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>> broadcastPartitionResult) {
-		List<Tuple2<Integer, String>> expected = Arrays.asList(
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(0, "b"),
-				new Tuple2<Integer, String>(0, "b"),
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(0, "c"),
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(1, "a"),
-				new Tuple2<Integer, String>(1, "b"),
-				new Tuple2<Integer, String>(1, "b"),
-				new Tuple2<Integer, String>(1, "a"),
-				new Tuple2<Integer, String>(1, "a"),
-				new Tuple2<Integer, String>(1, "c"),
-				new Tuple2<Integer, String>(1, "a"),
-				new Tuple2<Integer, String>(2, "a"),
-				new Tuple2<Integer, String>(2, "b"),
-				new Tuple2<Integer, String>(2, "b"),
-				new Tuple2<Integer, String>(2, "a"),
-				new Tuple2<Integer, String>(2, "a"),
-				new Tuple2<Integer, String>(2, "c"),
-				new Tuple2<Integer, String>(2, "a"));
-
-		assertEquals(
-				new HashSet<Tuple2<Integer, String>>(expected),
-				new HashSet<Tuple2<Integer, String>>(broadcastPartitionResult));
-	}
-
-	private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>> rebalancePartitionResult) {
-		List<Tuple2<Integer, String>> expected = Arrays.asList(
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(1, "b"),
-				new Tuple2<Integer, String>(2, "b"),
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(1, "a"),
-				new Tuple2<Integer, String>(2, "c"),
-				new Tuple2<Integer, String>(0, "a"));
-
-		assertEquals(
-				new HashSet<Tuple2<Integer, String>>(expected),
-				new HashSet<Tuple2<Integer, String>>(rebalancePartitionResult));
-	}
-
-	private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult) {
-		List<Tuple2<Integer, String>> expected = Arrays.asList(
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(0, "b"),
-				new Tuple2<Integer, String>(0, "b"),
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(0, "a"),
-				new Tuple2<Integer, String>(0, "c"),
-				new Tuple2<Integer, String>(0, "a"));
-
-		assertEquals(
-				new HashSet<Tuple2<Integer, String>>(expected),
-				new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
-	}
-
-	private static class SubtaskIndexAssigner extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private int indexOfSubtask;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			RuntimeContext runtimeContext = getRuntimeContext();
-			indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
-		}
-
-		@Override
-		public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
-			return new Tuple2<Integer, String>(indexOfSubtask, value.f0);
-		}
-	}
-}
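
The partitionCustom branch of the test above routes records by key through a
user-supplied Partitioner. A minimal standalone sketch of that usage follows;
CustomPartitionSketch is a hypothetical class name, and print() replaces the
TestListResultSink test utility.

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CustomPartitionSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(3);

            DataStream<Tuple1<String>> src = env.fromElements(
                    new Tuple1<String>("a"), new Tuple1<String>("b"), new Tuple1<String>("c"));

            // Route "c" to partition 2 and everything else to partition 0,
            // keyed on tuple field 0, the same scheme the test verifies.
            src.partitionCustom(new Partitioner<String>() {
                @Override
                public int partition(String key, int numPartitions) {
                    return "c".equals(key) ? 2 : 0;
                }
            }, 0).print();

            env.execute();
        }
    }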

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
deleted file mode 100644
index b53649a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.util.SourceFunctionUtil;
-import org.junit.Test;
-
-public class SourceFunctionTest {
-
-	@Test
-	public void fromElementsTest() throws Exception {
-		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
-				new FromElementsFunction<Integer>(
-						IntSerializer.INSTANCE,
-						1,
-						2,
-						3)));
-		assertEquals(expectedList, actualList);
-	}
-
-	@Test
-	public void fromCollectionTest() throws Exception {
-		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
-				CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>(
-						IntSerializer.INSTANCE,
-						Arrays.asList(1, 2, 3))));
-		assertEquals(expectedList, actualList);
-	}
-
-	@Test
-	public void generateSequenceTest() throws Exception {
-		List<Long> expectedList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
-		List<Long> actualList = SourceFunctionUtil.runSourceFunction(new StatefulSequenceSource(1, 7));
-		assertEquals(expectedList, actualList);
-	}
-
-	@Test
-	public void socketTextStreamTest() throws Exception {
-		// TODO: does not work because we cannot set the internal socket anymore
-//		List<String> expectedList = Arrays.asList("a", "b", "c");
-//		List<String> actualList = new ArrayList<String>();
-//
-//		byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
-//
-//		Socket socket = mock(Socket.class);
-//		when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
-//		when(socket.isClosed()).thenReturn(false);
-//		when(socket.isConnected()).thenReturn(true);
-//
-//		SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\n', 0);
-//		source.open(new Configuration());
-//		while (!source.reachedEnd()) {
-//			actualList.add(source.next());
-//		}
-//		assertEquals(expectedList, actualList);
-	}
-}
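
The passing tests above drive sources through SourceFunctionUtil.runSourceFunction.
A user-defined source follows the same run/cancel contract. Below is a minimal
sketch; CountingSource is a hypothetical example with an artificial bound so it
terminates on its own.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class CountingSource implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long next = 0;
            // Emit until cancelled, or until a bound is reached to keep tests finite.
            while (running && next < 100) {
                ctx.collect(next++);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }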

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index 606259e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Test;
-
-public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testFromCollectionParallelism() {
-		try {
-			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-			try {
-				dataStream1.setParallelism(4);
-				fail("should throw an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			dataStream1.addSink(new NoOpSink<Integer>());
-
-			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-					typeInfo).setParallelism(4);
-
-			dataStream2.addSink(new NoOpSink<Integer>());
-
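-			// the returned JSON plan is not inspected; this call only checks that plan generation succeeds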
-			String plan = env.getExecutionPlan();
-
-			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-			assertEquals("Parallelism of parallel collection source must be 4.",
-					4,
-					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSources() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-		DataStreamSource<Integer> src1 = env.addSource(srcFun);
-		src1.addSink(new NoOpSink<Integer>());
-		assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-		List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-		assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
-
-		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-		assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
-
-		DataStreamSource<Long> src4 = env.fromCollection(list);
-		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
-	}
-
-	/////////////////////////////////////////////////////////////
-	// Utilities
-	/////////////////////////////////////////////////////////////
-
-	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
-		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
-		StreamGraph streamGraph = env.getStreamGraph();
-		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
-	}
-
-	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-		dataStreamSource.addSink(new NoOpSink<T>());
-		AbstractUdfStreamOperator<?, ?> operator =
-				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
-		return (SourceFunction<T>) operator.getUserFunction();
-	}
-
-	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
-		private static final long serialVersionUID = 1312752876092210499L;
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Iterator<T>[] split(int numPartitions) {
-			return (Iterator<T>[]) new Iterator<?>[0];
-		}
-
-		@Override
-		public int getMaximumNumberOfSplits() {
-			return 0;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return false;
-		}
-
-		@Override
-		public T next() {
-			throw new NoSuchElementException();
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-}
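
testFromCollectionParallelism pins down the contract that a plain collection
source stays at parallelism 1, while a parallel collection source accepts a
higher setting. A sketch of both follows, assuming NumberSequenceIterator (a
SplittableIterator from org.apache.flink.util) as the parallel input;
CollectionSourceSketch is a hypothetical class name.

    import java.util.Arrays;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.NumberSequenceIterator;

    public class CollectionSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // A plain collection source is pinned to parallelism 1; setting a
            // higher value throws IllegalArgumentException, as the test asserts.
            DataStreamSource<Long> serial = env.fromCollection(Arrays.asList(0L, 1L, 2L));

            // A parallel collection source splits its SplittableIterator across
            // subtasks, so a higher parallelism is accepted.
            DataStreamSource<Long> parallel = env
                    .fromParallelCollection(new NumberSequenceIterator(0L, 1000L), BasicTypeInfo.LONG_TYPE_INFO)
                    .setParallelism(4);

            serial.print();
            parallel.print();
            env.execute();
        }
    }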

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
deleted file mode 100644
index 42febea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
-
-	private String resultPath1;
-	private String resultPath2;
-	private String expected1;
-	private String expected2;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath1 = tempFolder.newFile().toURI().toString();
-		resultPath2 = tempFolder.newFile().toURI().toString();
-		expected1 = "";
-		expected2 = "";
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected1, resultPath1);
-		compareResultsByLinesInMemory(expected2, resultPath2);
-	}
-
-	/**
-	 * Tests the streaming fold operator. A stream of Tuple2<Integer, Integer> is keyed by the
-	 * first tuple field, and each key group is folded by summing up the second field. The fold
-	 * emits a running sum per element: with numElements = 10 and numKeys = 2, the group for
-	 * key 0 folds 0, 2, 4, 6, 8 into 0, 2, 6, 12, 20.
-	 */
-	@Test
-	public void testFoldOperation() throws Exception {
-		int numElements = 10;
-		int numKeys = 2;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
-
-		SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
-			.keyBy(0)
-			.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
-				@Override
-				public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
-					return accumulator + value.f1;
-				}
-			}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
-				@Override
-				public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-					return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
-				}
-			}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
-				@Override
-				public Iterable<String> select(Tuple2<Integer, Integer> value) {
-					List<String> output = new ArrayList<>();
-
-				output.add(String.valueOf(value.f0));
-
-					return output;
-				}
-			});
-
-		splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
-			@Override
-			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
-				return value.f1;
-			}
-		}).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
-			@Override
-			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
-				return value.f1;
-			}
-		}).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
-		StringBuilder builder1 = new StringBuilder();
-		StringBuilder builder2 = new StringBuilder();
-		int counter1 = 0;
-		int counter2 = 0;
-
-		for (int i = 0; i < numElements; i++) {
-			if (i % 2 == 0) {
-				counter1 += i;
-				builder1.append(counter1 + "\n");
-			} else {
-				counter2 += i;
-				builder2.append(counter2 + "\n");
-			}
-		}
-
-		expected1 = builder1.toString();
-		expected2 = builder2.toString();
-
-		env.execute();
-	}
-
-	/**
-	 * Tests whether the fold operation can also be called with types that are not Java
-	 * serializable. Every key occurs exactly once here, so each element simply emits its
-	 * value plus the initial accumulator of 42.
-	 */
-	@Test
-	public void testFoldOperationWithNonJavaSerializableType() throws Exception {
-		final int numElements = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
-
-		input
-			.keyBy(0)
-			.fold(
-				new NonSerializable(42),
-				new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
-					@Override
-					public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
-						return new NonSerializable(accumulator.value + value.f1.value);
-					}
-			})
-			.map(new MapFunction<NonSerializable, Integer>() {
-				@Override
-				public Integer map(NonSerializable value) throws Exception {
-					return value.value;
-				}
-			})
-			.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		StringBuilder builder = new StringBuilder();
-
-		for (int i = 0; i < numElements; i++) {
-			builder.append(42 + i + "\n");
-		}
-
-		expected1 = builder.toString();
-
-		env.execute();
-	}
-
-	private static class NonSerializable {
-		// This makes the type non-serializable
-		private final Object obj = new Object();
-
-		private final int value;
-
-		public NonSerializable(int value) {
-			this.value = value;
-		}
-	}
-
-	private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
-		private final int numElements;
-
-		public NonSerializableTupleSource(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
-			for (int i = 0; i < numElements; i++) {
-				ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
-			}
-		}
-
-		@Override
-		public void cancel() {}
-	}
-
-	private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
-
-		private final int numElements;
-		private final int numKeys;
-
-		public TupleSource(int numElements, int numKeys) {
-			this.numElements = numElements;
-			this.numKeys = numKeys;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-			for (int i = 0; i < numElements; i++) {
-				Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
-				ctx.collect(result);
-			}
-		}
-
-		@Override
-		public void cancel() {}
-	}
-}
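
The fold tests above rely on fold emitting the updated accumulator once per
input element, with one accumulator kept per key. A compact sketch of that
behavior follows; KeyedFoldSketch is a hypothetical class name.

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeyedFoldSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<Integer, Integer>> input = env.fromElements(
                    new Tuple2<Integer, Integer>(0, 1),
                    new Tuple2<Integer, Integer>(0, 2),
                    new Tuple2<Integer, Integer>(1, 3));

            // fold emits the updated accumulator once per input element, per key:
            // key 0 emits 1 then 3; key 1 emits 3.
            input.keyBy(0)
                    .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
                        @Override
                        public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) {
                            return accumulator + value.f1;
                        }
                    })
                    .print();

            env.execute();
        }
    }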