You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/08/19 18:42:12 UTC

[3/6] flink git commit: [FLINK-2398][api-breaking] Introduce StreamGraphGenerator

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
new file mode 100644
index 0000000..d392fd5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a split of one
+ * {@link org.apache.flink.streaming.api.datastream.DataStream} into several {@code DataStreams}
+ * using an {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SplitTransformation}
+ */
+public class SplitTransformation<T> extends StreamTransformation<T> {
+
+	private final StreamTransformation<T> input;
+
+	private final OutputSelector<T> outputSelector;
+
+	/**
+	 * Creates a new {@code SplitTransformation} from the given input and {@code OutputSelector}.
+	 *
+	 * @param input The input {@code StreamTransformation}
+	 * @param outputSelector The output selector
+	 */
+	public SplitTransformation(StreamTransformation<T> input,
+			OutputSelector<T> outputSelector) {
+		super("Split", input.getOutputType(), input.getParallelism());
+		this.input = input;
+		this.outputSelector = outputSelector;
+	}
+
+	/**
+	 * Returns the input {@code StreamTransformation}.
+	 */
+	public StreamTransformation<T> getInput() {
+		return input;
+	}
+
+	/**
+	 * Returns the {@code OutputSelector}
+	 */
+	public OutputSelector<T> getOutputSelector() {
+		return outputSelector;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		result.addAll(input.getTransitivePredecessors());
+		return result;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
new file mode 100644
index 0000000..dadcfa2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+
+/**
+ * A {@code StreamTransformation} represents the operation that creates a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
+ * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
+ * {@code StreamTransformation} that is the origin of said DataStream.
+ *
+ * <p>
+ * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
+ * a graph of {@code StreamTransformation}s underneath. When the stream program is to be executed, this
+ * graph is translated to a {@link StreamGraph} using
+ * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
+ *
+ * <p>
+ * A {@code StreamTransformation} does not necessarily correspond to a physical operation
+ * at runtime. Some operations are only logical concepts. Examples of this are union,
+ * split/select data stream, partitioning.
+ *
+ * <p>
+ * The following graph of {@code StreamTransformations}:
+ *
+ * <pre>
+ *   Source              Source        
+ *      +                   +           
+ *      |                   |           
+ *      v                   v           
+ *  Rebalance          HashPartition    
+ *      +                   +           
+ *      |                   |           
+ *      |                   |           
+ *      +------>Union<------+           
+ *                +                     
+ *                |                     
+ *                v                     
+ *              Split                   
+ *                +                     
+ *                |                     
+ *                v                     
+ *              Select                  
+ *                +                     
+ *                v                     
+ *               Map                    
+ *                +                     
+ *                |                     
+ *                v                     
+ *              Sink 
+ * </pre>
+ *
+ * Would result in this graph of operations at runtime:
+ *
+ * <pre>
+ *  Source              Source
+ *    +                   +
+ *    |                   |
+ *    |                   |
+ *    +------->Map<-------+
+ *              +
+ *              |
+ *              v
+ *             Sink
+ * </pre>
+ *
+ * The information about partitioning, union, split/select ends up being encoded in the edges
+ * that connect the sources to the map operation.
+ *
+ * @param <T> The type of the elements that result from this {@code StreamTransformation}
+ */
+public abstract class StreamTransformation<T> {
+
+	// This is used to assign a unique ID to every StreamTransformation
+	protected static Integer idCounter = 0;
+	public static int getNewNodeId() {
+		idCounter++;
+		return idCounter;
+	}
+
+	protected final int id;
+
+	protected String name;
+
+	protected TypeInformation<T> outputType;
+	// This is used to handle MissingTypeInfo. As long as the outputType has not been queried
+	// it can still be changed using setOutputType(). Afterwards an exception is thrown when
+	// trying to change the output type.
+	protected boolean typeUsed;
+
+	private int parallelism;
+
+	protected long bufferTimeout = -1;
+
+	protected StreamGraph.ResourceStrategy resourceStrategy = StreamGraph.ResourceStrategy.DEFAULT;
+
+	/**
+	 * Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
+	 *
+	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+	 * @param outputType The output type of this {@code StreamTransformation}
+	 * @param parallelism The parallelism of this {@code StreamTransformation}
+	 */
+	public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
+		this.id = getNewNodeId();
+		this.name = Preconditions.checkNotNull(name);
+		this.outputType = outputType;
+		this.parallelism = parallelism;
+	}
+
+	/**
+	 * Returns the unique ID of this {@code StreamTransformation}.
+	 */
+	public int getId() {
+		return id;
+	}
+
+	/**
+	 * Changes the name of this {@code StreamTransformation}.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	/**
+	 * Returns the name of this {@code StreamTransformation}.
+	 */
+	public String getName() {
+		return name;
+	}
+
+	/**
+	 * Returns the parallelism of this {@code StreamTransformation}
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	/**
+	 * Sets the parallelism of this {@code StreamTransformation}
+	 * @param parallelism The new parallelism to set on this {@code StreamTransformation}
+	 */
+	public void setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+		this.parallelism = parallelism;
+	}
+
+	/**
+	 * Tries to fill in the type information. Type information can be filled in
+	 * later when the program uses a type hint. This method checks whether the
+	 * type information has ever been accessed before and does not allow
+	 * modifications if the type was accessed already. This ensures consistency
+	 * by making sure different parts of the operation do not assume different
+	 * type information.
+	 *
+	 * @param outputType The type information to fill in.
+	 *
+	 * @throws IllegalStateException Thrown, if the type information has been accessed before.
+	 */
+	public void setOutputType(TypeInformation<T> outputType) {
+		if (typeUsed) {
+			throw new IllegalStateException(
+					"TypeInformation cannot be filled in for the type after it has been used. "
+							+ "Please make sure that the type info hints are the first call after"
+							+ " the transformation function, "
+							+ "before any access to types or semantic properties, etc.");
+		}
+		this.outputType = outputType;
+	}
+
+	/**
+	 * Returns the output type of this {@code StreamTransformation} as a {@link TypeInformation}. Once
+	 * this method has returned a type, the output type can no longer be changed using {@link #setOutputType}.
+	 *
+	 * @return The output type of this {@code StreamTransformation}
+	 */
+	public TypeInformation<T> getOutputType() {
+		if (outputType instanceof MissingTypeInfo) {
+			MissingTypeInfo typeInfo = (MissingTypeInfo) this.outputType;
+			throw new InvalidTypesException(
+					"The return type of function '"
+							+ typeInfo.getFunctionName()
+							+ "' could not be determined automatically, due to type erasure. "
+							+ "You can give type information hints by using the returns(...) "
+							+ "method on the result of the transformation call, or by letting "
+							+ "your function implement the 'ResultTypeQueryable' "
+							+ "interface.", typeInfo.getTypeException());
+		}
+		typeUsed = true;
+		return this.outputType;
+	}
+
+	/**
+	 * Sets the chaining strategy of this {@code StreamTransformation}.
+	 */
+	public abstract void setChainingStrategy(StreamOperator.ChainingStrategy strategy);
+
+	/**
+	 * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when
+	 * sending elements over the network. The timeout specifies how long a network buffer
+	 * should be kept waiting before sending. A higher timeout means that more elements will
+	 * be sent in one buffer, this increases throughput. The latency, however, is negatively
+	 * affected by a higher timeout.
+	 */
+	public void setBufferTimeout(long bufferTimeout) {
+		this.bufferTimeout = bufferTimeout;
+	}
+
+	/**
+	 * Returns the buffer timeout of this {@code StreamTransformation}.
+	 *
+	 * <p>
+	 * See {@link #setBufferTimeout}.
+	 */
+	public long getBufferTimeout() {
+		return bufferTimeout;
+	}
+
+	/**
+	 * Sets the {@link org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy} of this
+	 * {@code StreamTransformation}. The resource strategy is used when scheduling operations on actual
+	 * workers when transforming the StreamTopology to an
+	 * {@link org.apache.flink.runtime.executiongraph.ExecutionGraph}.
+	 */
+	public void setResourceStrategy(StreamGraph.ResourceStrategy resourceStrategy) {
+		this.resourceStrategy = resourceStrategy;
+	}
+
+	/**
+	 * Returns the {@code ResourceStrategy} of this {@code StreamTransformation}.
+	 *
+	 * <p>
+	 * See {@link #setResourceStrategy}.
+	 */
+	public StreamGraph.ResourceStrategy getResourceStrategy() {
+		return resourceStrategy;
+	}
+
+	/**
+	 * Returns all transitive predecessor {@code StreamTransformation}s of this {@code StreamTransformation}. This
+	 * is, for example, used when determining whether a feedback edge of an iteration
+	 * actually has the iteration head as a predecessor.
+	 *
+	 * @return The list of transitive predecessors.
+	 */
+	public abstract Collection<StreamTransformation<?>> getTransitivePredecessors();
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName() + "{" +
+				"id=" + id +
+				", name='" + name + '\'' +
+				", outputType=" + outputType +
+				", parallelism=" + parallelism +
+				'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof StreamTransformation)) {
+			return false;
+		}
+
+		StreamTransformation<?> that = (StreamTransformation<?>) o;
+
+		if (bufferTimeout != that.bufferTimeout) {
+			return false;
+		}
+		if (id != that.id) {
+			return false;
+		}
+		if (parallelism != that.parallelism) {
+			return false;
+		}
+		if (!name.equals(that.name)) {
+			return false;
+		}
+		if (outputType != null ? !outputType.equals(that.outputType) : that.outputType != null) {
+			return false;
+		}
+		return resourceStrategy == that.resourceStrategy;
+	}
+
+	@Override
+	public int hashCode() {
+		int result = id;
+		result = 31 * result + name.hashCode();
+		result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
+		result = 31 * result + parallelism;
+		result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
+		result = 31 * result + resourceStrategy.hashCode();
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
new file mode 100644
index 0000000..e7273c5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This Transformation represents the application of a
+ * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to two input
+ * {@code StreamTransformations}. The result is again only one stream.
+ *
+ * @param <IN1> The type of the elements in the first input {@code StreamTransformation}
+ * @param <IN2> The type of the elements in the second input {@code StreamTransformation}
+ * @param <OUT> The type of the elements that result from this {@code TwoInputTransformation}
+ */
+public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {
+
+	private final StreamTransformation<IN1> input1;
+	private final StreamTransformation<IN2> input2;
+
+	private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
+
+	/**
+	 * Creates a new {@code TwoInputTransformation} from the given inputs and operator.
+	 *
+	 * @param input1 The first input {@code StreamTransformation}
+	 * @param input2 The second input {@code StreamTransformation}
+	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+	 * @param operator The {@code TwoInputStreamOperator}
+	 * @param outputType The type of the elements produced by this Transformation
+	 * @param parallelism The parallelism of this Transformation
+	 */
+	public TwoInputTransformation(
+			StreamTransformation<IN1> input1,
+			StreamTransformation<IN2> input2,
+			String name,
+			TwoInputStreamOperator<IN1, IN2, OUT> operator,
+			TypeInformation<OUT> outputType,
+			int parallelism) {
+		super(name, outputType, parallelism);
+		this.input1 = input1;
+		this.input2 = input2;
+		this.operator = operator;
+	}
+
+	/**
+	 * Returns the first input {@code StreamTransformation} of this {@code TwoInputTransformation}.
+	 */
+	public StreamTransformation<IN1> getInput1() {
+		return input1;
+	}
+
+	/**
+	 * Returns the second input {@code StreamTransformation} of this {@code TwoInputTransformation}.
+	 */
+	public StreamTransformation<IN2> getInput2() {
+		return input2;
+	}
+
+	/**
+	 * Returns the {@code TypeInformation} for the elements from the first input.
+	 */
+	public TypeInformation<IN1> getInputType1() {
+		return input1.getOutputType();
+	}
+
+	/**
+	 * Returns the {@code TypeInformation} for the elements from the second input.
+	 */
+	public TypeInformation<IN2> getInputType2() {
+		return input2.getOutputType();
+	}
+
+	/**
+	 * Returns the {@code TwoInputStreamOperator} of this Transformation.
+	 */
+	public TwoInputStreamOperator<IN1, IN2, OUT> getOperator() {
+		return operator;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		result.addAll(input1.getTransitivePredecessors());
+		result.addAll(input2.getTransitivePredecessors());
+		return result;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		operator.setChainingStrategy(strategy);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
new file mode 100644
index 0000000..4fa3c0a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a union of several input
+ * {@link StreamTransformation StreamTransformations}.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code UnionTransformation}
+ */
+public class UnionTransformation<T> extends StreamTransformation<T> {
+	private final List<StreamTransformation<T>> inputs;
+
+	/**
+	 * Creates a new {@code UnionTransformation} from the given input {@code StreamTransformations}.
+	 *
+	 * <p>
+	 * The input {@code StreamTransformations} must all have the same type.
+	 *
+	 * @param inputs The list of input {@code StreamTransformations}
+	 */
+	public UnionTransformation(List<StreamTransformation<T>> inputs) {
+		super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());
+
+		for (StreamTransformation<T> input: inputs) {
+			if (!input.getOutputType().equals(getOutputType())) {
+				throw new UnsupportedOperationException("Type mismatch in input " + input);
+			}
+		}
+
+		this.inputs = Lists.newArrayList(inputs);
+	}
+
+	/**
+	 * Returns the list of input {@code StreamTransformations}.
+	 */
+	public List<StreamTransformation<T>> getInputs() {
+		return inputs;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		for (StreamTransformation<T> input: inputs) {
+			result.addAll(input.getTransitivePredecessors());
+		}
+		return result;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
index f51a04f..f3d851c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
@@ -23,8 +23,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * Partitioner that selects all the output channels.
  *
- * @param <T>
- *            Type of the Tuple
+ * @param <T> Type of the elements in the Stream being broadcast
  */
 public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
@@ -33,10 +32,6 @@ public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
 	boolean set;
 	int setNumber;
 
-	public BroadcastPartitioner() {
-		super(PartitioningStrategy.BROADCAST);
-	}
-
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
@@ -52,4 +47,14 @@ public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
 			return returnArray;
 		}
 	}
+
+	@Override
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return "BROADCAST";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 6c40c03..7bb9480 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -38,7 +38,6 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 	KeySelector<T, K> keySelector;
 
 	public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
-		super(PartitioningStrategy.CUSTOM);
 		this.partitioner = partitioner;
 		this.keySelector = keySelector;
 	}
@@ -58,4 +57,14 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 
 		return returnArray;
 	}
+
+	@Override
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return "CUSTOM";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
deleted file mode 100644
index 7026d45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the same (one) channel for two Tuples having a
- * specified fields equal.
- * 
- * @param <T>
- *            Type of the Tuple
- */
-public class FieldsPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[1];
-	KeySelector<T, ?> keySelector;
-
-	public FieldsPartitioner(KeySelector<T, ?> keySelector) {
-		super(PartitioningStrategy.GROUPBY);
-		this.keySelector = keySelector;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		Object key;
-		try {
-			key = keySelector.getKey(record.getInstance().getValue());
-		} catch (Exception e) {
-			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
-		}
-		returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
-
-		return returnArray;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
new file mode 100644
index 0000000..4fb460c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that forwards elements only to the locally running downstream operation.
+ * 
+ * @param <T> Type of the elements in the Stream
+ */
+public class ForwardPartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[] {0};
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+		return returnArray;
+	}
+	
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
+	
+	@Override
+	public String toString() {
+		return "FORWARD";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
index 46b290b..b19fb41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -20,19 +20,29 @@ package org.apache.flink.streaming.runtime.partitioner;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-//Group to the partitioner with the lowest id
+/**
+ * Partitioner that sends all elements to the downstream operator with subtask ID=0.
+ *
+ * @param <T> Type of the elements in the Stream being partitioned
+ */
 public class GlobalPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private int[] returnArray = new int[] { 0 };
 
-	public GlobalPartitioner() {
-		super(PartitioningStrategy.GLOBAL);
-	}
-
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
 		return returnArray;
 	}
+
+	@Override
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return "GLOBAL";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
new file mode 100644
index 0000000..a3f5158
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects the target channel based on the hash value of a key from a
+ * {@link KeySelector}.
+ *
+ * @param <T> Type of the elements in the Stream being partitioned
+ */
+public class HashPartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[1];
+	KeySelector<T, ?> keySelector;
+
+	public HashPartitioner(KeySelector<T, ?> keySelector) {
+		this.keySelector = keySelector;
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		Object key;
+		try {
+			key = keySelector.getKey(record.getInstance().getValue());
+		} catch (Exception e) {
+			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
+		}
+		returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
+
+		return returnArray;
+	}
+
+	@Override
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return "HASH";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index e6ad821..2dfff0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -24,34 +24,26 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  * Partitioner that distributes the data equally by cycling through the output
  * channels.
  * 
- * @param <T>
- *            Type of the Tuple
+ * @param <T> Type of the elements in the Stream being rebalanced
  */
 public class RebalancePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private int[] returnArray = new int[] {-1};
-	private boolean forward;
-
-	public RebalancePartitioner(boolean forward) {
-		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
-		this.forward = forward;
-	}
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
 		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-
 		return this.returnArray;
 	}
 	
 	public StreamPartitioner<T> copy() {
-		return new RebalancePartitioner<T>(forward);
+		return this;
 	}
 	
 	@Override
 	public String toString() {
-		return forward ? "ForwardPartitioner" : "RebalancePartitioner";
+		return "REBALANCE";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
index ba50113..93c6f9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -36,14 +36,20 @@ public class ShufflePartitioner<T> extends StreamPartitioner<T> {
 
 	private int[] returnArray = new int[1];
 
-	public ShufflePartitioner() {
-		super(PartitioningStrategy.SHUFFLE);
-	}
-
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
 		returnArray[0] = random.nextInt(numberOfOutputChannels);
 		return returnArray;
 	}
+
+	@Override
+	public StreamPartitioner<T> copy() {
+		return new ShufflePartitioner<T>();
+	}
+
+	@Override
+	public String toString() {
+		return "SHUFFLE";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index b37655b..4ef360d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -24,30 +24,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public abstract class StreamPartitioner<T> implements
 		ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
-
-	public enum PartitioningStrategy {
-
-		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM
-
-	}
-
 	private static final long serialVersionUID = 1L;
-	private PartitioningStrategy strategy;
-
-	public StreamPartitioner(PartitioningStrategy strategy) {
-		this.strategy = strategy;
-	}
-
-	public PartitioningStrategy getStrategy() {
-		return strategy;
-	}
 
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-	
-	@Override
-	public String toString() {
-		return this.getClass().getSimpleName();
-	}
+	public abstract StreamPartitioner<T> copy();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index 3ad6b8e..fdf7697 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -24,19 +24,23 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
-public class ChainedRuntimeContextTest {
-	private static final long MEMORYSIZE = 32;
+public class ChainedRuntimeContextTest extends StreamingMultipleProgramsTestBase {
 	private static RuntimeContext srcContext;
 	private static RuntimeContext mapContext;
 
 	@Test
 	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-		env.addSource(new TestSource()).map(new TestMap());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		env.addSource(new TestSource()).map(new TestMap()).addSink(new NoOpSink<Integer>());
 		env.execute();
 
 		assertNotEquals(srcContext, mapContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 369b384..7ea1309 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -31,21 +31,21 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class CoStreamTest {
-
-	private static final long MEMORY_SIZE = 32;
+public class CoStreamTest extends StreamingMultipleProgramsTestBase {
 
 	private static ArrayList<String> expected = new ArrayList<String>();
 
 	@Test
 	public void test() {
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
 		TestListResultSink<String> resultSink = new TestListResultSink<String>();
 
@@ -129,4 +129,4 @@ public class CoStreamTest {
 
 		assertEquals(expected, result);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 324143f..9775392 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -54,19 +54,20 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
-import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class DataStreamTest {
+public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
-	private static final long MEMORYSIZE = 32;
-	private static int PARALLELISM = 2;
 
 	/**
 	 * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
@@ -75,7 +76,7 @@ public class DataStreamTest {
 	 */
 	@Test
 	public void testNaming() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Long> dataStream1 = env.generateSequence(0, 0).name("testSource1")
 				.map(new MapFunction<Long, Long>() {
@@ -93,7 +94,7 @@ public class DataStreamTest {
 					}
 				}).name("testMap");
 
-		DataStream<Long> connected = dataStream1.connect(dataStream2)
+		DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
 				.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
 					@Override
 					public void flatMap1(Long value, Collector<Long> out) throws Exception {
@@ -110,7 +111,8 @@ public class DataStreamTest {
 						return null;
 					}
 				}).name("testWindowFold")
-				.flatten();
+				.flatten()
+				.print();
 
 		//test functionality through the operator names in the execution plan
 		String plan = env.getExecutionPlan();
@@ -130,8 +132,7 @@ public class DataStreamTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testPartitioning() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-		StreamGraph graph = env.getStreamGraph();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 		DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
@@ -143,10 +144,15 @@ public class DataStreamTest {
 		DataStream group3 = src1.groupBy("f0");
 		DataStream group4 = src1.groupBy(new FirstSelector());
 
-		assertTrue(isPartitioned(graph.getStreamEdge(group1.getId(), createDownStreamId(group1))));
-		assertTrue(isPartitioned(graph.getStreamEdge(group2.getId(), createDownStreamId(group2))));
-		assertTrue(isPartitioned(graph.getStreamEdge(group3.getId(), createDownStreamId(group3))));
-		assertTrue(isPartitioned(graph.getStreamEdge(group4.getId(), createDownStreamId(group4))));
+		int id1 = createDownStreamId(group1);
+		int id2 = createDownStreamId(group2);
+		int id3 = createDownStreamId(group3);
+		int id4 = createDownStreamId(group4);
+
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id1)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id2)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id3)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id4)));
 
 		assertTrue(isGrouped(group1));
 		assertTrue(isGrouped(group2));
@@ -159,10 +165,15 @@ public class DataStreamTest {
 		DataStream partition3 = src1.partitionByHash("f0");
 		DataStream partition4 = src1.partitionByHash(new FirstSelector());
 
-		assertTrue(isPartitioned(graph.getStreamEdge(partition1.getId(), createDownStreamId(partition1))));
-		assertTrue(isPartitioned(graph.getStreamEdge(partition2.getId(), createDownStreamId(partition2))));
-		assertTrue(isPartitioned(graph.getStreamEdge(partition3.getId(), createDownStreamId(partition3))));
-		assertTrue(isPartitioned(graph.getStreamEdge(partition4.getId(), createDownStreamId(partition4))));
+		int pid1 = createDownStreamId(partition1);
+		int pid2 = createDownStreamId(partition2);
+		int pid3 = createDownStreamId(partition3);
+		int pid4 = createDownStreamId(partition4);
+
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid1)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid2)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid3)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid4)));
 
 		assertFalse(isGrouped(partition1));
 		assertFalse(isGrouped(partition3));
@@ -181,9 +192,13 @@ public class DataStreamTest {
 		DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0");
 		DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
 
-		assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), createDownStreamId(customPartition1))));
-		assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), createDownStreamId(customPartition3))));
-		assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), createDownStreamId(customPartition4))));
+		int cid1 = createDownStreamId(customPartition1);
+		int cid2 = createDownStreamId(customPartition3);
+		int cid3 = createDownStreamId(customPartition4);
+
+		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid1)));
+		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid2)));
+		assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid3)));
 
 		assertFalse(isGrouped(customPartition1));
 		assertFalse(isGrouped(customPartition3));
@@ -205,20 +220,20 @@ public class DataStreamTest {
 		ConnectedDataStream connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
 		Integer downStreamId5 = createDownStreamId(connectedGroup5);
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getFirst().getId(), downStreamId1)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getSecond().getId(), downStreamId1)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId1)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getFirst().getId(), downStreamId2)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getSecond().getId(), downStreamId2)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId2)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId2)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getFirst().getId(), downStreamId3)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getSecond().getId(), downStreamId3)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId3)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId3)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getFirst().getId(), downStreamId4)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getSecond().getId(), downStreamId4)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId4)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId4)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getFirst().getId(), downStreamId5)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getSecond().getId(), downStreamId5)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId5)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId5)));
 
 		assertTrue(isGrouped(connectedGroup1));
 		assertTrue(isGrouped(connectedGroup2));
@@ -242,20 +257,30 @@ public class DataStreamTest {
 		ConnectedDataStream connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
 		Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst().getId(), connectDownStreamId1)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond().getId(), connectDownStreamId1)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+				connectDownStreamId1)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+				connectDownStreamId1)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst().getId(), connectDownStreamId2)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond().getId(), connectDownStreamId2)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+				connectDownStreamId2)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+				connectDownStreamId2)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst().getId(), connectDownStreamId3)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond().getId(), connectDownStreamId3)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+				connectDownStreamId3)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+				connectDownStreamId3)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst().getId(), connectDownStreamId4)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond().getId(), connectDownStreamId4)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+				connectDownStreamId4)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+				connectDownStreamId4)));
 
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst().getId(), connectDownStreamId5)));
-		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond().getId(), connectDownStreamId5)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+				connectDownStreamId5)));
+		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+				connectDownStreamId5)));
 
 		assertFalse(isGrouped(connectedPartition1));
 		assertFalse(isGrouped(connectedPartition2));
@@ -269,17 +294,17 @@ public class DataStreamTest {
 	 */
 	@Test
 	public void testParallelism() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(10, MEMORYSIZE);
-		StreamGraph graph = env.getStreamGraph();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+		env.setParallelism(10);
 
 		SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
 			@Override
 			public Long map(Tuple2<Long, Long> value) throws Exception {
 				return null;
 			}
-		});
+		}).name("MyMap");
 
 		DataStream<Long> windowed = map
 				.window(Count.of(10))
@@ -288,7 +313,10 @@ public class DataStreamTest {
 					public Long fold(Long accumulator, Long value) throws Exception {
 						return null;
 					}
-				}).flatten();
+				})
+				.flatten();
+
+		windowed.addSink(new NoOpSink<Long>());
 
 		DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
 			@Override
@@ -296,16 +324,21 @@ public class DataStreamTest {
 			}
 		});
 
-		assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
-		assertEquals(10, graph.getStreamNode(map.getId()).getParallelism());
-		assertEquals(10, graph.getStreamNode(windowed.getId()).getParallelism());
-		assertEquals(10, graph.getStreamNode(sink.getId()).getParallelism());
+		assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
+		assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
+		assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
+		assertEquals(10,
+				env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
 
 		env.setParallelism(7);
-		assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
-		assertEquals(7, graph.getStreamNode(map.getId()).getParallelism());
-		assertEquals(7, graph.getStreamNode(windowed.getId()).getParallelism());
-		assertEquals(7, graph.getStreamNode(sink.getId()).getParallelism());
+
+		// Some parts, such as windowing, rely on the fact that previous operators have a parallelism
+		// set when instantiating the Discretizer. This would break if we dynamically changed
+		// the parallelism of operations when changing the setting on the Execution Environment.
+		assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
+		assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
+		assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
+		assertEquals(10, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
 
 		try {
 			src.setParallelism(3);
@@ -314,21 +347,22 @@ public class DataStreamTest {
 		}
 
 		DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
-		assertEquals(7, graph.getStreamNode(parallelSource.getId()).getParallelism());
+		parallelSource.addSink(new NoOpSink<Long>());
+		assertEquals(7, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
 
 		parallelSource.setParallelism(3);
-		assertEquals(3, graph.getStreamNode(parallelSource.getId()).getParallelism());
+		assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
 
 		map.setParallelism(2);
-		assertEquals(2, graph.getStreamNode(map.getId()).getParallelism());
+		assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
 
 		sink.setParallelism(4);
-		assertEquals(4, graph.getStreamNode(sink.getId()).getParallelism());
+		assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
 	}
 
 	@Test
 	public void testTypeInfo() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Long> src1 = env.generateSequence(0, 0);
 		assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
@@ -366,9 +400,7 @@ public class DataStreamTest {
 
 	@Test
 	public void operatorTest() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-
-		StreamGraph streamGraph = env.getStreamGraph();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Long> src = env.generateSequence(0, 0);
 
@@ -379,6 +411,7 @@ public class DataStreamTest {
 			}
 		};
 		DataStream<Integer> map = src.map(mapFunction);
+		map.addSink(new NoOpSink<Integer>());
 		assertEquals(mapFunction, getFunctionForDataStream(map));
 
 
@@ -388,6 +421,7 @@ public class DataStreamTest {
 			}
 		};
 		DataStream<Integer> flatMap = src.flatMap(flatMapFunction);
+		flatMap.addSink(new NoOpSink<Integer>());
 		assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));
 
 		FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() {
@@ -401,16 +435,18 @@ public class DataStreamTest {
 				.union(flatMap)
 				.filter(filterFunction);
 
+		unionFilter.addSink(new NoOpSink<Integer>());
+
 		assertEquals(filterFunction, getFunctionForDataStream(unionFilter));
 
 		try {
-			streamGraph.getStreamEdge(map.getId(), unionFilter.getId());
+			env.getStreamGraph().getStreamEdge(map.getId(), unionFilter.getId());
 		} catch (RuntimeException e) {
 			fail(e.getMessage());
 		}
 
 		try {
-			streamGraph.getStreamEdge(flatMap.getId(), unionFilter.getId());
+			env.getStreamGraph().getStreamEdge(flatMap.getId(), unionFilter.getId());
 		} catch (RuntimeException e) {
 			fail(e.getMessage());
 		}
@@ -423,14 +459,15 @@ public class DataStreamTest {
 		};
 
 		SplitDataStream<Integer> split = unionFilter.split(outputSelector);
-		List<OutputSelector<?>> outputSelectors = streamGraph.getStreamNode(split.getId()).getOutputSelectors();
+		split.select("dummy").addSink(new NoOpSink<Integer>());
+		List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
 		assertEquals(1, outputSelectors.size());
 		assertEquals(outputSelector, outputSelectors.get(0));
 
 		DataStream<Integer> select = split.select("a");
 		DataStreamSink<Integer> sink = select.print();
 
-		StreamEdge splitEdge = streamGraph.getStreamEdge(select.getId(), sink.getId());
+		StreamEdge splitEdge = env.getStreamGraph().getStreamEdge(unionFilter.getId(), sink.getTransformation().getId());
 		assertEquals("a", splitEdge.getSelectedNames().get(0));
 
 		ConnectedDataStream<Integer, Integer> connect = map.connect(flatMap);
@@ -446,16 +483,17 @@ public class DataStreamTest {
 			}
 		};
 		DataStream<String> coMap = connect.map(coMapper);
+		coMap.addSink(new NoOpSink<String>());
 		assertEquals(coMapper, getFunctionForDataStream(coMap));
 
 		try {
-			streamGraph.getStreamEdge(map.getId(), coMap.getId());
+			env.getStreamGraph().getStreamEdge(map.getId(), coMap.getId());
 		} catch (RuntimeException e) {
 			fail(e.getMessage());
 		}
 
 		try {
-			streamGraph.getStreamEdge(flatMap.getId(), coMap.getId());
+			env.getStreamGraph().getStreamEdge(flatMap.getId(), coMap.getId());
 		} catch (RuntimeException e) {
 			fail(e.getMessage());
 		}
@@ -463,12 +501,11 @@ public class DataStreamTest {
 	
 	@Test
 	public void sinkKeyTest() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-		StreamGraph streamGraph = env.getStreamGraph();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStream<Long> sink = env.generateSequence(1, 100).print();
-		assertTrue(streamGraph.getStreamNode(sink.getId()).getStatePartitioner() == null);
-		assertTrue(streamGraph.getStreamNode(sink.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);
+		DataStreamSink<Long> sink = env.generateSequence(1, 100).print();
+		assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getStatePartitioner() == null);
+		assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof ForwardPartitioner);
 
 		KeySelector<Long, Long> key1 = new KeySelector<Long, Long>() {
 
@@ -480,11 +517,11 @@ public class DataStreamTest {
 			}
 		};
 
-		DataStream<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
+		DataStreamSink<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
 
-		assertTrue(streamGraph.getStreamNode(sink2.getId()).getStatePartitioner() != null);
-		assertEquals(key1, streamGraph.getStreamNode(sink2.getId()).getStatePartitioner());
-		assertTrue(streamGraph.getStreamNode(sink2.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner);
+		assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner() != null);
+		assertEquals(key1, env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
+		assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
 
 		KeySelector<Long, Long> key2 = new KeySelector<Long, Long>() {
 
@@ -496,49 +533,52 @@ public class DataStreamTest {
 			}
 		};
 
-		DataStream<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
+		DataStreamSink<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
 
-		assertTrue(streamGraph.getStreamNode(sink3.getId()).getStatePartitioner() != null);
-		assertEquals(key2, streamGraph.getStreamNode(sink3.getId()).getStatePartitioner());
-		assertTrue(streamGraph.getStreamNode(sink3.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner);
+		assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner() != null);
+		assertEquals(key2, env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner());
+		assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
 	}
 
 	@Test
 	public void testChannelSelectors() {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-
-		StreamGraph streamGraph = env.getStreamGraph();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStreamSource<Long> src = env.generateSequence(0, 0);
 
 		DataStream<Long> broadcast = src.broadcast();
 		DataStreamSink<Long> broadcastSink = broadcast.print();
 		StreamPartitioner<?> broadcastPartitioner =
-				streamGraph.getStreamEdge(broadcast.getId(), broadcastSink.getId()).getPartitioner();
+				env.getStreamGraph().getStreamEdge(src.getId(),
+						broadcastSink.getTransformation().getId()).getPartitioner();
 		assertTrue(broadcastPartitioner instanceof BroadcastPartitioner);
 
 		DataStream<Long> shuffle = src.shuffle();
 		DataStreamSink<Long> shuffleSink = shuffle.print();
 		StreamPartitioner<?> shufflePartitioner =
-				streamGraph.getStreamEdge(shuffle.getId(), shuffleSink.getId()).getPartitioner();
+				env.getStreamGraph().getStreamEdge(src.getId(),
+						shuffleSink.getTransformation().getId()).getPartitioner();
 		assertTrue(shufflePartitioner instanceof ShufflePartitioner);
 
 		DataStream<Long> forward = src.forward();
 		DataStreamSink<Long> forwardSink = forward.print();
 		StreamPartitioner<?> forwardPartitioner =
-				streamGraph.getStreamEdge(forward.getId(), forwardSink.getId()).getPartitioner();
-		assertTrue(forwardPartitioner instanceof RebalancePartitioner);
+				env.getStreamGraph().getStreamEdge(src.getId(),
+						forwardSink.getTransformation().getId()).getPartitioner();
+		assertTrue(forwardPartitioner instanceof ForwardPartitioner);
 
 		DataStream<Long> rebalance = src.rebalance();
 		DataStreamSink<Long> rebalanceSink = rebalance.print();
 		StreamPartitioner<?> rebalancePartitioner =
-				streamGraph.getStreamEdge(rebalance.getId(), rebalanceSink.getId()).getPartitioner();
+				env.getStreamGraph().getStreamEdge(src.getId(),
+						rebalanceSink.getTransformation().getId()).getPartitioner();
 		assertTrue(rebalancePartitioner instanceof RebalancePartitioner);
 
 		DataStream<Long> global = src.global();
 		DataStreamSink<Long> globalSink = global.print();
 		StreamPartitioner<?> globalPartitioner =
-				streamGraph.getStreamEdge(global.getId(), globalSink.getId()).getPartitioner();
+				env.getStreamGraph().getStreamEdge(src.getId(),
+						globalSink.getTransformation().getId()).getPartitioner();
 		assertTrue(globalPartitioner instanceof GlobalPartitioner);
 	}
 
@@ -559,7 +599,7 @@ public class DataStreamTest {
 	}
 
 	private static Integer createDownStreamId(DataStream dataStream) {
-		return dataStream.print().getId();
+		return dataStream.print().getTransformation().getId();
 	}
 
 	private static boolean isGrouped(DataStream dataStream) {
@@ -567,7 +607,7 @@ public class DataStreamTest {
 	}
 
 	private static Integer createDownStreamId(ConnectedDataStream dataStream) {
-		return dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+		SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
 			@Override
 			public Object map1(Tuple2<Long, Long> value) {
 				return null;
@@ -577,7 +617,9 @@ public class DataStreamTest {
 			public Object map2(Tuple2<Long, Long> value) {
 				return null;
 			}
-		}).getId();
+		});
+		coMap.addSink(new NoOpSink());
+		return coMap.getId();
 	}
 
 	private static boolean isGrouped(ConnectedDataStream dataStream) {
@@ -585,7 +627,7 @@ public class DataStreamTest {
 	}
 
 	private static boolean isPartitioned(StreamEdge edge) {
-		return edge.getPartitioner() instanceof FieldsPartitioner;
+		return edge.getPartitioner() instanceof HashPartitioner;
 	}
 
 	private static boolean isCustomPartitioned(StreamEdge edge) {