You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 15:05:17 UTC

[1/4] flink git commit: [FLINK-2130] [streaming] RMQ Source properly propagates exceptions

Repository: flink
Updated Branches:
  refs/heads/master bf9cc81a7 -> 39ec54ff1


[FLINK-2130] [streaming] RMQ Source properly propagates exceptions

Closes #767


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39ec54ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39ec54ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39ec54ff

Branch: refs/heads/master
Commit: 39ec54ff141fe3c0da58a53b695b7585eb9d5418
Parents: 3527f40
Author: mbalassi <mb...@apache.org>
Authored: Wed Jun 3 11:16:48 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSource.java  | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39ec54ff/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index a4c833e..d706b8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.ConnectorSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -33,8 +31,6 @@ import com.rabbitmq.client.QueueingConsumer;
 public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
-
 	private final String QUEUE_NAME;
 	private final String HOST_NAME;
 
@@ -44,8 +40,6 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	private transient QueueingConsumer consumer;
 	private transient QueueingConsumer.Delivery delivery;
 
-	private volatile boolean isRunning = false;
-
 	OUT out;
 
 	public RMQSource(String HOST_NAME, String QUEUE_NAME,
@@ -97,9 +91,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 		try {
 			delivery = consumer.nextDelivery();
 		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
-			}
+			throw new RuntimeException("Error while reading message from RMQ source from " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
 		}
 
 		out = schema.deserialize(delivery.getBody());
@@ -121,14 +114,13 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 		try {
 			delivery = consumer.nextDelivery();
 		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
-			}
+			throw new RuntimeException("Error while reading message from RMQ source from " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
 		}
 
 		out = schema.deserialize(delivery.getBody());
 		if (schema.isEndOfStream(out)) {
-			throw new RuntimeException("RMQ source is at end.");
+			throw new RuntimeException("RMQ source is at end for " + QUEUE_NAME + " at " + HOST_NAME);
 		}
 		OUT result = out;
 		out = null;


[4/4] flink git commit: [FLINK-2103] [streaming] [api-extending] Expose partitionBy to user

Posted by mb...@apache.org.
[FLINK-2103] [streaming] [api-extending] Expose partitionBy to user

Conflicts:
	flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java

Closes #743


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a43e0d5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a43e0d5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a43e0d5c

Branch: refs/heads/master
Commit: a43e0d5c4c84f92a3b12e4f410e803f17fa40039
Parents: bf9cc81
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu May 28 10:40:56 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 42 +++++++++++++++++++-
 1 file changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a43e0d5c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b9225c9..db1f40f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -373,17 +373,55 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned hashing on the given fields. This setting only
+	 * effects the how the outputs will be distributed between the parallel
+	 * instances of the next processing operator.
+	 *
+	 * @param fields The tuple fields that should be used for partitioning
+	 * @return The partitioned DataStream
+	 * Specifies how elements will be distributed to parallel instances of downstream operations.
+	 *
+	 */
+	public DataStream<OUT> partitionBy(int... fields) {
+		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
+	 * partitioned hashing on the given fields. This setting only
+	 * effects the how the outputs will be distributed between the parallel
+	 * instances of the next processing operator.
+	 *
+	 * @param fields The tuple fields that should be used for partitioning
+	 * @return The partitioned DataStream
+	 * Specifies how elements will be distributed to parallel instances of downstream operations.
+	 *
+	 */
+	public DataStream<OUT> partitionBy(String... fields) {
+		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output is
 	 * partitioned using the given {@link KeySelector}. This setting only
 	 * effects the how the outputs will be distributed between the parallel
 	 * instances of the next processing operator.
-	 * 
+	 *
 	 * @param keySelector
 	 * @return The partitioned DataStream
+	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 */
-	protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
+	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
 	}
 
+	//private helper method for partitioning
+	private DataStream<OUT> partitionBy(Keys<OUT> keys) {
+		return setConnectionType(
+				new FieldsPartitioner<OUT>(
+						clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))));
+	}
+
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are broadcasted to every parallel instance of the next component. This


[3/4] flink git commit: [streaming] [api-breaking] Consolidate DataStream method names

Posted by mb...@apache.org.
[streaming] [api-breaking] Consolidate DataStream method names

To better match the names of the DataSet API the following renamings were made:
- DataStream.merge -> DataStream.union
- DataStream.distribute -> DataStream.rebalance
- DataStream.partitionBy -> DataStream.partitionByHash

Closes #743


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3527f40f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3527f40f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3527f40f

Branch: refs/heads/master
Commit: 3527f40f6e1e43e8e10c983ff3e214484cbb4d04
Parents: 6b28bdf
Author: mbalassi <mb...@apache.org>
Authored: Tue Jun 2 17:01:48 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  24 +--
 .../api/datastream/ConnectedDataStream.java     |  32 ++--
 .../streaming/api/datastream/DataStream.java    |  60 +++----
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   4 +-
 .../api/datastream/SplitDataStream.java         |   2 +-
 .../streaming/api/graph/JSONGenerator.java      |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   2 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../streaming/api/graph/WindowingOptimizer.java |   6 +-
 .../partitioner/DistributePartitioner.java      |  52 ------
 .../partitioner/RebalancePartitioner.java       |  52 ++++++
 .../flink/streaming/api/CoStreamTest.java       |   2 +-
 .../flink/streaming/api/DataStreamTest.java     | 175 +++++++++++++++++++
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../flink/streaming/api/OutputSplitterTest.java |   2 +-
 .../api/complex/ComplexIntegrationTest.java     |   7 +-
 .../api/operators/co/CoFlatMapTest.java         |   5 +-
 .../partitioner/DistributePartitionerTest.java  |   5 +-
 .../partitioner/ForwardPartitionerTest.java     |   5 +-
 .../examples/windowing/StockPrices.java         |   6 +-
 .../scala/examples/windowing/StockPrices.scala  |  14 +-
 .../api/scala/ConnectedDataStream.scala         |  27 ++-
 .../flink/streaming/api/scala/DataStream.scala  |  18 +-
 .../ProcessFailureStreamingRecoveryITCase.java  |   2 +-
 25 files changed, 338 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index e041e45..49dc068 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -278,12 +278,12 @@ There are several partitioning types supported in Flink Streaming:
 
  * *Forward(default)*: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. If there are more processing nodes than inputs or vice verse the load is distributed among the extra nodes in a round-robin fashion. This is the default partitioner.
 Usage: `dataStream.forward()`
- * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use *Distribute*
+ * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use *Rebalance*
 Usage: `dataStream.shuffle()`
- * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
-Usage: `dataStream.distribute()`
+ * *Rebalance*: Rebalance partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
+Usage: `dataStream.rebalance()`
  * *Field/Key Partitioning*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
-Usage: `dataStream.partitionBy(fields…)`
+Usage: `dataStream.partitionByHash(fields…)`
 * *Field/Key Grouping*: Field/Key grouping takes partitioning one step further and seperates the elements to disjoint groups based on the hash code. These groups are processed separately by the next downstream operator. 
 Usage: `dataStream.groupBy(fields…)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
@@ -457,11 +457,11 @@ dataStream.reduce(new ReduceFunction<Integer>() {
     </tr>
 
     <tr>
-      <td><strong>Merge</strong></td>
+      <td><strong>Union</strong></td>
       <td>
-        <p>Merges two or more data streams creating a new stream containing all the elements from all the streams.</p>
+        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
 {% highlight java %}
-dataStream.merge(otherStream1, otherStream2, …)
+dataStream.union(otherStream1, otherStream2, …)
 {% endhighlight %}
       </td>
     </tr>
@@ -563,11 +563,11 @@ dataStream.reduce{ _ + _ }
     </tr>
 
     <tr>
-      <td><strong>Merge</strong></td>
+      <td><strong>Union</strong></td>
       <td>
-        <p>Merges two or more data streams creating a new stream containing all the elements from all the streams.</p>
+        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
 {% highlight scala %}
-dataStream.merge(otherStream1, otherStream2, …)
+dataStream.union(otherStream1, otherStream2, …)
 {% endhighlight %}
       </td>
     </tr>
@@ -862,8 +862,8 @@ dataStream1 cross dataStream2 onWindow (windowing_params)
 
 ### Co operators
 
-Co operators allow the users to jointly transform two `DataStream`s of different types providing a simple way to jointly manipulate streams with a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or in case the user needs explicit tracking of the origin of individual elements.
-Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`. Please note that the two connected `DataStream`s can also be merged data streams.
+Co operators allow the users to jointly transform two `DataStream`s of different types providing a simple way to jointly manipulate streams with a shared state. It is designed to support joint stream transformations where union is not appropriate due to different data types or in case the user needs explicit tracking of the origin of individual elements.
+Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`.
 
 #### Map on ConnectedDataStream
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a `CoMapFunction.map1()` for each element of the first input and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns exactly one element.

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 35418e0..0176277 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -236,9 +236,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            second input stream.
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPosition1),
-				dataStream2.partitionBy(keyPosition2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(keyPosition1),
+				dataStream2.partitionByHash(keyPosition2));
 	}
 
 	/**
@@ -251,9 +251,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The fields used to group the second input stream.
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPositions1),
-				dataStream2.partitionBy(keyPositions2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(keyPositions1),
+				dataStream2.partitionByHash(keyPositions2));
 	}
 
 	/**
@@ -269,9 +269,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The partitioning expressions for the second input
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(String field1, String field2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(field1),
-				dataStream2.partitionBy(field2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(String field1, String field2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(field1),
+				dataStream2.partitionByHash(field2));
 	}
 
 	/**
@@ -287,9 +287,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The partitioning expressions for the second input
 	 * @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(String[] fields1, String[] fields2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(fields1),
-				dataStream2.partitionBy(fields2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(fields1),
+				dataStream2.partitionByHash(fields2));
 	}
 
 	/**
@@ -302,10 +302,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The {@link KeySelector} used for partitioning the second input
 	 * @return @return The partitioned {@link ConnectedDataStream}
 	 */
-	public ConnectedDataStream<IN1, IN2> partitionBy(KeySelector<IN1, ?> keySelector1,
-												 KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keySelector1),
-				dataStream2.partitionBy(keySelector2));
+	public ConnectedDataStream<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1,
+														KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionByHash(keySelector1),
+				dataStream2.partitionByHash(keySelector2));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7abd327..1ec440d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -78,7 +78,7 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
@@ -112,7 +112,7 @@ public class DataStream<OUT> {
 	protected StreamPartitioner<OUT> partitioner;
 	@SuppressWarnings("rawtypes")
 	protected TypeInformation typeInfo;
-	protected List<DataStream<OUT>> mergedStreams;
+	protected List<DataStream<OUT>> unionizedStreams;
 	
 	protected Integer iterationID = null;
 	protected Long iterationWaitTime = null;
@@ -143,10 +143,10 @@ public class DataStream<OUT> {
 		this.parallelism = environment.getParallelism();
 		this.streamGraph = environment.getStreamGraph();
 		this.userDefinedNames = new ArrayList<String>();
-		this.partitioner = new DistributePartitioner<OUT>(true);
+		this.partitioner = new RebalancePartitioner<OUT>(true);
 		this.typeInfo = typeInfo;
-		this.mergedStreams = new ArrayList<DataStream<OUT>>();
-		this.mergedStreams.add(this);
+		this.unionizedStreams = new ArrayList<DataStream<OUT>>();
+		this.unionizedStreams.add(this);
 	}
 
 	/**
@@ -165,11 +165,11 @@ public class DataStream<OUT> {
 		this.typeInfo = dataStream.typeInfo;
 		this.iterationID = dataStream.iterationID;
 		this.iterationWaitTime = dataStream.iterationWaitTime;
-		this.mergedStreams = new ArrayList<DataStream<OUT>>();
-		this.mergedStreams.add(this);
-		if (dataStream.mergedStreams.size() > 1) {
-			for (int i = 1; i < dataStream.mergedStreams.size(); i++) {
-				this.mergedStreams.add(new DataStream<OUT>(dataStream.mergedStreams.get(i)));
+		this.unionizedStreams = new ArrayList<DataStream<OUT>>();
+		this.unionizedStreams.add(this);
+		if (dataStream.unionizedStreams.size() > 1) {
+			for (int i = 1; i < dataStream.unionizedStreams.size(); i++) {
+				this.unionizedStreams.add(new DataStream<OUT>(dataStream.unionizedStreams.get(i)));
 			}
 		}
 
@@ -261,16 +261,16 @@ public class DataStream<OUT> {
 	 * will be transformed simultaneously.
 	 * 
 	 * @param streams
-	 *            The DataStreams to merge output with.
+	 *            The DataStreams to union output with.
 	 * @return The {@link DataStream}.
 	 */
-	public DataStream<OUT> merge(DataStream<OUT>... streams) {
+	public DataStream<OUT> union(DataStream<OUT>... streams) {
 		DataStream<OUT> returnStream = this.copy();
 
 		for (DataStream<OUT> stream : streams) {
-			for (DataStream<OUT> ds : stream.mergedStreams) {
-				validateMerge(ds.getId());
-				returnStream.mergedStreams.add(ds.copy());
+			for (DataStream<OUT> ds : stream.unionizedStreams) {
+				validateUnion(ds.getId());
+				returnStream.unionizedStreams.add(ds.copy());
 			}
 		}
 		return returnStream;
@@ -288,7 +288,7 @@ public class DataStream<OUT> {
 	 * @return The {@link SplitDataStream}
 	 */
 	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-		for (DataStream<OUT> ds : this.mergedStreams) {
+		for (DataStream<OUT> ds : this.unionizedStreams) {
 			streamGraph.addOutputSelector(ds.getId(), clean(outputSelector));
 		}
 
@@ -380,11 +380,11 @@ public class DataStream<OUT> {
 	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 *
 	 */
-	public DataStream<OUT> partitionBy(int... fields) {
+	public DataStream<OUT> partitionByHash(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+			return partitionByHash(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
 		} else {
-			return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+			return partitionByHash(new Keys.ExpressionKeys<OUT>(fields, getType()));
 		}
 	}
 
@@ -399,8 +399,8 @@ public class DataStream<OUT> {
 	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 *
 	 */
-	public DataStream<OUT> partitionBy(String... fields) {
-		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	public DataStream<OUT> partitionByHash(String... fields) {
+		return partitionByHash(new Keys.ExpressionKeys<OUT>(fields, getType()));
 	}
 
 	/**
@@ -413,12 +413,12 @@ public class DataStream<OUT> {
 	 * @return The partitioned DataStream
 	 * Specifies how elements will be distributed to parallel instances of downstream operations.
 	 */
-	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
+	public DataStream<OUT> partitionByHash(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
 	}
 
 	//private helper method for partitioning
-	private DataStream<OUT> partitionBy(Keys<OUT> keys) {
+	private DataStream<OUT> partitionByHash(Keys<OUT> keys) {
 		return setConnectionType(
 				new FieldsPartitioner<OUT>(
 						clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))));
@@ -458,7 +458,7 @@ public class DataStream<OUT> {
 	 * @return The DataStream with shuffle partitioning set.
 	 */
 	public DataStream<OUT> forward() {
-		return setConnectionType(new DistributePartitioner<OUT>(true));
+		return setConnectionType(new RebalancePartitioner<OUT>(true));
 	}
 
 	/**
@@ -469,8 +469,8 @@ public class DataStream<OUT> {
 	 * 
 	 * @return The DataStream with shuffle partitioning set.
 	 */
-	public DataStream<OUT> distribute() {
-		return setConnectionType(new DistributePartitioner<OUT>(false));
+	public DataStream<OUT> rebalance() {
+		return setConnectionType(new RebalancePartitioner<OUT>(false));
 	}
 
 	/**
@@ -1295,7 +1295,7 @@ public class DataStream<OUT> {
 	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
 		DataStream<OUT> returnStream = this.copy();
 
-		for (DataStream<OUT> stream : returnStream.mergedStreams) {
+		for (DataStream<OUT> stream : returnStream.unionizedStreams) {
 			stream.partitioner = partitioner;
 		}
 
@@ -1316,7 +1316,7 @@ public class DataStream<OUT> {
 	 *            Number of the type (used at co-functions)
 	 */
 	protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) {
-		for (DataStream<X> stream : inputStream.mergedStreams) {
+		for (DataStream<X> stream : inputStream.unionizedStreams) {
 			streamGraph.addEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
 					inputStream.userDefinedNames);
 		}
@@ -1407,8 +1407,8 @@ public class DataStream<OUT> {
 		}
 	}
 
-	private void validateMerge(Integer id) {
-		for (DataStream<OUT> ds : this.mergedStreams) {
+	private void validateUnion(Integer id) {
+		for (DataStream<OUT> ds : this.unionizedStreams) {
 			if (ds.getId().equals(id)) {
 				throw new RuntimeException("A DataStream cannot be merged with itself");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 62e7781..2d6829d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -49,7 +49,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * @param keySelector Function for determining group inclusion
 	 */
 	public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) {
-		super(dataStream.partitionBy(keySelector));
+		super(dataStream.partitionByHash(keySelector));
 		this.keySelector = keySelector;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index bebfff0..1e5b5cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -126,8 +126,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> distribute() {
-		return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+	public SingleOutputStreamOperator<OUT, O> rebalance() {
+		return (SingleOutputStreamOperator<OUT, O>) super.rebalance();
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 69e059e..36a94c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -57,7 +57,7 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
 
 		DataStream<OUT> returnStream = copy();
 
-		for (DataStream<OUT> ds : returnStream.mergedStreams) {
+		for (DataStream<OUT> ds : returnStream.unionizedStreams) {
 			ds.userDefinedNames = Arrays.asList(outputNames);
 		}
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 2c536d8..4ae2f97 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -147,7 +147,7 @@ public class JSONGenerator {
 		JSONObject input = new JSONObject();
 		inputArray.put(input);
 		input.put(ID, mappedInputID);
-		input.put(SHIP_STRATEGY, streamGraph.getEdge(inputID, vertexID).getPartitioner()
+		input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner()
 				.getStrategy());
 		input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index aa0f164..95cbc23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -330,7 +330,7 @@ public class StreamGraph extends StreamingPlan {
 		return streamNodes.keySet();
 	}
 
-	protected StreamEdge getEdge(int sourceId, int targetId) {
+	public StreamEdge getStreamEdge(int sourceId, int targetId) {
 		Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator();
 		while (outIterator.hasNext()) {
 			StreamEdge edge = outIterator.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7eb2028..1670a47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -282,7 +282,7 @@ public class StreamingJobGraphGenerator {
 
 		for (StreamEdge output : allOutputs) {
 			config.setSelectedNames(output.getTargetID(),
-					streamGraph.getEdge(vertexID, output.getTargetID()).getSelectedNames());
+					streamGraph.getStreamEdge(vertexID, output.getTargetID()).getSelectedNames());
 		}
 
 		vertexConfigs.put(vertexID, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index f375536..92043e7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
 import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
 import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 
 public class WindowingOptimizer {
 
@@ -65,12 +65,12 @@ public class WindowingOptimizer {
 
 				// We connect the merge input to the flattener directly
 				streamGraph.addEdge(mergeInput.getID(), flattenerID,
-						new DistributePartitioner(true), 0, new ArrayList<String>());
+						new RebalancePartitioner(true), 0, new ArrayList<String>());
 
 				// If the merger is only connected to the flattener we delete it
 				// completely, otherwise we only remove the edge
 				if (input.getOutEdges().size() > 1) {
-					streamGraph.removeEdge(streamGraph.getEdge(input.getID(), flattenerID));
+					streamGraph.removeEdge(streamGraph.getStreamEdge(input.getID(), flattenerID));
 				} else {
 					streamGraph.removeVertex(input);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
deleted file mode 100644
index 0fa191b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by cycling through the output
- * channels.
- * 
- * @param <T>
- *            Type of the Tuple
- */
-public class DistributePartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] {-1};
-	private boolean forward;
-
-	public DistributePartitioner(boolean forward) {
-		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
-		this.forward = forward;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-
-		return this.returnArray;
-	}
-	
-	public StreamPartitioner<T> copy() {
-		return new DistributePartitioner<T>(forward);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
new file mode 100644
index 0000000..70d9c6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that distributes the data equally by cycling through the output
+ * channels.
+ * 
+ * @param <T>
+ *            Type of the Tuple
+ */
+public class RebalancePartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[] {-1};
+	private boolean forward;
+
+	public RebalancePartitioner(boolean forward) {
+		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
+		this.forward = forward;
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+
+		return this.returnArray;
+	}
+	
+	public StreamPartitioner<T> copy() {
+		return new RebalancePartitioner<T>(forward);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 4b3543b..369b384 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -79,7 +79,7 @@ public class CoStreamTest {
 						return new Tuple2<Integer, Integer>(value, value + 1);
 					}
 				})
-				.distribute()
+				.rebalance()
 				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
 
 					private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 8d6b206..591aa27 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -20,14 +20,24 @@ package org.apache.flink.streaming.api;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -36,6 +46,10 @@ public class DataStreamTest {
 	private static final long MEMORYSIZE = 32;
 	private static int PARALLELISM = 1;
 
+	/**
+	 * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
+	 * @throws Exception
+	 */
 	@Test
 	public void testNaming() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
@@ -86,4 +100,165 @@ public class DataStreamTest {
 		assertTrue(plan.contains("testWindowFold"));
 	}
 
+	/**
+	 * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionByHash} result in
+	 * different and correct topologies. Does the some for the {@link ConnectedDataStream}.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testPartitioning(){
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamGraph graph = env.getStreamGraph();
+
+		DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+		DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+		ConnectedDataStream connected = src1.connect(src2);
+
+		//Testing DataStream grouping
+		DataStream group1 = src1.groupBy(0);
+		DataStream group2 = src1.groupBy(1,0);
+		DataStream group3 = src1.groupBy("f0");
+		DataStream group4 = src1.groupBy(new FirstSelector());
+
+		assertTrue(isPartitioned(graph.getStreamEdge(group1.getId(), createDownStreamId(group1))));
+		assertTrue(isPartitioned(graph.getStreamEdge(group2.getId(), createDownStreamId(group2))));
+		assertTrue(isPartitioned(graph.getStreamEdge(group3.getId(), createDownStreamId(group3))));
+		assertTrue(isPartitioned(graph.getStreamEdge(group4.getId(), createDownStreamId(group4))));
+
+		assertTrue(isGrouped(group1));
+		assertTrue(isGrouped(group2));
+		assertTrue(isGrouped(group3));
+		assertTrue(isGrouped(group4));
+
+		//Testing DataStream partitioning
+		DataStream partition1 = src1.partitionByHash(0);
+		DataStream partition2 = src1.partitionByHash(1, 0);
+		DataStream partition3 = src1.partitionByHash("f0");
+		DataStream partition4 = src1.partitionByHash(new FirstSelector());
+
+		assertTrue(isPartitioned(graph.getStreamEdge(partition1.getId(), createDownStreamId(partition1))));
+		assertTrue(isPartitioned(graph.getStreamEdge(partition2.getId(), createDownStreamId(partition2))));
+		assertTrue(isPartitioned(graph.getStreamEdge(partition3.getId(), createDownStreamId(partition3))));
+		assertTrue(isPartitioned(graph.getStreamEdge(partition4.getId(), createDownStreamId(partition4))));
+
+		assertFalse(isGrouped(partition1));
+		assertFalse(isGrouped(partition3));
+		assertFalse(isGrouped(partition2));
+		assertFalse(isGrouped(partition4));
+
+		//Testing ConnectedDataStream grouping
+		ConnectedDataStream connectedGroup1 = connected.groupBy(0,0);
+		Integer downStreamId1 = createDownStreamId(connectedGroup1);
+
+		ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0},new int[]{0});
+		Integer downStreamId2 = createDownStreamId(connectedGroup2);
+
+		ConnectedDataStream connectedGroup3 = connected.groupBy("f0", "f0");
+		Integer downStreamId3 = createDownStreamId(connectedGroup3);
+
+		ConnectedDataStream connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
+		Integer downStreamId4 = createDownStreamId(connectedGroup4);
+
+		ConnectedDataStream connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
+		Integer downStreamId5 = createDownStreamId(connectedGroup5);
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getFirst().getId(), downStreamId1)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getSecond().getId(), downStreamId1)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getFirst().getId(), downStreamId2)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getSecond().getId(), downStreamId2)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getFirst().getId(), downStreamId3)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getSecond().getId(), downStreamId3)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getFirst().getId(), downStreamId4)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getSecond().getId(), downStreamId4)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getFirst().getId(), downStreamId5)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getSecond().getId(), downStreamId5)));
+
+		assertTrue(isGrouped(connectedGroup1));
+		assertTrue(isGrouped(connectedGroup2));
+		assertTrue(isGrouped(connectedGroup3));
+		assertTrue(isGrouped(connectedGroup4));
+		assertTrue(isGrouped(connectedGroup5));
+
+		//Testing ConnectedDataStream partitioning
+		ConnectedDataStream connectedPartition1 = connected.partitionByHash(0, 0);
+		Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
+
+		ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0},new int[]{0});
+		Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
+
+		ConnectedDataStream connectedPartition3 = connected.partitionByHash("f0", "f0");
+		Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
+
+		ConnectedDataStream connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
+		Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
+
+		ConnectedDataStream connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
+		Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst().getId(), connectDownStreamId1)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond().getId(), connectDownStreamId1)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst().getId(), connectDownStreamId2)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond().getId(), connectDownStreamId2)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst().getId(), connectDownStreamId3)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond().getId(), connectDownStreamId3)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst().getId(), connectDownStreamId4)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond().getId(), connectDownStreamId4)));
+
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst().getId(), connectDownStreamId5)));
+		assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond().getId(), connectDownStreamId5)));
+
+		assertFalse(isGrouped(connectedPartition1));
+		assertFalse(isGrouped(connectedPartition2));
+		assertFalse(isGrouped(connectedPartition3));
+		assertFalse(isGrouped(connectedPartition4));
+		assertFalse(isGrouped(connectedPartition5));
+	}
+
+	/////////////////////////////////////////////////////////////
+	// Utilities
+	/////////////////////////////////////////////////////////////
+
+	private static Integer createDownStreamId(DataStream dataStream){
+		return dataStream.print().getId();
+	}
+
+	private static boolean isGrouped(DataStream dataStream){
+		return dataStream instanceof GroupedDataStream;
+	}
+
+	private static Integer createDownStreamId(ConnectedDataStream dataStream){
+		return dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+			@Override
+			public Object map1(Tuple2<Long, Long> value) {
+				return null;
+			}
+
+			@Override
+			public Object map2(Tuple2<Long, Long> value) {
+				return null;
+			}
+		}).getId();
+	}
+
+	private static boolean isGrouped(ConnectedDataStream dataStream){
+		return (dataStream.getFirst() instanceof GroupedDataStream && dataStream.getSecond() instanceof GroupedDataStream);
+	}
+
+	private static boolean isPartitioned(StreamEdge edge){
+		return edge.getPartitioner() instanceof FieldsPartitioner;
+	}
+
+	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long>{
+		@Override
+		public Long getKey(Tuple2<Long, Long> value) throws Exception {
+			return value.f0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 06013e3..3e8a2c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -108,7 +108,7 @@ public class IterateTest {
 	public void testColocation() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
 
-		IterativeDataStream<Boolean> it = env.fromElements(true).distribute().map(new NoOpMap())
+		IterativeDataStream<Boolean> it = env.fromElements(true).rebalance().map(new NoOpMap())
 				.iterate();
 
 		DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator");

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 14f0fa0..fc78d27 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -48,7 +48,7 @@ public class OutputSplitterTest {
 		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
 		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
 
-		d1 = d1.merge(d2);
+		d1 = d1.union(d2);
 
 		d1.split(new OutputSelector<Integer>() {
 			private static final long serialVersionUID = 8354166915727490130L;

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index d321851..9d4efdc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -53,7 +53,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -228,13 +227,13 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		sourceStream31.filter(new PrimeFilterFunction())
 				.window(Count.of(100))
 				.max(0).flatten()
-				.merge(sourceStream32.filter(new PrimeFilterFunction())
+				.union(sourceStream32.filter(new PrimeFilterFunction())
 						.window(Count.of(100))
 						.max(0).flatten())
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		sourceStream31.flatMap(new DivisorsFlatMapFunction())
-				.merge(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
+				.union(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
 				Integer>>() {
 
 			@Override
@@ -348,7 +347,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		});
 
 
-		dataStream53.merge(dataStream52)
+		dataStream53.union(dataStream52)
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
index 99cc62f..7f23e23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -71,10 +70,10 @@ public class CoFlatMapTest implements Serializable {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
-		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
+		DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);
 		
 		try {
-			ds1.forward().merge(ds2);
+			ds1.forward().union(ds2);
 			fail();
 		} catch (RuntimeException e) {
 			// expected

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
index 37638cf..b37e43a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -21,21 +21,20 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
 
 public class DistributePartitionerTest {
 	
-	private DistributePartitioner<Tuple> distributePartitioner;
+	private RebalancePartitioner<Tuple> distributePartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 	
 	@Before
 	public void setPartitioner() {
-		distributePartitioner = new DistributePartitioner<Tuple>(false);
+		distributePartitioner = new RebalancePartitioner<Tuple>(false);
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
index c0d39da..405a28e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -21,21 +21,20 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
 
 public class ForwardPartitionerTest {
 
-	private DistributePartitioner<Tuple> forwardPartitioner;
+	private RebalancePartitioner<Tuple> forwardPartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 
 	@Before
 	public void setPartitioner() {
-		forwardPartitioner = new DistributePartitioner<Tuple>(true);
+		forwardPartitioner = new RebalancePartitioner<Tuple>(true);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index 0974f3d..56abb12 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -61,7 +61,7 @@ import org.apache.flink.util.Collector;
  * <p>
  * This example shows how to:
  * <ul>
- * <li>merge and join data streams,
+ * <li>union and join data streams,
  * <li>use different windowing policies,
  * <li>define windowing aggregations.
  * </ul>
@@ -89,7 +89,7 @@ public class StockPrices {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		//Step 1 
-	    //Read a stream of stock prices from different sources and merge it into one stream
+	    //Read a stream of stock prices from different sources and union it into one stream
 		
 		//Read from a socket stream at map it to StockPrice objects
 		DataStream<StockPrice> socketStockStream = env.socketTextStream(hostName, port)
@@ -111,7 +111,7 @@ public class StockPrices {
 
 		//Merge all stock streams together
 		@SuppressWarnings("unchecked")
-		DataStream<StockPrice> stockStream = socketStockStream.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
+		DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
 		
 		//Step 2
 	    //Compute some simple statistics on a rolling window

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
index f9530eb..4940c6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -50,7 +50,7 @@ import scala.util.Random
  *
  * This example shows how to:
  *
- *   - merge and join data streams,
+ *   - union and join data streams,
  *   - use different windowing policies,
  *   - define windowing aggregations.
  */
@@ -77,7 +77,7 @@ object StockPrices {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     //Step 1 
-    //Read a stream of stock prices from different sources and merge it into one stream
+    //Read a stream of stock prices from different sources and union it into one stream
 
     //Read from a socket stream at map it to StockPrice objects
     val socketStockStream = env.socketTextStream(hostName, port).map(x => {
@@ -91,8 +91,8 @@ object StockPrices {
     val DJI_Stream = env.addSource(generateStock("DJI")(30))
     val BUX_Stream = env.addSource(generateStock("BUX")(40))
 
-    //Merge all stock streams together
-    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
+    //Union all stock streams together
+    val stockStream = socketStockStream.union(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
 
     //Step 2
     //Compute some simple statistics on a rolling window
@@ -103,7 +103,7 @@ object StockPrices {
     val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _).getDiscretizedStream
 
     //Step 3 
-    //Use  delta policy to create price change warnings,
+    //Use delta policy to create price change warnings,
     // and also count the number of warning every half minute
 
     val priceWarnings = stockStream.groupBy("symbol")
@@ -152,9 +152,9 @@ object StockPrices {
       .flatten()
 
     if (fileOutput) {
-      rollingCorrelation.writeAsText(outputPath, 1);
+      rollingCorrelation.writeAsText(outputPath, 1)
     } else {
-      rollingCorrelation.print;
+      rollingCorrelation.print
     }
 
     env.execute("Stock stream")

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index ec2c6cc..6fbc73f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -260,8 +260,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * second input stream.
    * @return The transformed { @link ConnectedDataStream}
    */
-  def partitionBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(keyPosition1, keyPosition2)
+  def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionByHash(keyPosition1, keyPosition2)
   }
 
   /**
@@ -274,9 +274,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The fields used to partition the second input stream.
    * @return The transformed { @link ConnectedDataStream}
    */
-  def partitionBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+  def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
   ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(keyPositions1, keyPositions2)
+    javaStream.partitionByHash(keyPositions1, keyPositions2)
   }
 
   /**
@@ -292,8 +292,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The partitioning expression for the second input
    * @return The grouped { @link ConnectedDataStream}
    */
-  def partitionBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(field1, field2)
+  def partitionByHash(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionByHash(field1, field2)
   }
 
   /**
@@ -306,9 +306,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The partitioning expressions for the second input
    * @return The partitioned { @link ConnectedDataStream}
    */
-  def partitionBy(fields1: Array[String], fields2: Array[String]):
+  def partitionByHash(fields1: Array[String], fields2: Array[String]):
   ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionBy(fields1, fields2)
+    javaStream.partitionByHash(fields1, fields2)
   }
 
   /**
@@ -321,7 +321,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for partitioning the second input
    * @return The partitioned { @link ConnectedDataStream}
    */
-  def partitionBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
   ConnectedDataStream[IN1, IN2] = {
 
     val keyExtractor1 = new KeySelector[IN1, K] {
@@ -331,7 +331,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       def getKey(in: IN2) = clean(fun2)(in)
     }
 
-    javaStream.partitionBy(keyExtractor1, keyExtractor2)
+    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
   }
 
   /**
@@ -367,14 +367,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * the same key. This type of reduce is much faster than reduceGroup since
    * the reduce function can be applied incrementally.
    *
-   * @param reducer1
-   * @param reducer2
-   * @param mapper1
-   * @param mapper2
-   *
    * @return The transformed { @link DataStream}.
    */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, 
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1,
       reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
     if (mapper1 == null || mapper2 == null) {
       throw new NullPointerException("Map functions must not be null.")

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 92304ae..f32d0cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -191,8 +191,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * will be transformed simultaneously.
    *
    */
-  def merge(dataStreams: DataStream[T]*): DataStream[T] =
-    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
+  def union(dataStreams: DataStream[T]*): DataStream[T] =
+    javaStream.union(dataStreams.map(_.getJavaStream): _*)
 
   /**
    * Creates a new ConnectedDataStream by connecting
@@ -232,31 +232,31 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionBy(fields: Int*): DataStream[T] = javaStream.partitionBy(fields: _*)
+  def partitionByHash(fields: Int*): DataStream[T] = javaStream.partitionByHash(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
-    javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+  def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.partitionByHash(firstField +: otherFields.toArray: _*)
 
   /**
    * Groups the elements of a DataStream by the given K key to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+  def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
 
     val keyExtractor = new KeySelector[T, K] {
       val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
-    javaStream.partitionBy(keyExtractor)
+    javaStream.partitionByHash(keyExtractor)
   }
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
-   * are broadcasted to every parallel instance of the next component. This
+   * are broad casted to every parallel instance of the next component. This
    * setting only effects the how the outputs will be distributed between the
    * parallel instances of the next processing operator.
    *
@@ -296,7 +296,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the next processing operator.
    *
    */
-  def distribute: DataStream[T] = javaStream.distribute()
+  def rebalance: DataStream[T] = javaStream.rebalance()
 
   /**
    * Initiates an iterative part of the program that creates a loop by feeding

http://git-wip-us.apache.org/repos/asf/flink/blob/3527f40f/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index fb4b2b7..a25024e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -88,7 +88,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 		DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
 				// add a non-chained no-op map to test the chain state restore logic
-				.distribute().map(new MapFunction<Long, Long>() {
+				.rebalance().map(new MapFunction<Long, Long>() {
 					@Override
 					public Long map(Long value) throws Exception {
 						return value;


[2/4] flink git commit: [FLINK-2103] [streaming] [api-extending] PartitionBy for connected streams, scala & docs

Posted by mb...@apache.org.
[FLINK-2103] [streaming] [api-extending] PartitionBy for connected streams, scala & docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b28bdf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b28bdf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b28bdf3

Branch: refs/heads/master
Commit: 6b28bdf359ba33cf106b96cf42fac0a431935330
Parents: a43e0d5
Author: mbalassi <mb...@apache.org>
Authored: Tue Jun 2 10:49:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 12:47:41 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  4 +-
 .../api/datastream/ConnectedDataStream.java     | 92 +++++++++++++++++++-
 .../streaming/api/datastream/DataStream.java    |  8 +-
 .../api/datastream/WindowedDataStream.java      |  2 +-
 .../api/scala/ConnectedDataStream.scala         | 88 ++++++++++++++++++-
 .../flink/streaming/api/scala/DataStream.scala  | 36 ++++++--
 6 files changed, 213 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 543499c..e041e45 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -282,7 +282,9 @@ Usage: `dataStream.forward()`
 Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
+ * *Field/Key Partitioning*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
+Usage: `dataStream.partitionBy(fields…)`
+* *Field/Key Grouping*: Field/Key grouping takes partitioning one step further and seperates the elements to disjoint groups based on the hash code. These groups are processed separately by the next downstream operator. 
 Usage: `dataStream.groupBy(fields…)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 02db538..35418e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -145,7 +145,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @param keyPosition2
 	 *            The field used to compute the hashcode of the elements in the
 	 *            second input stream.
-	 * @return @return The transformed {@link ConnectedDataStream}
+	 * @return The grouped {@link ConnectedDataStream}
 	 */
 	public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
 		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
@@ -162,7 +162,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The fields used to group the first input stream.
 	 * @param keyPositions2
 	 *            The fields used to group the second input stream.
-	 * @return @return The transformed {@link ConnectedDataStream}
+	 * @return The grouped {@link ConnectedDataStream}
 	 */
 	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
 		return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
@@ -175,7 +175,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * expression is either the name of a public field or a getter method with
 	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
 	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 * 
+	 *
 	 * @param field1
 	 *            The grouping expression for the first input
 	 * @param field2
@@ -216,7 +216,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            The {@link KeySelector} used for grouping the first input
 	 * @param keySelector2
 	 *            The {@link KeySelector} used for grouping the second input
-	 * @return @return The transformed {@link ConnectedDataStream}
+	 * @return The partitioned {@link ConnectedDataStream}
 	 */
 	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
 			KeySelector<IN2, ?> keySelector2) {
@@ -225,6 +225,90 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1, int keyPosition2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPosition1),
+				dataStream2.partitionBy(keyPosition2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to group the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to group the second input stream.
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keyPositions1),
+				dataStream2.partitionBy(keyPositions2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to field1 and field2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+	 *
+	 * @param field1
+	 *            The partitioning expressions for the first input
+	 * @param field2
+	 *            The partitioning expressions for the second input
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(String field1, String field2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(field1),
+				dataStream2.partitionBy(field2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+	 *
+	 * @param fields1
+	 *            The partitioning expressions for the first input
+	 * @param fields2
+	 *            The partitioning expressions for the second input
+	 * @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(String[] fields1, String[] fields2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(fields1),
+				dataStream2.partitionBy(fields2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for partitioning the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for partitioning the second input
+	 * @return @return The partitioned {@link ConnectedDataStream}
+	 */
+	public ConnectedDataStream<IN1, IN2> partitionBy(KeySelector<IN1, ?> keySelector1,
+												 KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedDataStream<IN1, IN2>(dataStream1.partitionBy(keySelector1),
+				dataStream2.partitionBy(keySelector2));
+	}
+
+	/**
 	 * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
 	 * the output to a common type. The transformation calls a
 	 * {@link CoMapFunction#map1} for each element of the first input and

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index db1f40f..7abd327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -344,9 +344,7 @@ public class DataStream<OUT> {
 	 * @return The grouped {@link DataStream}
 	 **/
 	public GroupedDataStream<OUT> groupBy(String... fields) {
-
 		return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
-
 	}
 
 	/**
@@ -383,7 +381,11 @@ public class DataStream<OUT> {
 	 *
 	 */
 	public DataStream<OUT> partitionBy(int... fields) {
-		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
+			return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+		} else {
+			return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 5d769ca..dd9af2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -436,7 +436,7 @@ public class WindowedDataStream<OUT> {
 				.with(clean(reduceFunction));
 
 		// We get the windowbuffer and set it to emit empty windows with
-		// sequential IDs. This logic is necessarry to merge windows created in
+		// sequential IDs. This logic is necessary to merge windows created in
 		// parallel.
 		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index 47d8fd2..ec2c6cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -233,7 +233,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for grouping the first input
    * @param fun2
    * The function used for grouping the second input
-   * @return @return The transformed { @link ConnectedDataStream}
+   * @return The grouped { @link ConnectedDataStream}
    */
   def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
   ConnectedDataStream[IN1, IN2] = {
@@ -249,6 +249,92 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
+   * PartitionBy operation for connected data stream. Partitions the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2.
+   *
+   * @param keyPosition1
+   * The field used to compute the hashcode of the elements in the
+   * first input stream.
+   * @param keyPosition2
+   * The field used to compute the hashcode of the elements in the
+   * second input stream.
+   * @return The transformed { @link ConnectedDataStream}
+   */
+  def partitionBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(keyPosition1, keyPosition2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream. Partitions the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2.
+   *
+   * @param keyPositions1
+   * The fields used to partition the first input stream.
+   * @param keyPositions2
+   * The fields used to partition the second input stream.
+   * @return The transformed { @link ConnectedDataStream}
+   */
+  def partitionBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(keyPositions1, keyPositions2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream using key expressions. Partitions
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the {@link DataStream}S underlying type. A dot can be used
+   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   *
+   * @param field1
+   * The partitioning expression for the first input
+   * @param field2
+   * The partitioning expression for the second input
+   * @return The grouped { @link ConnectedDataStream}
+   */
+  def partitionBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(field1, field2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream using key expressions. Partitions
+   * the elements of input1 and input2 according to fields1 and fields2.
+   *
+   * @param fields1
+   * The partitioning expressions for the first input
+   * @param fields2
+   * The partitioning expressions for the second input
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy(fields1: Array[String], fields2: Array[String]):
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.partitionBy(fields1, fields2)
+  }
+
+  /**
+   * PartitionBy operation for connected data stream. Partitions the elements of
+   * input1 and input2 using fun1 and fun2.
+   *
+   * @param fun1
+   * The function used for partitioning the first input
+   * @param fun2
+   * The function used for partitioning the second input
+   * @return The partitioned { @link ConnectedDataStream}
+   */
+  def partitionBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  ConnectedDataStream[IN1, IN2] = {
+
+    val keyExtractor1 = new KeySelector[IN1, K] {
+      def getKey(in: IN1) = clean(fun1)(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, L] {
+      def getKey(in: IN2) = clean(fun2)(in)
+    }
+
+    javaStream.partitionBy(keyExtractor1, keyExtractor2)
+  }
+
+  /**
    * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
    * the outputs to a common type. If the {@link ConnectedDataStream} is
    * batched or windowed then the reduce transformation is applied on every

http://git-wip-us.apache.org/repos/asf/flink/blob/6b28bdf3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index c5b101e..92304ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -198,30 +198,26 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new ConnectedDataStream by connecting
    * DataStream outputs of different type with each other. The
    * DataStreams connected using this operators can be used with CoFunctions.
-   *
    */
   def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
     javaStream.connect(dataStream.getJavaStream)
 
   /**
    * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
+   * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
+   * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
    javaStream.groupBy(firstField +: otherFields.toArray: _*)   
   
   /**
    * Groups the elements of a DataStream by the given K key to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
+   * be used with grouped operators like grouped reduce or grouped aggregations.
    */
   def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
 
@@ -233,6 +229,32 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def partitionBy(fields: Int*): DataStream[T] = javaStream.partitionBy(fields: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.partitionBy(keyExtractor)
+  }
+
+  /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are broadcasted to every parallel instance of the next component. This
    * setting only effects the how the outputs will be distributed between the