You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/08/19 18:42:11 UTC

[2/6] flink git commit: [FLINK-2398][api-breaking] Introduce StreamGraphGenerator

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 0fad3dd..285ee57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -1,27 +1,22 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,146 +26,235 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamLoop;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.EvenOddOutputSelector;
+import org.apache.flink.streaming.util.NoOpIntMap;
+import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings({ "unchecked", "unused", "serial" })
 public class IterateTest extends StreamingMultipleProgramsTestBase {
 
-	private static final long MEMORYSIZE = 32;
 	private static boolean iterated[];
-	private static int PARALLELISM = 2;
 
-	@Test
-	public void testException() throws Exception {
+	@Test(expected = UnsupportedOperationException.class)
+	public void testIncorrectParallelism() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
 		DataStream<Integer> source = env.fromElements(1, 10);
+
+		IterativeDataStream<Integer> iter1 = source.iterate();
+		SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
+		iter1.closeWith(map1).print();
+	}
+
+	@Test
+	public void testDoubleClosing() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce a dummy mapper so that the iteration input gets the correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
 		IterativeDataStream<Integer> iter1 = source.iterate();
-		IterativeDataStream<Integer> iter2 = source.iterate();
 
 		iter1.closeWith(iter1.map(NoOpIntMap));
-		// Check for double closing
-		try {
-			iter1.closeWith(iter1.map(NoOpIntMap));
-			fail();
-		} catch (Exception e) {
-		}
+		iter1.closeWith(iter1.map(NoOpIntMap));
+	}
 
-		// Check for closing iteration without head
-		try {
-			iter2.closeWith(iter1.map(NoOpIntMap));
-			fail();
-		} catch (Exception e) {
-		}
 
-		iter2.map(NoOpIntMap);
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDifferingParallelism() throws Exception {
 
-		// Check for executing with empty iteration
-		try {
-			env.execute();
-			fail();
-		} catch (Exception e) {
-		}
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce a dummy mapper so that the iteration input gets the correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10)
+				.map(NoOpIntMap);
+
+		IterativeDataStream<Integer> iter1 = source.iterate();
+
+
+		iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
+
+	}
+
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testCoDifferingParallelism() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce a dummy mapper so that the iteration input gets the correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+				Integer.class);
+
+
+		coIter.closeWith(coIter.map(NoOpIntCoMap).setParallelism(DEFAULT_PARALLELISM / 2));
+
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClosingFromOutOfLoop() throws Exception {
+
+		// this test verifies that we cannot close an iteration with a DataStream that does not
+		// have the iteration in its predecessors
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce a dummy mapper so that the iteration input gets the correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeDataStream<Integer> iter2 = source.iterate();
+
+
+		iter2.closeWith(iter1.map(NoOpIntMap));
+
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testCoIterClosingFromOutOfLoop() throws Exception {
+
+		// this test verifies that we cannot close an iteration with a DataStream that does not
+		// have the iteration in its predecessors
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce a dummy mapper so that the iteration input gets the correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeDataStream<Integer> iter1 = source.iterate();
+		ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+				Integer.class);
+
+
+		coIter.closeWith(iter1.map(NoOpIntMap));
+
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testExecutionWithEmptyIteration() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeDataStream<Integer> iter1 = source.iterate();
+
+		iter1.map(NoOpIntMap).print();
+
+		env.execute();
 	}
 
 	@Test
 	public void testImmutabilityWithCoiteration() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-		DataStream<Integer> source = env.fromElements(1, 10);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
 
 		IterativeDataStream<Integer> iter1 = source.iterate();
 		// Calling withFeedbackType should create a new iteration
 		ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class);
 
-		iter1.closeWith(iter1.map(NoOpIntMap));
-		iter2.closeWith(iter2.map(NoOpCoMap));
+		iter1.closeWith(iter1.map(NoOpIntMap)).print();
+		iter2.closeWith(iter2.map(NoOpCoMap)).print();
 
 		StreamGraph graph = env.getStreamGraph();
 
-		graph.getJobGraph();
+		assertEquals(2, graph.getIterationSourceSinkPairs().size());
 
-		assertEquals(2, graph.getStreamLoops().size());
-		for (StreamLoop loop : graph.getStreamLoops()) {
-			assertEquals(loop.getHeads(), loop.getTails());
-			List<Tuple2<StreamNode, StreamNode>> sourceSinkPairs = loop.getSourceSinkPairs();
-			assertEquals(1, sourceSinkPairs.size());
+		for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
+			assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
 		}
 	}
 
 	@Test
 	public void testmultipleHeadsTailsSimple() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
-		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
-		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
+				.shuffle()
+				.map(NoOpIntMap).name("ParallelizeMapShuffle");
+		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap).name("ParallelizeMapRebalance");
 
-		DataStream<Integer> head1 = iter1.map(NoOpIntMap);
-		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2);
-		DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
-				.addSink(new NoOpSink<Integer>());
-		DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
+		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
 
-		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).split(
-				new OutputSelector<Integer>() {
+		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
+		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
+		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
 
-					@Override
-					public Iterable<String> select(Integer value) {
-						return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
-					}
-				});
+		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap).name("EvenOddSourceMap")
+				.split(new EvenOddOutputSelector());
 
 		iter1.closeWith(source3.select("even").union(
-				head1.map(NoOpIntMap).broadcast().setParallelism(1), head2.shuffle()));
+				head1.rebalance().map(NoOpIntMap).broadcast(), head2.shuffle()));
 
 		StreamGraph graph = env.getStreamGraph();
 
 		JobGraph jg = graph.getJobGraph();
 
-		assertEquals(1, graph.getStreamLoops().size());
-		StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+		assertEquals(1, graph.getIterationSourceSinkPairs().size());
 
-		assertEquals(4, loop.getHeads().size());
-		assertEquals(3, loop.getTails().size());
+		Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
+		StreamNode itSource = sourceSinkPair.f0;
+		StreamNode itSink = sourceSinkPair.f1;
 
-		assertEquals(1, loop.getSourceSinkPairs().size());
-		Tuple2<StreamNode, StreamNode> pair = loop.getSourceSinkPairs().get(0);
+		assertEquals(4, itSource.getOutEdges().size());
+		assertEquals(3, itSink.getInEdges().size());
 
-		assertEquals(pair.f0.getParallelism(), pair.f1.getParallelism());
-		assertEquals(4, pair.f0.getOutEdges().size());
-		assertEquals(3, pair.f1.getInEdges().size());
+		assertEquals(itSource.getParallelism(), itSink.getParallelism());
 
-		for (StreamEdge edge : pair.f0.getOutEdges()) {
-			assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
-		}
-		for (StreamEdge edge : pair.f1.getInEdges()) {
-			assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+		for (StreamEdge edge : itSource.getOutEdges()) {
+			if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
+				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+			} else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+			}
 		}
+		for (StreamEdge edge : itSink.getInEdges()) {
+			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapShuffle")) {
+				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+			}
 
-		assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
+			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapForward")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+			}
+
+			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+				assertTrue(edge.getSelectedNames().contains("even"));
+			}
+		}
 
 		// Test co-location
 
@@ -193,142 +277,106 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 	@Test
 	public void testmultipleHeadsTailsWithTailPartitioning() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
-		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
-		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
+		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
+				.shuffle()
+				.map(NoOpIntMap);
 
-		DataStream<Integer> head1 = iter1.map(NoOpIntMap);
-		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2).name("shuffle");
-		DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
-				.addSink(new NoOpSink<Integer>());
-		DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
+		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap);
 
-		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).name("split")
-				.split(new OutputSelector<Integer>() {
+		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
 
-					@Override
-					public Iterable<String> select(Integer value) {
-						return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
-					}
-				});
+		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
+				"shuffle");
+		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2)
+				.addSink(new ReceiveCheckNoOpSink<Integer>());
+		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
+
+		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap)
+				.name("split")
+				.split(new EvenOddOutputSelector());
 
 		iter1.closeWith(
 				source3.select("even").union(
-						head1.map(NoOpIntMap).broadcast().setParallelism(1).name("bc"),
-						head2.shuffle()), true);
+						head1.map(NoOpIntMap).broadcast().name("bc"),
+						head2.map(NoOpIntMap).shuffle()));
 
 		StreamGraph graph = env.getStreamGraph();
 
 		JobGraph jg = graph.getJobGraph();
 
-		assertEquals(1, graph.getStreamLoops().size());
+		assertEquals(1, graph.getIterationSourceSinkPairs().size());
 
-		StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+		Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
+		StreamNode itSource = sourceSinkPair.f0;
+		StreamNode itSink = sourceSinkPair.f1;
 
-		assertEquals(4, loop.getHeads().size());
-		assertEquals(3, loop.getTails().size());
+		assertEquals(4, itSource.getOutEdges().size());
+		assertEquals(3, itSink.getInEdges().size());
 
-		assertEquals(2, loop.getSourceSinkPairs().size());
-		List<Tuple2<StreamNode, StreamNode>> pairs = loop.getSourceSinkPairs();
-		Tuple2<StreamNode, StreamNode> pair1 = pairs.get(0).f0.getParallelism() == 2 ? pairs.get(0)
-				: pairs.get(1);
-		Tuple2<StreamNode, StreamNode> pair2 = pairs.get(0).f0.getParallelism() == 4 ? pairs.get(0)
-				: pairs.get(1);
 
-		assertEquals(pair1.f0.getParallelism(), pair1.f1.getParallelism());
-		assertEquals(2, pair1.f0.getParallelism());
-		assertEquals(2, pair1.f0.getOutEdges().size());
-		assertEquals(3, pair1.f1.getInEdges().size());
+		assertEquals(itSource.getParallelism(), itSink.getParallelism());
 
-		for (StreamEdge edge : pair1.f0.getOutEdges()) {
-			assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-			assertEquals(2, edge.getTargetVertex().getParallelism());
-		}
-		for (StreamEdge edge : pair1.f1.getInEdges()) {
-			String tailName = edge.getSourceVertex().getOperatorName();
-			if (tailName.equals("split")) {
+		for (StreamEdge edge : itSource.getOutEdges()) {
+			if (edge.getTargetVertex().getOperatorName().equals("map1")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+				assertEquals(4, edge.getTargetVertex().getParallelism());
+			} else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
 				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-			} else if (tailName.equals("bc")) {
-				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
-			} else if (tailName.equals("shuffle")) {
-				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+				assertEquals(2, edge.getTargetVertex().getParallelism());
 			}
-
 		}
-
-		assertEquals(pair2.f0.getParallelism(), pair2.f1.getParallelism());
-		assertEquals(4, pair2.f0.getParallelism());
-		assertEquals(2, pair2.f0.getOutEdges().size());
-		assertEquals(3, pair2.f1.getInEdges().size());
-
-		for (StreamEdge edge : pair2.f0.getOutEdges()) {
-			assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-			assertEquals(4, edge.getTargetVertex().getParallelism());
-		}
-		for (StreamEdge edge : pair2.f1.getInEdges()) {
+		for (StreamEdge edge : itSink.getInEdges()) {
 			String tailName = edge.getSourceVertex().getOperatorName();
 			if (tailName.equals("split")) {
-				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+				assertTrue(edge.getSelectedNames().contains("even"));
 			} else if (tailName.equals("bc")) {
 				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
 			} else if (tailName.equals("shuffle")) {
 				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
 			}
-
 		}
 
-		assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
-
 		// Test co-location
 
 		JobVertex itSource1 = null;
-		JobVertex itSource2 = null;
 		JobVertex itSink1 = null;
-		JobVertex itSink2 = null;
 
 		for (JobVertex vertex : jg.getVertices()) {
 			if (vertex.getName().contains("IterationSource")) {
-				if (vertex.getName().contains("_0")) {
-					itSource1 = vertex;
-				} else if (vertex.getName().contains("_1")) {
-					itSource2 = vertex;
-				}
+				itSource1 = vertex;
 			} else if (vertex.getName().contains("IterationSink")) {
-				if (vertex.getName().contains("_0")) {
-					itSink1 = vertex;
-				} else if (vertex.getName().contains("_1")) {
-					itSink2 = vertex;
-				}
+				itSink1 = vertex;
 			}
 		}
 
 		assertTrue(itSource1.getCoLocationGroup() != null);
-		assertTrue(itSource2.getCoLocationGroup() != null);
+		assertTrue(itSink1.getCoLocationGroup() != null);
 
 		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
-		assertEquals(itSource2.getCoLocationGroup(), itSink2.getCoLocationGroup());
-		assertNotEquals(itSource1.getCoLocationGroup(), itSource2.getCoLocationGroup());
 	}
 
 	@SuppressWarnings("rawtypes")
 	@Test
 	public void testSimpleIteration() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
-		iterated = new boolean[PARALLELISM];
+		iterated = new boolean[DEFAULT_PARALLELISM];
 
-		DataStream<Boolean> source = env
-				.fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+				.map(NoOpBoolMap).name("ParallelizeMap");
 
 		IterativeDataStream<Boolean> iteration = source.iterate(3000);
 
 		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
 
-		iteration.map(NoOpBoolMap).addSink(new NoOpSink());
+		iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
 
-		iteration.closeWith(increment).addSink(new NoOpSink());
+		iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
 
 		env.execute();
 
@@ -343,7 +391,13 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(2);
 
-		ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000)
+		DataStream<String> otherSource = env.fromElements("1000", "2000")
+				.map(NoOpStrMap).name("ParallelizeMap");
+
+
+		ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0)
+				.map(NoOpIntMap).name("ParallelizeMap")
+				.iterate(2000)
 				.withFeedbackType("String");
 
 		try {
@@ -392,25 +446,24 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 			public String map2(String value) throws Exception {
 				return value;
 			}
-		}).setParallelism(1).addSink(new NoOpSink<String>());
+		}).addSink(new ReceiveCheckNoOpSink<String>());
 
-		coIt.closeWith(head.broadcast().union(env.fromElements("1000", "2000").rebalance()));
+		coIt.closeWith(head.broadcast().union(otherSource));
 
 		head.addSink(new TestSink()).setParallelism(1);
 
+		assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
+
 		env.execute();
 
 		Collections.sort(TestSink.collected);
 		assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
-		assertEquals(2, new ArrayList<StreamLoop>(env.getStreamGraph().getStreamLoops()).get(0)
-				.getSourceSinkPairs().size());
-
 	}
 
 	@Test
 	public void testGroupByFeedback() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
+		env.setParallelism(DEFAULT_PARALLELISM - 1);
 
 		KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
 
@@ -420,7 +473,8 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 			}
 		};
 
-		DataStream<Integer> source = env.fromElements(1, 2, 3);
+		DataStream<Integer> source = env.fromElements(1, 2, 3)
+				.map(NoOpIntMap).name("ParallelizeMap");
 
 		IterativeDataStream<Integer> it = source.groupBy(key).iterate(3000);
 
@@ -448,8 +502,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).setParallelism(2).groupBy(key)),
-				true).addSink(new NoOpSink<Integer>());
+		it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).groupBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
 
 		env.execute();
 	}
@@ -457,15 +510,17 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 	@SuppressWarnings("deprecation")
 	@Test
 	public void testWithCheckPointing() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
 		env.enableCheckpointing();
 
-		DataStream<Boolean> source = env
-				.fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+		DataStream<Boolean> source = env .fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+				.map(NoOpBoolMap).name("ParallelizeMap");
+
 
 		IterativeDataStream<Boolean> iteration = source.iterate(3000);
 
-		iteration.closeWith(iteration.flatMap(new IterationHead()));
+		iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
 
 		try {
 			env.execute();
@@ -503,22 +558,6 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	public static final class NoOpSink<T> extends RichSinkFunction<T> {
-		private List<T> received;
-
-		public void invoke(T tuple) {
-			received.add(tuple);
-		}
-
-		public void open(Configuration conf) {
-			received = new ArrayList<T>();
-		}
-
-		public void close() {
-			assertTrue(received.size() > 0);
-		}
-	}
-
 	public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
 
 		public String map1(Integer value) throws Exception {
@@ -530,9 +569,23 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		}
 	};
 
-	public static MapFunction<Integer, Integer> NoOpIntMap = new MapFunction<Integer, Integer>() {
+	public static MapFunction<Integer, Integer> NoOpIntMap = new NoOpIntMap();
+
+	public static MapFunction<String, String> NoOpStrMap = new MapFunction<String, String>() {
+
+		public String map(String value) throws Exception {
+			return value;
+		}
+
+	};
+
+	public static CoMapFunction<Integer, Integer, Integer> NoOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() {
+
+		public Integer map1(Integer value) throws Exception {
+			return value;
+		}
 
-		public Integer map(Integer value) throws Exception {
+		public Integer map2(Integer value) throws Exception {
 			return value;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index fc78d27..8525d37 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -26,13 +26,12 @@ import java.util.List;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
-public class OutputSplitterTest {
-
-	private static final long MEMORYSIZE = 32;
+public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {
 
 	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
 
@@ -42,7 +41,8 @@ public class OutputSplitterTest {
 		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
 		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 		env.setBufferTimeout(1);
 
 		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
@@ -95,7 +95,8 @@ public class OutputSplitterTest {
 		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
 		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 		env.setBufferTimeout(1);
 
 		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
index c858834..987a8fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -25,14 +25,18 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.NoOpIntMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
@@ -40,10 +44,33 @@ import org.junit.Test;
 /**
  * IT case that tests the different stream partitioning schemes.
  */
-public class PartitionerTest {
+public class PartitionerTest extends StreamingMultipleProgramsTestBase {
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testForwardFailsLowToHighParallelism() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> src = env.fromElements(1, 2, 3);
+
+		// expected to fail: per the author's note this forward() edge would go from parallelism 1 (source) to 3 (map)
+		src.forward().map(new NoOpIntMap());
+
+		env.execute();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testForwardFailsHightToLowParallelism() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// per the author's note, the source -> map edge is a rebalance, which is allowed
+		DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
+
+		// expected to fail: per the author's note this forward() edge would go from parallelism 3 to the explicitly set 1
+		src.forward().map(new NoOpIntMap()).setParallelism(1);
+
+		env.execute();
+	}
 
-	public static final int PARALLELISM = 3;
-	public static final int MEMORY_SIZE = 32;
 
 	@Test
 	public void partitionerTest() {
@@ -62,7 +89,9 @@ public class PartitionerTest {
 				new TestListResultSink<Tuple2<Integer, String>>();
 
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORY_SIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
 		DataStream<Tuple1<String>> src = env.fromElements(
 				new Tuple1<String>("a"),
 				new Tuple1<String>("b"),
@@ -98,12 +127,21 @@ public class PartitionerTest {
 		// partition broadcast
 		src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
 
-		// partition forward
-		src.map(new SubtaskIndexAssigner()).addSink(forwardPartitionResultSink);
-
 		// partition rebalance
 		src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
 
+		// partition forward
+		src.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
+			private static final long serialVersionUID = 1L;
+			@Override
+			public Tuple1<String> map(Tuple1<String> value) throws Exception {
+				return value;
+			}
+		})
+				.forward()
+				.map(new SubtaskIndexAssigner())
+				.addSink(forwardPartitionResultSink);
+
 		// partition global
 		src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
 
@@ -209,8 +247,8 @@ public class PartitionerTest {
 				new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
 	}
 
-	private static class SubtaskIndexAssigner
-			extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+	private static class SubtaskIndexAssigner extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
 
 		private int indexOfSubtask;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index e2fe599..606259e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -37,24 +37,23 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.SplittableIterator;
 import org.junit.Test;
 
-public class StreamExecutionEnvironmentTest {
-
-	private static final long MEMORYSIZE = 32;
-	private static int PARALLELISM = 4;
+public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase {
 
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFromCollectionParallelism() {
 		try {
 			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-			StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-			
+
 			try {
 				dataStream1.setParallelism(4);
 				fail("should throw an exception");
@@ -62,15 +61,20 @@ public class StreamExecutionEnvironmentTest {
 			catch (IllegalArgumentException e) {
 				// expected
 			}
+
+			dataStream1.addSink(new NoOpSink<Integer>());
 	
-			env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo).setParallelism(4);
-	
+			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+					typeInfo).setParallelism(4);
+
+			dataStream2.addSink(new NoOpSink<Integer>());
+
 			String plan = env.getExecutionPlan();
-			
-			assertTrue("Parallelism for dataStream1 is not right.",
-					plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
-			assertTrue("Parallelism for dataStream2 is not right.",
-					plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+
+			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+			assertEquals("Parallelism of parallel collection source must be 4.",
+					4,
+					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -80,7 +84,7 @@ public class StreamExecutionEnvironmentTest {
 
 	@Test
 	public void testSources() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
 			private static final long serialVersionUID = 1L;
@@ -94,6 +98,7 @@ public class StreamExecutionEnvironmentTest {
 			}
 		};
 		DataStreamSource<Integer> src1 = env.addSource(srcFun);
+		src1.addSink(new NoOpSink<Integer>());
 		assertEquals(srcFun, getFunctionFromDataSource(src1));
 
 		List<Long> list = Arrays.asList(0L, 1L, 2L);
@@ -120,6 +125,7 @@ public class StreamExecutionEnvironmentTest {
 	}
 
 	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+		dataStreamSource.addSink(new NoOpSink<T>());
 		AbstractUdfStreamOperator<?, ?> operator =
 				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
 		return (SourceFunction<T>) operator.getUserFunction();

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 7d08709..0989128 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -33,11 +33,12 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
 import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
-public class TypeFillTest {
+public class TypeFillTest extends StreamingMultipleProgramsTestBase {
 
 	@Test
 	public void test() {

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index cccec40..508f1a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -30,15 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
-public class WindowCrossJoinTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final long MEMORYSIZE = 32;
+public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
 
 	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
 	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
@@ -55,7 +52,8 @@ public class WindowCrossJoinTest implements Serializable {
 
 	@Test
 	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 		env.setBufferTimeout(1);
 
 		TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink =

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fc3e36f..b7df2ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -30,11 +30,11 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
-public class DirectedOutputTest {
+public class DirectedOutputTest extends StreamingMultipleProgramsTestBase {
 
 	private static final String TEN = "ten";
 	private static final String ODD = "odd";
@@ -94,7 +94,8 @@ public class DirectedOutputTest {
 
 	@Test
 	public void outputSelectorTest() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
 
 		TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
 		TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 46e4327..f2c253c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.complex;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,7 +43,6 @@ import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Before;
@@ -131,7 +129,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 			}
 			
 		})
-				
 				.setParallelism(1).filter(new FilterFunction
 				<Tuple2<Long, Tuple2<String, Long>>>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index 767eaa4..39a13b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -25,14 +25,15 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
 
-public class SlotAllocationTest {
+public class SlotAllocationTest extends StreamingMultipleProgramsTestBase{
 
 	@SuppressWarnings("serial")
 	@Test
 	public void test() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
new file mode 100644
index 0000000..fb2ef56
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.EvenOddOutputSelector;
+import org.apache.flink.streaming.util.NoOpIntMap;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StreamGraphGenerator}. This only tests correct translation of split/select,
+ * union, partitioning since the other translation routines are tested already in operation
+ * specific tests, for example in {@link org.apache.flink.streaming.api.IterateTest} for
+ * iterations.
+ */
+public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * This tests whether virtual Transformations behave correctly.
+	 *
+	 * <p>
+	 * Verifies that partitioning, output selectors and selected names are correctly set in the
+	 * StreamGraph when they are intermixed.
+	 */
+	@Test
+	public void testVirtualTransformations() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10);
+
+		DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
+
+		// verify that only the partitioning that was set last (broadcast) is used
+		DataStream<Integer> broadcastMap = rebalanceMap
+				.forward()
+				.global()
+				.broadcast()
+				.map(new NoOpIntMap());
+
+		broadcastMap.addSink(new NoOpSink<Integer>());
+
+		// verify that partitioning is preserved across union and split/select
+		EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
+		EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
+		EvenOddOutputSelector selector3 = new EvenOddOutputSelector();
+
+		DataStream<Integer> map1Operator = rebalanceMap
+				.map(new NoOpIntMap());
+
+		DataStream<Integer> map1 = map1Operator
+				.broadcast()
+				.split(selector1)
+				.select("even");
+
+		DataStream<Integer> map2Operator = rebalanceMap
+				.map(new NoOpIntMap());
+
+		DataStream<Integer> map2 = map2Operator
+				.split(selector2)
+				.select("odd")
+				.global();
+
+		DataStream<Integer> map3Operator = rebalanceMap
+				.map(new NoOpIntMap());
+
+		DataStream<Integer> map3 = map3Operator
+				.global()
+				.split(selector3)
+				.select("even")
+				.shuffle();
+
+
+		SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
+				.map(new NoOpIntMap());
+
+		unionedMap.addSink(new NoOpSink<Integer>());
+
+		StreamGraph graph = env.getStreamGraph();
+
+		// rebalanceMap
+		assertTrue(graph.getStreamNode(rebalanceMap.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);
+
+		// verify that the last partitioning takes precedence and that the edge still starts at rebalanceMap
+		assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+		assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
+
+		// verify that partitioning in unions is preserved and that it works across split/select
+		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+		assertEquals("even", graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0));
+		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
+
+		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
+		assertEquals("odd", graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0));
+		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
+
+		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
+		assertEquals("even", graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0));
+		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
+	}
+
+	/**
+	 * This tests whether virtual Transformations behave correctly when applied on a union.
+	 *
+	 * <p>Checks that the partitioner, output selector and selected name reach every union input.
+	 */
+	@Test
+	public void testVirtualTransformations2() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10);
+
+		DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
+
+		DataStream<Integer> map1 = rebalanceMap
+				.map(new NoOpIntMap());
+
+		DataStream<Integer> map2 = rebalanceMap
+				.map(new NoOpIntMap());
+
+		DataStream<Integer> map3 = rebalanceMap
+				.map(new NoOpIntMap());
+
+		EvenOddOutputSelector selector = new EvenOddOutputSelector();
+
+		SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
+				.broadcast()
+				.split(selector)
+				.select("foo")
+				.map(new NoOpIntMap());
+
+		unionedMap.addSink(new NoOpSink<Integer>());
+
+		StreamGraph graph = env.getStreamGraph();
+
+		// verify that the properties are correctly set on all input operators
+		assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+		assertEquals("foo", graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0));
+		assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
+
+		assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+		assertEquals("foo", graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0));
+		assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
+
+		assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+		assertEquals("foo", graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0));
+		assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4e7c963..e806428 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,6 +21,8 @@ import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
@@ -28,7 +30,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamingJobGraphGeneratorTest {
+public class StreamingJobGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
 	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGeneratorTest.class);
 	
 	@Test
@@ -36,8 +38,9 @@ public class StreamingJobGraphGeneratorTest {
 		final long seed = System.currentTimeMillis();
 		LOG.info("Test seed: {}", new Long(seed));
 		final Random r = new Random(seed);
-		
-		TestStreamEnvironment env = new TestStreamEnvironment(4, 32);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
 		StreamGraph streamingJob = new StreamGraph(env);
 		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index e8f0a03..bb9dad7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
@@ -52,8 +53,7 @@ import org.junit.Test;
  *     <li>Watermarks are correctly forwarded</li>
  * </ul>
  */
-public class StreamProjectTest implements Serializable {
-	private static final long serialVersionUID = 1L;
+public class StreamProjectTest extends StreamingMultipleProgramsTestBase {
 
 	@Test
 	public void testProject() throws Exception {
@@ -95,7 +95,6 @@ public class StreamProjectTest implements Serializable {
 
 
 	// tests using projection from the API without explicitly specifying the types
-	private static final long MEMORY_SIZE = 32;
 	private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
 	private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
 
@@ -106,7 +105,8 @@ public class StreamProjectTest implements Serializable {
 			expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
 		}
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
 		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
 			private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
index b8e9619..4c644a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -30,10 +30,12 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -169,21 +171,4 @@ public class CoStreamFlatMapTest implements Serializable {
 		}
 
 	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void multipleInputTest() {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
-		DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);
-		
-		try {
-			ds1.forward().union(ds2);
-			fail();
-		} catch (RuntimeException e) {
-			// expected
-		}
-		
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index 5986a30..512a0df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -31,19 +31,17 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class SelfConnectionTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final int MEMORY_SIZE = 32;
+public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
 
 	private static List<String> expected;
 
@@ -51,20 +49,12 @@ public class SelfConnectionTest implements Serializable {
 	@Test
 	public void sameDataStreamTest() {
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
 
 		TestListResultSink<String> resultSink = new TestListResultSink<String>();
 
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
+		Timestamp<Integer> timeStamp = new IntegerTimestamp();
 
 		KeySelector keySelector = new KeySelector<Integer, Integer>() {
 
@@ -79,7 +69,7 @@ public class SelfConnectionTest implements Serializable {
 		DataStream<Integer> src = env.fromElements(1, 3, 5);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple2<Integer, Integer>> dataStream =
+		DataStreamSink<Tuple2<Integer, Integer>> dataStream =
 				src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector)
 						.map(new MapFunction<Tuple2<Integer, Integer>, String>() {
 
@@ -107,8 +97,8 @@ public class SelfConnectionTest implements Serializable {
 
 			assertEquals(expected, result);
 		} catch (Exception e) {
-			fail();
 			e.printStackTrace();
+			fail();
 		}
 	}
 
@@ -120,7 +110,8 @@ public class SelfConnectionTest implements Serializable {
 
 		TestListResultSink<String> resultSink = new TestListResultSink<String>();
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
 		DataStream<Integer> src = env.fromElements(1, 3, 5);
 
@@ -175,7 +166,8 @@ public class SelfConnectionTest implements Serializable {
 
 		TestListResultSink<String> resultSink = new TestListResultSink<String>();
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
 
 		DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
 
@@ -248,4 +240,15 @@ public class SelfConnectionTest implements Serializable {
 
 		assertEquals(expected, result);
 	}
+
+	private static class IntegerTimestamp implements Timestamp<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Integer value) {
+			return value;
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
index 5e6ffa2..db09373 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -40,14 +40,12 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.FullStream;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class WindowingITCase implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static final Integer MEMORYSIZE = 32;
+public class WindowingITCase extends StreamingMultipleProgramsTestBase {
 
 	@SuppressWarnings("serial")
 	public static class ModKey implements KeySelector<Integer, Integer> {
@@ -98,17 +96,10 @@ public class WindowingITCase implements Serializable {
 
 		KeySelector<Integer, ?> key = new ModKey(2);
 
-		Timestamp<Integer> ts = new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
+		Timestamp<Integer> ts = new IntegerTimestamp();
 
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-		};
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
 		env.disableOperatorChaining();
 
 		DataStream<Integer> source = env.fromCollection(inputs);
@@ -116,14 +107,18 @@ public class WindowingITCase implements Serializable {
 		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new TestSink1());
 
-		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
-				.flatten().addSink(new TestSink2());
+		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2))
+				.mapWindow(new IdentityWindowMap())
+				.flatten()
+				.addSink(new TestSink2()).name("TESTSIUNK2");
 
 		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new TestSink4());
 
 		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
-				.mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
+				.mapWindow(new IdentityWindowMap())
+				.flatten()
+				.addSink(new TestSink5());
 
 		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
 				.addSink(new TestSink3());
@@ -131,11 +126,13 @@ public class WindowingITCase implements Serializable {
 		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
 				.addSink(new TestSink6());
 
-		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
+		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap())
+				.flatten()
 				.addSink(new TestSink7());
 
 		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
-				.getDiscretizedStream().addSink(new TestSink8());
+				.getDiscretizedStream()
+				.addSink(new TestSink8());
 
 		try {
 			source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
@@ -156,7 +153,8 @@ public class WindowingITCase implements Serializable {
 		source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
 
 		source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
-				.getDiscretizedStream().addSink(new TestSink12());
+				.getDiscretizedStream()
+				.addSink(new TestSink12());
 
 		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
 			private static final long serialVersionUID = 1L;
@@ -202,12 +200,15 @@ public class WindowingITCase implements Serializable {
 		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
 				.addSink(new TestSink10());
 
-		source.map(new MapFunction<Integer, Integer>() {
-			@Override
-			public Integer map(Integer value) throws Exception {
-				return value;
-			}
-		}).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
+		source
+				.map(new MapFunction<Integer, Integer>() {
+					@Override
+					public Integer map(Integer value) throws Exception {
+						return value;
+					}
+				})
+				.every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream()
+				.addSink(new TestSink13());
 
 		env.execute();
 
@@ -516,4 +517,13 @@ public class WindowingITCase implements Serializable {
 
 	}
 
+	private static class IntegerTimestamp implements Timestamp<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Integer value) {
+			return value;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
index 68e2a75..7ac5616 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -40,9 +40,9 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
 
 		DataStream<String> text = env.fromElements(WordCountData.TEXT);
 
-		DataStream<Tuple2<String, Integer>> counts =
-				text.flatMap(new Tokenizer())
-						.groupBy(0).sum(1);
+		DataStream<Tuple2<String, Integer>> counts = text
+				.flatMap(new Tokenizer())
+				.groupBy(0).sum(1);
 
 		counts.writeAsCsv(resultPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
index 3c48b3f..6bbcea8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -38,9 +38,9 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
 
 		DataStream<String> text = env.fromElements(WordCountData.TEXT);
 
-		DataStream<Tuple2<String, Integer>> counts =
-				text.flatMap(new CsvOutputFormatITCase.Tokenizer())
-						.groupBy(0).sum(1);
+		DataStream<Tuple2<String, Integer>> counts = text
+				.flatMap(new CsvOutputFormatITCase.Tokenizer())
+				.groupBy(0).sum(1);
 
 		counts.writeAsText(resultPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 60db798..32b3455 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
@@ -62,7 +63,7 @@ import com.google.common.collect.ImmutableMap;
  * partitioned and non-partitioned user states. This test mimics the runtime
  * behavior of stateful stream operators.
  */
-public class StatefulOperatorTest {
+public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 
 	@Test
 	public void simpleStateTest() throws Exception {
@@ -104,7 +105,8 @@ public class StatefulOperatorTest {
 	
 	@Test
 	public void apiTest() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
 
 		KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index f45125b..122aa8a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -36,10 +36,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
-public class StreamVertexTest {
+public class StreamVertexTest extends StreamingMultipleProgramsTestBase {
 
 	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
 
@@ -86,14 +87,12 @@ public class StreamVertexTest {
 	}
 
 	@SuppressWarnings("unused")
-	private static final int PARALLELISM = 1;
 	private static final int SOURCE_PARALELISM = 1;
-	private static final long MEMORYSIZE = 32;
 
 	@Test
 	public void wrongJobGraph() {
-		LocalStreamEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(SOURCE_PARALELISM);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(SOURCE_PARALELISM);
 
 		try {
 			env.fromCollection(null);
@@ -155,7 +154,8 @@ public class StreamVertexTest {
 
 	@Test
 	public void coTest() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(SOURCE_PARALELISM);
 
 		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
 		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
@@ -171,7 +171,8 @@ public class StreamVertexTest {
 
 	@Test
 	public void runStream() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(SOURCE_PARALELISM);
 
 		env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask())
 				.addSink(new MySink());

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
index 89679ea..9d9d47b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -24,12 +24,13 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 @SuppressWarnings("serial")
-public class TranslationTest {
+public class TranslationTest extends StreamingMultipleProgramsTestBase {
 	
 	@Test
 	public void testCheckpointModeTranslation() {
@@ -66,7 +67,8 @@ public class TranslationTest {
 		env.generateSequence(1, 10000000)
 				.addSink(new SinkFunction<Long>() {
 					@Override
-					public void invoke(Long value) {}
+					public void invoke(Long value) {
+					}
 				});
 		
 		return env;

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
deleted file mode 100644
index 2643bba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DistributePartitionerTest {
-	
-	private RebalancePartitioner<Tuple> distributePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-	
-	@Before
-	public void setPartitioner() {
-		distributePartitioner = new RebalancePartitioner<Tuple>(false);
-	}
-	
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
-	}
-	
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-	}
-}