Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 15:05:19 UTC

[3/4] flink git commit: [streaming] [api-breaking] Consolidate DataStream method names

[streaming] [api-breaking] Consolidate DataStream method names

To better match the names of the DataSet API, the following renamings were made (usage sketched below):
- DataStream.merge -> DataStream.union
- DataStream.distribute -> DataStream.rebalance
- DataStream.partitionBy -> DataStream.partitionByHash
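
For illustration, a minimal before/after sketch of the renamed calls (the environment setup, element types, and variable names are assumptions for this sketch, not part of the commit):

    DataStream<Tuple2<Long, Long>> a = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
    DataStream<Tuple2<Long, Long>> b = env.fromElements(new Tuple2<Long, Long>(1L, 1L));

    a.union(b);            // was: a.merge(b)
    a.rebalance();         // was: a.distribute()
    a.partitionByHash(0);  // was: a.partitionBy(0)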

Closes #743


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3527f40f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3527f40f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3527f40f

Branch: refs/heads/master
Commit: 3527f40f6e1e43e8e10c983ff3e214484cbb4d04
Parents: 6b28bdf
Author: mbalassi <mb...@apache.org>
Authored: Tue Jun 2 17:01:48 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  24 +--
 .../api/datastream/ConnectedDataStream.java     |  32 ++--
 .../streaming/api/datastream/DataStream.java    |  60 +++----
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   4 +-
 .../api/datastream/SplitDataStream.java         |   2 +-
 .../streaming/api/graph/JSONGenerator.java      |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   2 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../streaming/api/graph/WindowingOptimizer.java |   6 +-
 .../partitioner/DistributePartitioner.java      |  52 ------
 .../partitioner/RebalancePartitioner.java       |  52 ++++++
 .../flink/streaming/api/CoStreamTest.java       |   2 +-
 .../flink/streaming/api/DataStreamTest.java     | 175 +++++++++++++++++++
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../flink/streaming/api/OutputSplitterTest.java |   2 +-
 .../api/complex/ComplexIntegrationTest.java     |   7 +-
 .../api/operators/co/CoFlatMapTest.java         |   5 +-
 .../partitioner/DistributePartitionerTest.java  |   5 +-
 .../partitioner/ForwardPartitionerTest.java     |   5 +-
 .../examples/windowing/StockPrices.java         |   6 +-
 .../scala/examples/windowing/StockPrices.scala  |  14 +-
 .../api/scala/ConnectedDataStream.scala         |  27 ++-
 .../flink/streaming/api/scala/DataStream.scala  |  18 +-
 .../ProcessFailureStreamingRecoveryITCase.java  |   2 +-
 25 files changed, 338 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index e041e45..49dc068 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -278,12 +278,12 @@ There are several partitioning types supported in Flink Streaming:
 
 * *Forward (default)*: Forward partitioning directs the output data to the next operator on the same machine (if possible), avoiding expensive network I/O. If there are more processing nodes than inputs, or vice versa, the load is distributed among the extra nodes in a round-robin fashion. This is the default partitioner.
 Usage: `dataStream.forward()`
- * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use *Distribute*
+ * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using a uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load, use *Rebalance*.
 Usage: `dataStream.shuffle()`
- * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
-Usage: `dataStream.distribute()`
+ * *Rebalance*: Rebalance partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
+Usage: `dataStream.rebalance()`
  * *Field/Key Partitioning*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
-Usage: `dataStream.partitionBy(fields…)`
+Usage: `dataStream.partitionByHash(fields…)`
* *Field/Key Grouping*: Field/Key grouping takes partitioning one step further and separates the elements into disjoint groups based on the hash code. These groups are processed separately by the next downstream operator.
 Usage: `dataStream.groupBy(fields…)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
@@ -457,11 +457,11 @@ dataStream.reduce(new ReduceFunction<Integer>() {
     </tr>
 
     <tr>
-      <td><strong>Merge</strong></td>
+      <td><strong>Union</strong></td>
       <td>
-        <p>Merges two or more data streams creating a new stream containing all the elements from all the streams.</p>
+        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
 {% highlight java %}
-dataStream.merge(otherStream1, otherStream2, …)
+dataStream.union(otherStream1, otherStream2, …)
 {% endhighlight %}
       </td>
     </tr>
@@ -563,11 +563,11 @@ dataStream.reduce{ _ + _ }
     </tr>
 
     <tr>
-      <td><strong>Merge</strong></td>
+      <td><strong>Union</strong></td>
       <td>
-        <p>Merges two or more data streams creating a new stream containing all the elements from all the streams.</p>
+        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
 {% highlight scala %}
-dataStream.merge(otherStream1, otherStream2, …)
+dataStream.union(otherStream1, otherStream2, …)
 {% endhighlight %}
       </td>
     </tr>
@@ -862,8 +862,8 @@ dataStream1 cross dataStream2 onWindow (windowing_params)
 
 ### Co operators
 
-Co operators allow the users to jointly transform two `DataStream`s of different types providing a simple way to jointly manipulate streams with a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or in case the user needs explicit tracking of the origin of individual elements.
-Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`. Please note that the two connected `DataStream`s can also be merged data streams.
+Co operators allow users to jointly transform two `DataStream`s of different types, providing a simple way to manipulate streams with a shared state. They are designed to support joint stream transformations where union is not appropriate due to different data types, or where the user needs explicit tracking of the origin of individual elements.
+Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`.
 
 #### Map on ConnectedDataStream
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a `CoMapFunction.map1()` for each element of the first input and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns exactly one element.
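
As a quick illustration of the partitioning operators documented above under the new names (a sketch only; `dataStream`, `mapper`, `reducer`, and the key `"f0"` are hypothetical placeholders):

    // Round-robin distribution for an even load (formerly distribute()):
    dataStream.rebalance().map(mapper);

    // Hash partitioning: tuples with the same key go to the same parallel
    // instance (formerly partitionBy(...)):
    dataStream.partitionByHash("f0").map(mapper);

    // Grouping goes one step further, creating disjoint groups that the
    // next downstream operator processes separately:
    dataStream.groupBy("f0").reduce(reducer);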

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 35418e0..0176277 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -236,9 +236,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            second input stream.
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPosition1),
-				dataStream2.partitionBy(keyPosition2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(keyPosition1),
+				dataStream2.partitionByHash(keyPosition2));
 	}
 
 	/**
@@ -251,9 +251,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The fields used to group the second input stream.
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPositions1),
-				dataStream2.partitionBy(keyPositions2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(keyPositions1),
+				dataStream2.partitionByHash(keyPositions2));
 	}
 
 	/**
@@ -269,9 +269,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The partitioning expressions for the second input
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(String field1, String field2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(field1),
-				dataStream2.partitionBy(field2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(String field1, String field2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(field1),
+				dataStream2.partitionByHash(field2));
 	}
 
 	/**
@@ -287,9 +287,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The partitioning expressions for the second input
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(String[] fields1, String[] fields2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(fields1),
-				dataStream2.partitionBy(fields2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(fields1),
+				dataStream2.partitionByHash(fields2));
 	}
 
 	/**
@@ -302,10 +302,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The {@link KeySelector} used for partitioning the second input
	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(KeySelector<IN1, ?> keySelector1,
-												 KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keySelector1),
-				dataStream2.partitionBy(keySelector2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1,
+														KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(keySelector1),
+				dataStream2.partitionByHash(keySelector2));
 	}
 
 	/**
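
The connected variant mirrors the single-stream rename: a sketch of the two-input call (stream names and key positions are illustrative):

    ConnectedDataStream<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);

    // Hash-partitions the first input on field 0 and the second on field 1,
    // so matching keys from both inputs reach the same parallel instance.
    connected.partitionByHash(0, 1);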

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7abd327..1ec440d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -78,7 +78,7 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
@@ -112,7 +112,7 @@ public class DataStream<OUT> {
 	protected StreamPartitioner<OUT> partitioner;
 	@SuppressWarnings("rawtypes")
 	protected TypeInformation typeInfo;
-	protected List<DataStream<OUT>> mergedStreams;
+	protected List<DataStream<OUT>> unionizedStreams;
 	
 	protected Integer iterationID = null;
 	protected Long iterationWaitTime = null;
@@ -143,10 +143,10 @@ public class DataStream<OUT> {
 		this.parallelism = environment.getParallelism();
 		this.streamGraph = environment.getStreamGraph();
 		this.userDefinedNames = new ArrayList<String>();
-		this.partitioner = new DistributePartitioner<OUT>(true);
+		this.partitioner = new RebalancePartitioner<OUT>(true);
 		this.typeInfo = typeInfo;
-		this.mergedStreams = new ArrayList<DataStream<OUT>>();
-		this.mergedStreams.add(this);
+		this.unionizedStreams = new ArrayList<DataStream<OUT>>();
+		this.unionizedStreams.add(this);
 	}
 
 	/**
@@ -165,11 +165,11 @@ public class DataStream<OUT> {
 		this.typeInfo = dataStream.typeInfo;
 		this.iterationID = dataStream.iterationID;
 		this.iterationWaitTime = dataStream.iterationWaitTime;
-		this.mergedStreams = new ArrayList<DataStream<OUT>>();
-		this.mergedStreams.add(this);
-		if (dataStream.mergedStreams.size() > 1) {
-			for (int i = 1; i < dataStream.mergedStreams.size(); i++) {
-				this.mergedStreams.add(new DataStream<OUT>(dataStream.mergedStreams.get(i)));
+		this.unionizedStreams = new ArrayList<DataStream<OUT>>();
+		this.unionizedStreams.add(this);
+		if (dataStream.unionizedStreams.size() > 1) {
+			for (int i = 1; i < dataStream.unionizedStreams.size(); i++) {
+				this.unionizedStreams.add(new DataStream<OUT>(dataStream.unionizedStreams.get(i)));
 			}
 		}
 
@@ -261,16 +261,16 @@ public class DataStream<OUT> {
 	 * will be transformed simultaneously.
 	 * 
 	 * @param streams
-	 *            The DataStreams to merge output with.
+	 *            The DataStreams to union output with.
 	 * @return The {@link DataStream}.
 	 */
-	public DataStream<OUT> merge(DataStream<OUT>... streams) {
+	public DataStream<OUT> union(DataStream<OUT>... streams) {
 		DataStream<OUT> returnStream = this.copy();
 
 		for (DataStream<OUT> stream : streams) {
-			for (DataStream<OUT> ds : stream.mergedStreams) {
-				validateMerge(ds.getId());
-				returnStream.mergedStreams.add(ds.copy());
+			for (DataStream<OUT> ds : stream.unionizedStreams) {
+				validateUnion(ds.getId());
+				returnStream.unionizedStreams.add(ds.copy());
 			}
 		}
 		return returnStream;
@@ -288,7 +288,7 @@ public class DataStream<OUT> {
 	 * @return The {@link SplitDataStream}
 	 */
 	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-		for (DataStream<OUT> ds : this.mergedStreams) {
+		for (DataStream<OUT> ds : this.unionizedStreams) {
 			streamGraph.addOutputSelector(ds.getId(), clean(outputSelector));
 		}
 
@@ -380,11 +380,11 @@ public class DataStream<OUT> {
 	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 *
 	 */
-	public DataStream<OUT> partitionBy(int... fields) {
+	public DataStream<OUT> partitionByHash(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+			return partitionByHash(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
 		} else {
-			return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+			return partitionByHash(new Keys.ExpressionKeys<OUT>(fields, getType()));
 		}
 	}
 
@@ -399,8 +399,8 @@ public class DataStream<OUT> {
 	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 *
 	 */
-	public DataStream<OUT> partitionBy(String... fields) {
-		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	public DataStream<OUT> partitionByHash(String... fields) {
+		return partitionByHash(new Keys.ExpressionKeys<OUT>(fields, getType()));
 	}
 
 	/**
@@ -413,12 +413,12 @@ public class DataStream<OUT> {
 	 * @return The partitioned DataStream
 	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 */
-	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
+	public DataStream<OUT> partitionByHash(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
 	}
 
 	//private helper method for partitioning
-	private DataStream<OUT> partitionBy(Keys<OUT> keys) {
+	private DataStream<OUT> partitionByHash(Keys<OUT> keys) {
 		return setConnectionType(
 				new FieldsPartitioner<OUT>(
 						clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))));
@@ -458,7 +458,7 @@ public class DataStream<OUT> {
 	 * @return The DataStream with shuffle partitioning set.
 	 */
 	public DataStream<OUT> forward() {
-		return setConnectionType(new DistributePartitioner<OUT>(true));
+		return setConnectionType(new RebalancePartitioner<OUT>(true));
 	}
 
 	/**
@@ -469,8 +469,8 @@ public class DataStream<OUT> {
 	 * 
 	 * @return The DataStream with shuffle partitioning set.
 	 */
-	public DataStream<OUT> distribute() {
-		return setConnectionType(new DistributePartitioner<OUT>(false));
+	public DataStream<OUT> rebalance() {
+		return setConnectionType(new RebalancePartitioner<OUT>(false));
 	}
 
 	/**
@@ -1295,7 +1295,7 @@ public class DataStream<OUT> {
 	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
 		DataStream<OUT> returnStream = this.copy();
 
-		for (DataStream<OUT> stream : returnStream.mergedStreams) {
+		for (DataStream<OUT> stream : returnStream.unionizedStreams) {
 			stream.partitioner = partitioner;
 		}
 
@@ -1316,7 +1316,7 @@ public class DataStream<OUT> {
 	 *            Number of the type (used at co-functions)
 	 */
 	protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) {
-		for (DataStream<X> stream : inputStream.mergedStreams) {
+		for (DataStream<X> stream : inputStream.unionizedStreams) {
 			streamGraph.addEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
 					inputStream.userDefinedNames);
 		}
@@ -1407,8 +1407,8 @@ public class DataStream<OUT> {
 		}
 	}
 
-	private void validateMerge(Integer id) {
-		for (DataStream<OUT> ds : this.mergedStreams) {
+	private void validateUnion(Integer id) {
+		for (DataStream<OUT> ds : this.unionizedStreams) {
 			if (ds.getId().equals(id)) {
 				throw new RuntimeException("A DataStream cannot be merged with itself");
 			}
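
The renamed validateUnion keeps the old guard: a stream cannot be unioned with itself, directly or through an earlier union. A sketch of what it rejects (mirroring the updated CoFlatMapTest below):

    DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
    DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);

    ds1.union(ds2); // throws RuntimeException: ds1 already appears in ds2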

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 62e7781..2d6829d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -49,7 +49,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * @param keySelector Function for determining group inclusion
 	 */
 	public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) {
-		super(dataStream.partitionBy(keySelector));
+		super(dataStream.partitionByHash(keySelector));
 		this.keySelector = keySelector;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index bebfff0..1e5b5cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -126,8 +126,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> distribute() {
-		return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+	public SingleOutputStreamOperator<OUT, O> rebalance() {
+		return (SingleOutputStreamOperator<OUT, O>) super.rebalance();
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 69e059e..36a94c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -57,7 +57,7 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
 
 		DataStream<OUT> returnStream = copy();
 
-		for (DataStream<OUT> ds : returnStream.mergedStreams) {
+		for (DataStream<OUT> ds : returnStream.unionizedStreams) {
 			ds.userDefinedNames = Arrays.asList(outputNames);
 		}
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 2c536d8..4ae2f97 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -147,7 +147,7 @@ public class JSONGenerator {
 		JSONObject input = new JSONObject();
 		inputArray.put(input);
 		input.put(ID, mappedInputID);
-		input.put(SHIP_STRATEGY, streamGraph.getEdge(inputID, vertexID).getPartitioner()
+		input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner()
 				.getStrategy());
 		input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index aa0f164..95cbc23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -330,7 +330,7 @@ public class StreamGraph extends StreamingPlan {
 		return streamNodes.keySet();
 	}
 
-	protected StreamEdge getEdge(int sourceId, int targetId) {
+	public StreamEdge getStreamEdge(int sourceId, int targetId) {
 		Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator();
 		while (outIterator.hasNext()) {
 			StreamEdge edge = outIterator.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7eb2028..1670a47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -282,7 +282,7 @@ public class StreamingJobGraphGenerator {
 
 		for (StreamEdge output : allOutputs) {
 			config.setSelectedNames(output.getTargetID(),
-					streamGraph.getEdge(vertexID, output.getTargetID()).getSelectedNames());
+					streamGraph.getStreamEdge(vertexID, output.getTargetID()).getSelectedNames());
 		}
 
 		vertexConfigs.put(vertexID, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index f375536..92043e7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
 import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
 import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 
 public class WindowingOptimizer {
 
@@ -65,12 +65,12 @@ public class WindowingOptimizer {
 
 				// We connect the merge input to the flattener directly
 				streamGraph.addEdge(mergeInput.getID(), flattenerID,
-						new DistributePartitioner(true), 0, new ArrayList<String>());
+						new RebalancePartitioner(true), 0, new ArrayList<String>());
 
 				// If the merger is only connected to the flattener we delete it
 				// completely, otherwise we only remove the edge
 				if (input.getOutEdges().size() > 1) {
-					streamGraph.removeEdge(streamGraph.getEdge(input.getID(), flattenerID));
+					streamGraph.removeEdge(streamGraph.getStreamEdge(input.getID(), flattenerID));
 				} else {
 					streamGraph.removeVertex(input);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
deleted file mode 100644
index 0fa191b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by cycling through the output
- * channels.
- * 
- * @param <T>
- *            Type of the Tuple
- */
-public class DistributePartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] {-1};
-	private boolean forward;
-
-	public DistributePartitioner(boolean forward) {
-		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
-		this.forward = forward;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-
-		return this.returnArray;
-	}
-	
-	public StreamPartitioner<T> copy() {
-		return new DistributePartitioner<T>(forward);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
new file mode 100644
index 0000000..70d9c6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that distributes the data equally by cycling through the output
+ * channels.
+ * 
+ * @param <T>
+ *            Type of the Tuple
+ */
+public class RebalancePartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[] {-1};
+	private boolean forward;
+
+	public RebalancePartitioner(boolean forward) {
+		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
+		this.forward = forward;
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+
+		return this.returnArray;
+	}
+	
+	public StreamPartitioner<T> copy() {
+		return new RebalancePartitioner<T>(forward);
+	}
+}
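
A quick sketch of the round-robin behavior (selectChannels ignores the record argument, so null is passed here purely for illustration):

    RebalancePartitioner<Tuple> p = new RebalancePartitioner<Tuple>(false);

    // With three output channels the selection cycles 0, 1, 2, 0, ...
    p.selectChannels(null, 3); // returns {0}
    p.selectChannels(null, 3); // returns {1}
    p.selectChannels(null, 3); // returns {2}
    p.selectChannels(null, 3); // returns {0} again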

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 4b3543b..369b384 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -79,7 +79,7 @@ public class CoStreamTest {
 						return new Tuple2<Integer, Integer>(value, value + 1);
 					}
 				})
-				.distribute()
+				.rebalance()
 				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
 
 					private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 8d6b206..591aa27 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -20,14 +20,24 @@ package org.apache.flink.streaming.api;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -36,6 +46,10 @@ public class DataStreamTest {
 	private static final long MEMORYSIZE = 32;
 	private static int PARALLELISM = 1;
 
+	/**
+	 * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
+	 * @throws Exception
+	 */
 	@Test
 	public void testNaming() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
@@ -86,4 +100,165 @@ public class DataStreamTest {
 		assertTrue(plan.contains("testWindowFold"));
 	}
 
+	/**
+	 * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionByHash} result in
+	 * different and correct topologies. Does the same for the {@link ConnectedDataStream}.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testPartitioning(){
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamGraph graph = env.getStreamGraph();
+
+		DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+		DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+		ConnectedDataStream connected = src1.connect(src2);
+
+		//Testing DataStream grouping
+		DataStream group1 = src1.groupBy(0);
+		DataStream group2 = src1.groupBy(1,0);
+		DataStream group3 = src1.groupBy("f0");
+		DataStream group4 = src1.groupBy(new FirstSelector());
+
+		assertTrue(isPartitioned(graph.getStreamEdge(group1.getId(), createDownStreamId(group1))));
+		assertTrue(isPartitioned(graph.getStreamEdge(group2.getId(), createDownStreamId(group2))));
+		assertTrue(isPartitioned(graph.getStreamEdge(group3.getId(), createDownStreamId(group3))));
+		assertTrue(isPartitioned(graph.getStreamEdge(group4.getId(), createDownStreamId(group4))));
+
+		assertTrue(isGrouped(group1));
+		assertTrue(isGrouped(group2));
+		assertTrue(isGrouped(group3));
+		assertTrue(isGrouped(group4));
+
+		//Testing DataStream partitioning
+		DataStream partition1 = src1.partitionByHash(0);
+		DataStream partition2 = src1.partitionByHash(1, 0);
+		DataStream partition3 = src1.partitionByHash("f0");
+		DataStream partition4 = src1.partitionByHash(new FirstSelector());
+
+		assertTrue(isPartitioned(graph.getStreamEdge(partition1.getId(), createDownStreamId(partition1))));
+		assertTrue(isPartitioned(graph.getStreamEdge(partition2.getId(), createDownStreamId(partition2))));
+		assertTrue(isPartitioned(graph.getStreamEdge(partition3.getId(), createDownStreamId(partition3))));
+		assertTrue(isPartitioned(graph.getStreamEdge(partition4.getId(), createDownStreamId(partition4))));
+
+		assertFalse(isGrouped(partition1));
+		assertFalse(isGrouped(partition3));
+		assertFalse(isGrouped(partition2));
+		assertFalse(isGrouped(partition4));
+
+		//Testing ConnectedDataStream grouping
+		ConnectedDataStream connectedGroup1 = connected.groupBy(0,0);
+		Integer downStreamId1 = createDownStreamId(connectedGroup1);
+
+		ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0},new int[]{0});
+		Integer downStreamId2 = createDownStreamId(connectedGroup2);
+
+		ConnectedDataStream connectedGroup3 = connected.groupBy("f0", "f0");
+		Integer downStreamId3 = createDownStreamId(connectedGroup3);
+
+		ConnectedDataStream connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
+		Integer downStreamId4 = createDownStreamId(connectedGroup4);
+
+		ConnectedDataStream connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
+		Integer downStreamId5 = createDownStreamId(connectedGroup5);
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getFirst().getId(), downStreamId1)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getSecond().getId(), downStreamId1)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getFirst().getId(), downStreamId2)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getSecond().getId(), downStreamId2)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getFirst().getId(), downStreamId3)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getSecond().getId(), downStreamId3)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getFirst().getId(), downStreamId4)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getSecond().getId(), downStreamId4)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getFirst().getId(), downStreamId5)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getSecond().getId(), downStreamId5)));
+
+		assertTrue(isGrouped(connectedGroup1));
+		assertTrue(isGrouped(connectedGroup2));
+		assertTrue(isGrouped(connectedGroup3));
+		assertTrue(isGrouped(connectedGroup4));
+		assertTrue(isGrouped(connectedGroup5));
+
+		//Testing ConnectedDataStream partitioning
+		ConnectedDataStream connectedPartition1 = connected.partitionByHash(0, 0);
+		Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
+
+		ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0},new int[]{0});
+		Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
+
+		ConnectedDataStream connectedPartition3 = connected.partitionByHash("f0", "f0");
+		Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
+
+		ConnectedDataStream connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
+		Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
+
+		ConnectedDataStream connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
+		Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst().getId(), connectDownStreamId1)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond().getId(), connectDownStreamId1)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst().getId(), connectDownStreamId2)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond().getId(), connectDownStreamId2)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst().getId(), connectDownStreamId3)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond().getId(), connectDownStreamId3)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst().getId(), connectDownStreamId4)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond().getId(), connectDownStreamId4)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst().getId(), connectDownStreamId5)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond().getId(), connectDownStreamId5)));
+
+		assertFalse(isGrouped(connectedPartition1));
+		assertFalse(isGrouped(connectedPartition2));
+		assertFalse(isGrouped(connectedPartition3));
+		assertFalse(isGrouped(connectedPartition4));
+		assertFalse(isGrouped(connectedPartition5));
+	}
+
+	/////////////////////////////////////////////////////////////
+	// Utilities
+	/////////////////////////////////////////////////////////////
+
+	private static Integer createDownStreamId(DataStream dataStream){
+		return dataStream.print().getId();
+	}
+
+	private static boolean isGrouped(DataStream dataStream){
+		return dataStream instanceof GroupedDataStream;
+	}
+
+	private static Integer createDownStreamId(ConnectedDataStream dataStream){
+		return dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+			@Override
+			public Object map1(Tuple2<Long, Long> value) {
+				return null;
+			}
+
+			@Override
+			public Object map2(Tuple2<Long, Long> value) {
+				return null;
+			}
+		}).getId();
+	}
+
+	private static boolean isGrouped(ConnectedDataStream dataStream){
+		return (dataStream.getFirst() instanceof GroupedDataStream && dataStream.getSecond() instanceof GroupedDataStream);
+	}
+
+	private static boolean isPartitioned(StreamEdge edge){
+		return edge.getPartitioner() instanceof FieldsPartitioner;
+	}
+
+	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long>{
+		@Override
+		public Long getKey(Tuple2<Long, Long> value) throws Exception {
+			return value.f0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 06013e3..3e8a2c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -108,7 +108,7 @@ public class IterateTest {
 	public void testColocation() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
 
-		IterativeDataStream<Boolean> it = env.fromElements(true).distribute().map(new NoOpMap())
+		IterativeDataStream<Boolean> it = env.fromElements(true).rebalance().map(new NoOpMap())
 				.iterate();
 
 		DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator");

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 14f0fa0..fc78d27 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -48,7 +48,7 @@ public class OutputSplitterTest {
 		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
 		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
 
-		d1 = d1.merge(d2);
+		d1 = d1.union(d2);
 
 		d1.split(new OutputSelector<Integer>() {
 			private static final long serialVersionUID = 8354166915727490130L;

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index d321851..9d4efdc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -53,7 +53,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -228,13 +227,13 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		sourceStream31.filter(new PrimeFilterFunction())
 				.window(Count.of(100))
 				.max(0).flatten()
-				.merge(sourceStream32.filter(new PrimeFilterFunction())
+				.union(sourceStream32.filter(new PrimeFilterFunction())
 						.window(Count.of(100))
 						.max(0).flatten())
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		sourceStream31.flatMap(new DivisorsFlatMapFunction())
-				.merge(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
+				.union(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
 				Integer>>() {
 
 			@Override
@@ -348,7 +347,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		});
 
 
-		dataStream53.merge(dataStream52)
+		dataStream53.union(dataStream52)
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
index 99cc62f..7f23e23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -71,10 +70,10 @@ public class CoFlatMapTest implements Serializable {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
-		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
+		DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);
 		
 		try {
-			ds1.forward().merge(ds2);
+			ds1.forward().union(ds2);
 			fail();
 		} catch (RuntimeException e) {
 			// expected

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
index 37638cf..b37e43a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -21,21 +21,20 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
 
 public class DistributePartitionerTest {
 	
-	private DistributePartitioner<Tuple> distributePartitioner;
+	private RebalancePartitioner<Tuple> distributePartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 	
 	@Before
 	public void setPartitioner() {
-		distributePartitioner = new DistributePartitioner<Tuple>(false);
+		distributePartitioner = new RebalancePartitioner<Tuple>(false);
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
index c0d39da..405a28e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -21,21 +21,20 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
 
 public class ForwardPartitionerTest {
 
-	private DistributePartitioner<Tuple> forwardPartitioner;
+	private RebalancePartitioner<Tuple> forwardPartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 
 	@Before
 	public void setPartitioner() {
-		forwardPartitioner = new DistributePartitioner<Tuple>(true);
+		forwardPartitioner = new RebalancePartitioner<Tuple>(true);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index 0974f3d..56abb12 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -61,7 +61,7 @@ import org.apache.flink.util.Collector;
  * <p>
  * This example shows how to:
  * <ul>
- * <li>merge and join data streams,
+ * <li>union and join data streams,
  * <li>use different windowing policies,
  * <li>define windowing aggregations.
  * </ul>
@@ -89,7 +89,7 @@ public class StockPrices {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		//Step 1 
-	    //Read a stream of stock prices from different sources and merge it into one stream
+	    //Read streams of stock prices from different sources and union them into one stream
 		
 		//Read from a socket stream and map it to StockPrice objects
 		DataStream<StockPrice> socketStockStream = env.socketTextStream(hostName, port)
@@ -111,7 +111,7 @@ public class StockPrices {
 
-		//Merge all stock streams together
+		//Union all stock streams together
 		@SuppressWarnings("unchecked")
-		DataStream<StockPrice> stockStream = socketStockStream.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
+		DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
 		
 		//Step 2
 	    //Compute some simple statistics on a rolling window

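The user-facing effect of this hunk is just the merge -> union rename; as a minimal self-contained sketch (stream names and element type here are illustrative, not part of this commit):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> first = env.generateSequence(0, 10);
		DataStream<Long> second = env.generateSequence(11, 20);
		DataStream<Long> third = env.generateSequence(21, 30);

		// Formerly first.merge(second, third); only the name changed,
		// to line up with DataSet.union
		DataStream<Long> all = first.union(second, third);

		all.print();
		env.execute("Union example");
	}
}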
http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
index f9530eb..4940c6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -50,7 +50,7 @@ import scala.util.Random
  *
  * This example shows how to:
  *
- *   - merge and join data streams,
+ *   - union and join data streams,
  *   - use different windowing policies,
  *   - define windowing aggregations.
  */
@@ -77,7 +77,7 @@ object StockPrices {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     //Step 1 
-    //Read a stream of stock prices from different sources and merge it into one stream
+    //Read streams of stock prices from different sources and union them into one stream
 
     //Read from a socket stream at map it to StockPrice objects
     val socketStockStream = env.socketTextStream(hostName, port).map(x => {
@@ -91,8 +91,8 @@ object StockPrices {
     val DJI_Stream = env.addSource(generateStock("DJI")(30))
     val BUX_Stream = env.addSource(generateStock("BUX")(40))
 
-    //Merge all stock streams together
-    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
+    //Union all stock streams together
+    val stockStream = socketStockStream.union(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
 
     //Step 2
     //Compute some simple statistics on a rolling window
@@ -103,7 +103,7 @@ object StockPrices {
     val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _).getDiscretizedStream
 
     //Step 3 
-    //Use  delta policy to create price change warnings,
+    //Use a delta policy to create price change warnings,
     // and also count the number of warnings every half minute
 
     val priceWarnings = stockStream.groupBy("symbol")
@@ -152,9 +152,9 @@ object StockPrices {
       .flatten()
 
     if (fileOutput) {
-      rollingCorrelation.writeAsText(outputPath, 1);
+      rollingCorrelation.writeAsText(outputPath, 1)
     } else {
-      rollingCorrelation.print;
+      rollingCorrelation.print
     }
 
     env.execute("Stock stream")

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index ec2c6cc..6fbc73f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -260,8 +260,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * second input stream.
    * @return The transformed { @link ConnectedDataStream}
    */
-  def partitionBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(keyPosition1, keyPosition2)
+  def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionByHash(keyPosition1, keyPosition2)
   }
 
   /**
@@ -274,9 +274,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The fields used to partition the second input stream.
    * @return The transformed { @link ConnectedDataStream}
    */
-  def partitionBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+  def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
   ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(keyPositions1, keyPositions2)
+    javaStream.partitionByHash(keyPositions1, keyPositions2)
   }
 
   /**
@@ -292,8 +292,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The partitioning expression for the second input
    * @return The grouped { @link ConnectedDataStream}
    */
-  def partitionBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(field1, field2)
+  def partitionByHash(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionByHash(field1, field2)
   }
 
   /**
@@ -306,9 +306,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The partitioning expressions for the second input
    * @return The partitioned { @link ConnectedDataStream}
    */
-  def partitionBy(fields1: Array[String], fields2: Array[String]):
+  def partitionByHash(fields1: Array[String], fields2: Array[String]):
   ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(fields1, fields2)
+    javaStream.partitionByHash(fields1, fields2)
   }
 
   /**
@@ -321,7 +321,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for partitioning the second input
    * @return The partitioned { @link ConnectedDataStream}
    */
-  def partitionBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
   ConnectedDataStream[IN1, IN2] = {
 
     val keyExtractor1 = new KeySelector[IN1, K] {
@@ -331,7 +331,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       def getKey(in: IN2) = clean(fun2)(in)
     }
 
-    javaStream.partitionBy(keyExtractor1, keyExtractor2)
+    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
   }
 
   /**
@@ -367,14 +367,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * the same key. This type of reduce is much faster than reduceGroup since
    * the reduce function can be applied incrementally.
    *
-   * @param reducer1
-   * @param reducer2
-   * @param mapper1
-   * @param mapper2
-   *
    * @return The transformed { @link DataStream}.
    */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, 
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1,
       reducer2: (IN2, IN2) => IN2, mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
     if (mapper1 == null || mapper2 == null) {
       throw new NullPointerException("Map functions must not be null.")

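The KeySelector overload above has a direct Java counterpart. A hypothetical sketch follows; the identity key selector, the CoMapFunction body, and the exact 0.9-era import paths are assumptions, not part of this commit:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectedPartitionByHashExample {

	// Illustrative identity key selector, used for both inputs
	private static class IdKey implements KeySelector<Long, Long> {
		@Override
		public Long getKey(Long value) {
			return value;
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> control = env.generateSequence(0, 10);
		DataStream<Long> data = env.generateSequence(100, 110);

		// Hash-partition both inputs on their values; formerly partitionBy(...)
		ConnectedDataStream<Long, Long> connected = control.connect(data)
				.partitionByHash(new IdKey(), new IdKey());

		connected.map(new CoMapFunction<Long, Long, String>() {
			@Override
			public String map1(Long value) {
				return "control: " + value;
			}

			@Override
			public String map2(Long value) {
				return "data: " + value;
			}
		}).print();

		env.execute("ConnectedDataStream partitionByHash example");
	}
}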
http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 92304ae..f32d0cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -191,8 +191,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * will be transformed simultaneously.
    *
    */
-  def merge(dataStreams: DataStream[T]*): DataStream[T] =
-    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
+  def union(dataStreams: DataStream[T]*): DataStream[T] =
+    javaStream.union(dataStreams.map(_.getJavaStream): _*)
 
   /**
    * Creates a new ConnectedDataStream by connecting
@@ -232,31 +232,31 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionBy(fields: Int*): DataStream[T] = javaStream.partitionBy(fields: _*)
+  def partitionByHash(fields: Int*): DataStream[T] = javaStream.partitionByHash(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
-    javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+  def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.partitionByHash(firstField +: otherFields.toArray: _*)
 
   /**
    * Groups the elements of a DataStream by the given K key to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+  def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val keyExtractor = new KeySelector[T, K] {
       val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
-    javaStream.partitionBy(keyExtractor)
+    javaStream.partitionByHash(keyExtractor)
   }
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
-   * are broadcasted to every parallel instance of the next component. This
+   * are broadcast to every parallel instance of the next component. This
    * setting only affects how the outputs will be distributed between the
    * parallel instances of the next processing operator.
    *
@@ -296,7 +296,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the next processing operator.
    *
    */
-  def distribute: DataStream[T] = javaStream.distribute()
+  def rebalance: DataStream[T] = javaStream.rebalance()
 
   /**
    * Initiates an iterative part of the program that creates a loop by feeding

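For completeness, a minimal Java sketch of the renamed DataStream call, corresponding to the field-position overload above (the tuple data and class name are illustrative, not part of this commit):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionByHashExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<String, Integer>> stream = env.fromElements(
				new Tuple2<String, Integer>("a", 1),
				new Tuple2<String, Integer>("b", 2),
				new Tuple2<String, Integer>("a", 3));

		// Hash-partition on the first tuple field; formerly stream.partitionBy(0).
		// Records with the same key end up in the same parallel instance.
		DataStream<Tuple2<String, Integer>> partitioned = stream.partitionByHash(0);

		partitioned.print();
		env.execute("partitionByHash example");
	}
}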
http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index fb4b2b7..a25024e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -88,7 +88,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 		DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
 				// add a non-chained no-op map to test the chain state restore logic
-				.distribute().map(new MapFunction<Long, Long>() {
+				.rebalance().map(new MapFunction<Long, Long>() {
 					@Override
 					public Long map(Long value) throws Exception {
 						return value;