Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:41 UTC

[07/13] flink git commit: [FLINK-2550] Rename ConnectedDataStream to ConnectedStreams, Remove some operations

[FLINK-2550] Rename ConnectedDataStream to ConnectedStreams, Remove some operations

The removed operations are tricky and some of them are not working
correctly. For now, co-reduce, stream-cross and stream-join are
removed.

I'm planning to add a new join implementation based on tagged union
that uses the new windowing code.
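
For readers unfamiliar with the term, the following is a minimal sketch of
what a join over a tagged union could look like. It is not the planned Flink
implementation and does not use any Flink API: TaggedUnion,
TaggedUnionJoinSketch and joinWindow are hypothetical names, and a plain
List stands in for the contents of one window. Keys are assumed to be
non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical tagged union: wraps an element from exactly one of two inputs.
final class TaggedUnion<A, B> {
    private final A one;
    private final B two;
    private final boolean fromFirst;

    private TaggedUnion(A one, B two, boolean fromFirst) {
        this.one = one;
        this.two = two;
        this.fromFirst = fromFirst;
    }

    // Tags an element as coming from the first input.
    static <A, B> TaggedUnion<A, B> one(A value) {
        return new TaggedUnion<>(value, null, true);
    }

    // Tags an element as coming from the second input.
    static <A, B> TaggedUnion<A, B> two(B value) {
        return new TaggedUnion<>(null, value, false);
    }

    boolean isOne() {
        return fromFirst;
    }

    A getOne() {
        if (!fromFirst) {
            throw new IllegalStateException("Element is from the second input");
        }
        return one;
    }

    B getTwo() {
        if (fromFirst) {
            throw new IllegalStateException("Element is from the first input");
        }
        return two;
    }
}

// Hypothetical helper: joins the elements buffered for one window.
final class TaggedUnionJoinSketch {
    static <A, B, K, R> List<R> joinWindow(
            List<TaggedUnion<A, B>> window,
            Function<A, K> key1,
            Function<B, K> key2,
            BiFunction<A, B, R> joiner) {
        // Split the single mixed stream back into its two sides using the tag.
        List<A> left = new ArrayList<>();
        List<B> right = new ArrayList<>();
        for (TaggedUnion<A, B> element : window) {
            if (element.isOne()) {
                left.add(element.getOne());
            } else {
                right.add(element.getTwo());
            }
        }
        // Nested-loop join on equal keys; fine for a sketch, quadratic in window size.
        List<R> result = new ArrayList<>();
        for (A a : left) {
            for (B b : right) {
                if (key1.apply(a).equals(key2.apply(b))) {
                    result.add(joiner.apply(a, b));
                }
            }
        }
        return result;
    }
}
```

The point of the tag is that both inputs can flow through one single-input
windowing operator as a single element type, and the join logic only
separates them again when the window is evaluated.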


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23d8e264
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23d8e264
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23d8e264

Branch: refs/heads/master
Commit: 23d8e26438370d8c99c24c8b43d543e953775fd2
Parents: 9e6e0ae
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 1 17:07:11 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  22 +-
 .../connectors/flume/FlumeTopology.java         |   2 +-
 .../api/datastream/ConnectedDataStream.java     | 502 ------------------
 .../api/datastream/ConnectedStreams.java        | 331 ++++++++++++
 .../streaming/api/datastream/DataStream.java    |  39 +-
 .../api/datastream/IterativeDataStream.java     |  54 +-
 .../temporal/StreamCrossOperator.java           | 115 -----
 .../datastream/temporal/StreamJoinOperator.java |  54 +-
 .../api/functions/co/CoReduceFunction.java      | 107 ----
 .../api/functions/co/CoWindowFunction.java      |  30 --
 .../api/functions/co/CrossWindowFunction.java   |  44 --
 .../api/functions/co/JoinWindowFunction.java    |  77 ---
 .../api/functions/co/RichCoReduceFunction.java  |  40 --
 .../api/functions/co/RichCoWindowFunction.java  |  34 --
 .../api/operators/co/CoStreamGroupedReduce.java |  77 ---
 .../api/operators/co/CoStreamReduce.java        |  86 ----
 .../api/operators/co/CoStreamWindow.java        | 228 ---------
 .../flink/streaming/api/DataStreamTest.java     |  39 +-
 .../apache/flink/streaming/api/IterateTest.java |  14 +-
 .../flink/streaming/api/TypeFillTest.java       |  46 --
 .../streaming/api/WindowCrossJoinTest.java      |  17 -
 .../api/graph/StreamGraphGeneratorTest.java     |   4 +-
 .../api/scala/ConnectedDataStream.scala         | 510 -------------------
 .../streaming/api/scala/ConnectedStreams.scala  | 353 +++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  |  23 +-
 .../api/scala/StreamCrossOperator.scala         | 101 ----
 .../api/scala/StreamJoinOperator.scala          |  30 +-
 .../flink/streaming/api/scala/package.scala     |   4 +-
 .../streaming/api/scala/DataStreamTest.scala    |  32 +-
 .../StreamingScalaAPICompletenessTest.scala     |  21 +-
 30 files changed, 831 insertions(+), 2205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 9c584ef..c437114 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -991,9 +991,9 @@ dataStream1 cross dataStream2 onWindow (windowing_params)
 ### Co operators
 
 Co operators allow the users to jointly transform two `DataStream`s of different types, providing a simple way to jointly manipulate streams with a shared state. It is designed to support joint stream transformations where union is not appropriate due to different data types, or in case the user needs explicit tracking of the origin of individual elements.
-Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`.
+Co operators can be applied to `ConnectedStreams` which represent two `DataStream`s of possibly different types. `ConnectedStreams` can be created by calling the `connect(otherDataStream)` method of a `DataStream`.
 
-#### Map on ConnectedDataStream
+#### Map on ConnectedStreams
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a `CoMapFunction.map1()` for each element of the first input and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns exactly one element.
 A CoMap operator that outputs true if an Integer value is received and false if a String value is received:
 
@@ -1032,8 +1032,8 @@ val dataStream2 : DataStream[String] = ...
 </div>
 </div>
 
-#### FlatMap on ConnectedDataStream
-The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface. 
+#### FlatMap on ConnectedStreams
+The FlatMap operator for `ConnectedStreams` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1072,11 +1072,11 @@ val dataStream2 : DataStream[String] = ...
 </div>
 </div>
 
-#### WindowReduce on ConnectedDataStream
+#### WindowReduce on ConnectedStreams
 The windowReduce operator applies a user defined `CoWindowFunction` to time aligned windows of the two data streams and return zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
 
-#### Reduce on ConnectedDataStream
-The Reduce operator for the `ConnectedDataStream` applies a group-reduce transformation on the grouped joined data streams and then maps the reduced elements to a common output type. It works only for connected data streams where the inputs are grouped.
+#### Reduce on ConnectedStreams
+The Reduce operator for `ConnectedStreams` applies a group-reduce transformation on the grouped joined data streams and then maps the reduced elements to a common output type. It works only for connected data streams where the inputs are grouped.
 
 ### Output splitting
 <div class="codetabs" markdown="1">
@@ -1188,7 +1188,7 @@ To use this functionality the user needs to add the maxWaitTimeMillis parameter
 By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the `closeWith` method. 
 
 #### Iteration head as a co-operator
-The user can also treat the input and feedback stream of a streaming iteration as a `ConnectedDataStream`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback. 
+The user can also treat the input and feedback stream of a streaming iteration as `ConnectedStreams`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.
 
 To use this feature the user needs to call the `withFeedbackType(type)` method of the iterative data stream and pass the type of the feedback stream:
 
@@ -1224,13 +1224,13 @@ To use this functionality the user needs to add the maxWaitTimeMillis parameter
 By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the `iterate` method. 
 
 #### Iteration head as a co-operator
-The user can also treat the input and feedback stream of a streaming iteration as a `ConnectedDataStream`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback. 
+The user can also treat the input and feedback stream of a streaming iteration as `ConnectedStreams`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.
 
-To use this feature the user needs to call implement a step function that operates on a `ConnectedDataStream` and pass it to the `iterate(…)` call.
+To use this feature the user needs to call implement a step function that operates on `ConnectedStreams` and pass it to the `iterate(…)` call.
 
 {% highlight scala %}
 val iteratedStream = someDataStream.iterate(
-			stepFunction: ConnectedDataStream[T, F] => (DataStream[F], DataStream[R]), 
+			stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
 			maxWaitTimeMillis)
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index f630bce..45da6eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -29,7 +29,7 @@
 //		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 //
 //		@SuppressWarnings("unused")
-//		DataStream<String> dataStream1 = env.addSource(
+//		DataStream<String> inputStream1 = env.addSource(
 //				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
 //				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
 //

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
deleted file mode 100644
index 0406e35..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * The ConnectedDataStream represents a stream for two different data types. It
- * can be used to apply transformations like {@link CoMapFunction} on two
- * {@link DataStream}s
- * 
- * @param <IN1>
- *            Type of the first input data steam.
- * @param <IN2>
- *            Type of the second input data stream.
- */
-public class ConnectedDataStream<IN1, IN2> {
-
-	protected StreamExecutionEnvironment environment;
-	protected DataStream<IN1> dataStream1;
-	protected DataStream<IN2> dataStream2;
-
-	protected boolean isGrouped;
-	protected KeySelector<IN1, ?> keySelector1;
-	protected KeySelector<IN2, ?> keySelector2;
-
-	protected ConnectedDataStream(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
-		this.environment = env;
-		if (input1 != null) {
-			this.dataStream1 = input1;
-		}
-		if (input2 != null) {
-			this.dataStream2 = input2;
-		}
-
-		if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
-			this.isGrouped = true;
-			this.keySelector1 = ((GroupedDataStream<IN1, ?>) input1).keySelector;
-			this.keySelector2 = ((GroupedDataStream<IN2, ?>) input2).keySelector;
-		} else {
-			this.isGrouped = false;
-			this.keySelector1 = null;
-			this.keySelector2 = null;
-		}
-	}
-
-	protected ConnectedDataStream(ConnectedDataStream<IN1, IN2> coDataStream) {
-		this.environment = coDataStream.environment;
-		this.dataStream1 = coDataStream.getFirst();
-		this.dataStream2 = coDataStream.getSecond();
-		this.isGrouped = coDataStream.isGrouped;
-		this.keySelector1 = coDataStream.keySelector1;
-		this.keySelector2 = coDataStream.keySelector2;
-	}
-
-	public <F> F clean(F f) {
-		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
-			ClosureCleaner.clean(f, true);
-		}
-		ClosureCleaner.ensureSerializable(f);
-		return f;
-	}
-
-	public StreamExecutionEnvironment getExecutionEnvironment() {
-		return environment;
-	}
-
-	/**
-	 * Returns the first {@link DataStream}.
-	 * 
-	 * @return The first DataStream.
-	 */
-	public DataStream<IN1> getFirst() {
-		return dataStream1;
-	}
-
-	/**
-	 * Returns the second {@link DataStream}.
-	 * 
-	 * @return The second DataStream.
-	 */
-	public DataStream<IN2> getSecond() {
-		return dataStream2;
-	}
-
-	/**
-	 * Gets the type of the first input
-	 * 
-	 * @return The type of the first input
-	 */
-	public TypeInformation<IN1> getType1() {
-		return dataStream1.getType();
-	}
-
-	/**
-	 * Gets the type of the second input
-	 * 
-	 * @return The type of the second input
-	 */
-	public TypeInformation<IN2> getType2() {
-		return dataStream2.getType();
-	}
-
-	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
-	 * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-	 * applying function on grouped data streams for example
-	 * {@link ConnectedDataStream#reduce}
-	 * 
-	 * @param keyPosition1
-	 *            The field used to compute the hashcode of the elements in the
-	 *            first input stream.
-	 * @param keyPosition2
-	 *            The field used to compute the hashcode of the elements in the
-	 *            second input stream.
-	 * @return The grouped {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
-		return new ConnectedDataStream<IN1, IN2>(this.environment, dataStream1.groupBy(keyPosition1),
-				dataStream2.groupBy(keyPosition2));
-	}
-
-	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
-	 * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-	 * applying function on grouped data streams for example
-	 * {@link ConnectedDataStream#reduce}
-	 * 
-	 * @param keyPositions1
-	 *            The fields used to group the first input stream.
-	 * @param keyPositions2
-	 *            The fields used to group the second input stream.
-	 * @return The grouped {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.groupBy(keyPositions1),
-				dataStream2.groupBy(keyPositions2));
-	}
-
-	/**
-	 * GroupBy operation for connected data stream using key expressions. Groups
-	 * the elements of input1 and input2 according to field1 and field2. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field1
-	 *            The grouping expression for the first input
-	 * @param field2
-	 *            The grouping expression for the second input
-	 * @return The grouped {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.groupBy(field1),
-				dataStream2.groupBy(field2));
-	}
-
-	/**
-	 * GroupBy operation for connected data stream using key expressions. Groups
-	 * the elements of input1 and input2 according to fields1 and fields2. A
-	 * field expression is either the name of a public field or a getter method
-	 * with parentheses of the {@link DataStream}S underlying type. A dot can be
-	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-	 * .
-	 * 
-	 * @param fields1
-	 *            The grouping expressions for the first input
-	 * @param fields2
-	 *            The grouping expressions for the second input
-	 * @return The grouped {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.groupBy(fields1),
-				dataStream2.groupBy(fields2));
-	}
-
-	/**
-	 * GroupBy operation for connected data stream. Groups the elements of
-	 * input1 and input2 using keySelector1 and keySelector2. Used for applying
-	 * function on grouped data streams for example
-	 * {@link ConnectedDataStream#reduce}
-	 * 
-	 * @param keySelector1
-	 *            The {@link KeySelector} used for grouping the first input
-	 * @param keySelector2
-	 *            The {@link KeySelector} used for grouping the second input
-	 * @return The partitioned {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
-			KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.groupBy(keySelector1),
-				dataStream2.groupBy(keySelector2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream. Partitions the elements of
-	 * input1 and input2 according to keyPosition1 and keyPosition2.
-	 *
-	 * @param keyPosition1
-	 *            The field used to compute the hashcode of the elements in the
-	 *            first input stream.
-	 * @param keyPosition2
-	 *            The field used to compute the hashcode of the elements in the
-	 *            second input stream.
-	 * @return The partitioned {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.partitionByHash(keyPosition1),
-				dataStream2.partitionByHash(keyPosition2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream. Partitions the elements of
-	 * input1 and input2 according to keyPositions1 and keyPositions2.
-	 *
-	 * @param keyPositions1
-	 *            The fields used to group the first input stream.
-	 * @param keyPositions2
-	 *            The fields used to group the second input stream.
-	 * @return The partitioned {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.partitionByHash(keyPositions1),
-				dataStream2.partitionByHash(keyPositions2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream using key expressions. Partitions
-	 * the elements of input1 and input2 according to field1 and field2. A
-	 * field expression is either the name of a public field or a getter method
-	 * with parentheses of the {@link DataStream}s underlying type. A dot can be
-	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-	 *
-	 * @param field1
-	 *            The partitioning expressions for the first input
-	 * @param field2
-	 *            The partitioning expressions for the second input
-	 * @return The partitioned {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> partitionByHash(String field1, String field2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.partitionByHash(field1),
-				dataStream2.partitionByHash(field2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream using key expressions. Partitions
-	 * the elements of input1 and input2 according to fields1 and fields2. A
-	 * field expression is either the name of a public field or a getter method
-	 * with parentheses of the {@link DataStream}s underlying type. A dot can be
-	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-	 *
-	 * @param fields1
-	 *            The partitioning expressions for the first input
-	 * @param fields2
-	 *            The partitioning expressions for the second input
-	 * @return The partitioned {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.partitionByHash(fields1),
-				dataStream2.partitionByHash(fields2));
-	}
-
-	/**
-	 * PartitionBy operation for connected data stream. Partitions the elements of
-	 * input1 and input2 using keySelector1 and keySelector2.
-	 *
-	 * @param keySelector1
-	 *            The {@link KeySelector} used for partitioning the first input
-	 * @param keySelector2
-	 *            The {@link KeySelector} used for partitioning the second input
-	 * @return @return The partitioned {@link ConnectedDataStream}
-	 */
-	public ConnectedDataStream<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1,
-														KeySelector<IN2, ?> keySelector2) {
-		return new ConnectedDataStream<IN1, IN2>(environment, dataStream1.partitionByHash(keySelector1),
-				dataStream2.partitionByHash(keySelector2));
-	}
-
-	/**
-	 * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-	 * the output to a common type. The transformation calls a
-	 * {@link CoMapFunction#map1} for each element of the first input and
-	 * {@link CoMapFunction#map2} for each element of the second input. Each
-	 * CoMapFunction call returns exactly one element.
-	 * 
-	 * @param coMapper
-	 *            The CoMapFunction used to jointly transform the two input
-	 *            DataStreams
-	 * @return The transformed {@link DataStream}
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
-				CoMapFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
-				clean(coMapper)));
-
-	}
-
-	/**
-	 * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-	 * maps the output to a common type. The transformation calls a
-	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-	 * input. Each CoFlatMapFunction call returns any number of elements
-	 * including none.
-	 * 
-	 * @param coFlatMapper
-	 *            The CoFlatMapFunction used to jointly transform the two input
-	 *            DataStreams
-	 * @return The transformed {@link DataStream}
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
-			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-				CoFlatMapFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
-				clean(coFlatMapper)));
-	}
-
-	/**
-	 * Applies a reduce transformation on a grouped{@link ConnectedDataStream} 
-	 * and maps the outputs to a common type. The reducer is applied on every 
-	 * group of elements sharing the same key. 
-	 * 
-	 * @param coReducer
-	 *            The {@link CoReduceFunction} that will be called for every
-	 *            element of the inputs.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
-				CoReduceFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
-
-	}
-
-	/**
-	 * Applies a CoWindow transformation on the connected DataStreams. The
-	 * transformation calls the {@link CoWindowFunction#coWindow} method for for
-	 * time aligned windows of the two data streams. System time is used as
-	 * default to compute windows.
-	 * 
-	 * @param coWindowFunction
-	 *            The {@link CoWindowFunction} that will be applied for the time
-	 *            windows.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * 
-	 * @return The transformed {@link DataStream}.
-	 */
-	@SuppressWarnings("unchecked")
-	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
-			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) {
-		return windowReduce(coWindowFunction, windowSize, slideInterval,
-				(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
-				(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
-	}
-
-	/**
-	 * Applies a CoWindow transformation on the connected DataStreams. The
-	 * transformation calls the {@link CoWindowFunction#coWindow} method for
-	 * time aligned windows of the two data streams. The user can implement
-	 * their own time stamps or use the system time by default.
-	 * 
-	 * @param coWindowFunction
-	 *            The {@link CoWindowFunction} that will be applied for the time
-	 *            windows.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams. If
-	 *            system time is used it is milliseconds. User defined time
-	 *            stamps are assumed to be monotonically increasing.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * 
-	 * @param timestamp1
-	 *            User defined time stamps for the first input.
-	 * @param timestamp2
-	 *            User defined time stamps for the second input.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
-			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval,
-			TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
-
-		if (windowSize < 1) {
-			throw new IllegalArgumentException("Window size must be positive");
-		}
-		if (slideInterval < 1) {
-			throw new IllegalArgumentException("Slide interval must be positive");
-		}
-		
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
-				CoWindowFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
-
-		return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
-				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
-
-	}
-
-	protected <OUT> TwoInputStreamOperator<IN1, IN2, OUT> getReduceOperator(
-			CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		if (isGrouped) {
-			return new CoStreamGroupedReduce<IN1, IN2, OUT>(clean(coReducer), keySelector1,
-					keySelector2);
-		} else {
-			throw new UnsupportedOperationException(
-					"Reduce can only be applied on grouped streams.");
-		}
-	}
-
-	public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
-			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
-			long windowSize, long slideInterval, TimestampWrapper<IN1> timestamp1,
-			TimestampWrapper<IN2> timestamp2) {
-
-		if (windowSize < 1) {
-			throw new IllegalArgumentException("Window size must be positive");
-		}
-		if (slideInterval < 1) {
-			throw new IllegalArgumentException("Slide interval must be positive");
-		}
-
-		return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
-				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
-
-	}
-
-	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
-			TypeInformation<OUT> outTypeInfo, TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-		// read the output type of the input Transforms to coax out errors about MissinTypeInfo
-		dataStream1.getType();
-		dataStream2.getType();
-
-		TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<IN1, IN2, OUT>(
-				dataStream1.getTransformation(),
-				dataStream2.getTransformation(),
-				functionName,
-				operator,
-				outTypeInfo,
-				environment.getParallelism());
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(environment, transform);
-
-		getExecutionEnvironment().addOperator(transform);
-
-		return returnStream;
-	}
-
-	protected ConnectedDataStream<IN1, IN2> copy() {
-		return new ConnectedDataStream<IN1, IN2>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
new file mode 100644
index 0000000..2447c1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+
+/**
+ * {@code ConnectedStreams} represents two connected streams of (possibly) different data types. It
+ * can be used to apply transformations such as {@link CoMapFunction} on two
+ * {@link DataStream DataStreams}.
+ * 
+ * @param <IN1> Type of the first input data stream.
+ * @param <IN2> Type of the second input data stream.
+ */
+public class ConnectedStreams<IN1, IN2> {
+
+	protected StreamExecutionEnvironment environment;
+	protected DataStream<IN1> inputStream1;
+	protected DataStream<IN2> inputStream2;
+
+	protected ConnectedStreams(StreamExecutionEnvironment env,
+			DataStream<IN1> input1,
+			DataStream<IN2> input2) {
+		this.environment = env;
+		if (input1 != null) {
+			this.inputStream1 = input1;
+		}
+		if (input2 != null) {
+			this.inputStream2 = input2;
+		}
+	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return environment;
+	}
+
+	/**
+	 * Returns the first {@link DataStream}.
+	 *
+	 * @return The first DataStream.
+	 */
+	public DataStream<IN1> getFirstInput() {
+		return inputStream1;
+	}
+
+	/**
+	 * Returns the second {@link DataStream}.
+	 *
+	 * @return The second DataStream.
+	 */
+	public DataStream<IN2> getSecondInput() {
+		return inputStream2;
+	}
+
+	/**
+	 * Gets the type of the first input
+	 *
+	 * @return The type of the first input
+	 */
+	public TypeInformation<IN1> getType1() {
+		return inputStream1.getType();
+	}
+
+	/**
+	 * Gets the type of the second input
+	 *
+	 * @return The type of the second input
+	 */
+	public TypeInformation<IN2> getType2() {
+		return inputStream2.getType();
+	}
+
+	/**
+	 * GroupBy operation for connected data stream. Groups the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
+		return new ConnectedStreams<>(this.environment, inputStream1.groupBy(keyPosition1),
+				inputStream2.groupBy(keyPosition2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream. Groups the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to group the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to group the second input stream.
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedStreams<>(environment, inputStream1.groupBy(keyPositions1),
+				inputStream2.groupBy(keyPositions2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream using key expressions. Groups
+	 * the elements of input1 and input2 according to field1 and field2. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field1
+	 *            The grouping expression for the first input
+	 * @param field2
+	 *            The grouping expression for the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> groupBy(String field1, String field2) {
+		return new ConnectedStreams<>(environment, inputStream1.groupBy(field1),
+				inputStream2.groupBy(field2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream using key expressions. Groups
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in
+	 * {@code "field1.getInnerField2()" }.
+	 *
+	 * @param fields1
+	 *            The grouping expressions for the first input
+	 * @param fields2
+	 *            The grouping expressions for the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+		return new ConnectedStreams<>(environment, inputStream1.groupBy(fields1),
+				inputStream2.groupBy(fields2));
+	}
+
+	/**
+	 * GroupBy operation for connected data stream. Groups the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for grouping the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for grouping the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedStreams<>(environment, inputStream1.groupBy(keySelector1),
+				inputStream2.groupBy(keySelector2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPosition1),
+				inputStream2.partitionByHash(keyPosition2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to partition the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to partition the second input stream.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPositions1),
+				inputStream2.partitionByHash(keyPositions2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to field1 and field2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field1
+	 *            The partitioning expression for the first input
+	 * @param field2
+	 *            The partitioning expression for the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(String field1, String field2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(field1),
+				inputStream2.partitionByHash(field2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param fields1
+	 *            The partitioning expressions for the first input
+	 * @param fields2
+	 *            The partitioning expressions for the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(fields1),
+				inputStream2.partitionByHash(fields2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for partitioning the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for partitioning the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keySelector1),
+				inputStream2.partitionByHash(keySelector2));
+	}
+
+	/**
+	 * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+	 * the output to a common type. The transformation calls a
+	 * {@link CoMapFunction#map1} for each element of the first input and
+	 * {@link CoMapFunction#map2} for each element of the second input. Each
+	 * CoMapFunction call returns exactly one element.
+	 * 
+	 * @param coMapper The CoMapFunction used to jointly transform the two input DataStreams
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
+				CoMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
+
+	}
+
+	/**
+	 * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+	 * maps the output to a common type. The transformation calls a
+	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+	 * input. Each CoFlatMapFunction call returns any number of elements
+	 * including none.
+	 * 
+	 * @param coFlatMapper
+	 *            The CoFlatMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
+			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+				CoFlatMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
+	}
+
+	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
+			TypeInformation<OUT> outTypeInfo,
+			TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+
+		// read the output type of the input Transforms to coax out errors about MissingTypeInfo
+		inputStream1.getType();
+		inputStream2.getType();
+
+		TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
+				inputStream1.getTransformation(),
+				inputStream2.getTransformation(),
+				functionName,
+				operator,
+				outTypeInfo,
+				environment.getParallelism());
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(environment, transform);
+
+		getExecutionEnvironment().addOperator(transform);
+
+		return returnStream;
+	}
+}
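
Editorial note on the map() contract documented above: the following is a minimal, Flink-free sketch of what the Javadoc describes, namely that map1 is called for each element of the first input, map2 for each element of the second, and both produce one common output type. The CoMapper interface and the coMap helper are hypothetical stand-ins written for this illustration; they are not the real CoMapFunction or ConnectedStreams API. The sketch also processes the first input completely before the second, which is an assumption made for simplicity; the commit does not state any ordering between the two inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoMapSketch {

	// Hypothetical stand-in for a co-map function; NOT Flink's CoMapFunction.
	interface CoMapper<IN1, IN2, OUT> {
		OUT map1(IN1 value);

		OUT map2(IN2 value);
	}

	// Applies map1 to every element of the first input and map2 to every
	// element of the second input, collecting results of the common type.
	// Assumption: inputs are handled one after the other, for simplicity.
	static <IN1, IN2, OUT> List<OUT> coMap(List<IN1> first, List<IN2> second,
			CoMapper<IN1, IN2, OUT> mapper) {
		List<OUT> out = new ArrayList<>();
		for (IN1 value : first) {
			out.add(mapper.map1(value));
		}
		for (IN2 value : second) {
			out.add(mapper.map2(value));
		}
		return out;
	}

	public static void main(String[] args) {
		CoMapper<Integer, String, String> mapper = new CoMapper<Integer, String, String>() {
			@Override
			public String map1(Integer value) {
				return "int:" + value;
			}

			@Override
			public String map2(String value) {
				return "str:" + value;
			}
		};

		// Each call returns exactly one element, so three inputs give three outputs.
		System.out.println(coMap(Arrays.asList(1, 2), Arrays.asList("a"), mapper));
	}
}
```

In the class added by this commit, the corresponding call would go through DataStream#connect followed by ConnectedStreams#map; the sketch only models the per-element contract, not type extraction or operator registration.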

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ad159f9..3389016 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,7 +44,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -218,17 +217,17 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Creates a new {@link ConnectedDataStream} by connecting
+	 * Creates a new {@link ConnectedStreams} by connecting
 	 * {@link DataStream} outputs of (possible) different types with each other.
 	 * The DataStreams connected using this operator can be used with
 	 * CoFunctions to apply joint transformations.
 	 * 
 	 * @param dataStream
 	 *            The DataStream with which this stream will be connected.
-	 * @return The {@link ConnectedDataStream}.
+	 * @return The {@link ConnectedStreams}.
 	 */
-	public <R> ConnectedDataStream<T, R> connect(DataStream<R> dataStream) {
-		return new ConnectedDataStream<T, R>(environment, this, dataStream);
+	public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
+		return new ConnectedStreams<T, R>(environment, this, dataStream);
 	}
 
 	/**
@@ -536,7 +535,7 @@ public class DataStream<T> {
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. The user can also use different feedback type than the
 	 * input of the iteration and treat the input and feedback streams as a
-	 * {@link ConnectedDataStream} be calling
+	 * {@link ConnectedStreams} by calling
 	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
 	 * <p>
 	 * A common usage pattern for streaming iterations is to use output
@@ -567,7 +566,7 @@ public class DataStream<T> {
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. The user can also use different feedback type than the
 	 * input of the iteration and treat the input and feedback streams as a
-	 * {@link ConnectedDataStream} be calling
+	 * {@link ConnectedStreams} by calling
 	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
 	 * <p>
 	 * A common usage pattern for streaming iterations is to use output
@@ -680,32 +679,6 @@ public class DataStream<T> {
 		return new StreamProjection<T>(this, fieldIndexes).projectTupleX();
 	}
 
-
-	/**
-	 * Initiates a temporal Cross transformation.<br/>
-	 * A Cross transformation combines the elements of two {@link DataStream}s
-	 * into one DataStream over a specified time window. It builds all pair
-	 * combinations of elements of both DataStreams, i.e., it builds a Cartesian
-	 * product.
-	 * 
-	 * <p>
-	 * This method returns a {@link StreamCrossOperator} on which the
-	 * {@link StreamCrossOperator#onWindow} should be called to define the
-	 * window.
-	 * <p>
-	 * Call {@link StreamCrossOperator.CrossWindow#with(org.apache.flink.api.common.functions.CrossFunction)}
-	 * to define a custom cross function.
-	 * 
-	 * @param dataStreamToCross
-	 *            The other DataStream with which this DataStream is crossed.
-	 * @return A {@link StreamCrossOperator} to continue the definition of the
-	 *         cross transformation.
-	 * 
-	 */
-	public <IN2> StreamCrossOperator<T, IN2> cross(DataStream<IN2> dataStreamToCross) {
-		return new StreamCrossOperator<T, IN2>(this, dataStreamToCross);
-	}
-
 	/**
 	 * Initiates a temporal Join transformation. <br/>
 	 * A temporal Join transformation joins the elements of two

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 6b12013..2fe3848 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -81,56 +81,56 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, Iterat
 	/**
 	 * Changes the feedback type of the iteration and allows the user to apply
 	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedDataStream}.
+	 * {@link ConnectedStreams}.
 	 *
 	 * <p>
 	 * For type safety the user needs to define the feedback type
 	 * 
 	 * @param feedbackTypeString
 	 *            String describing the type information of the feedback stream.
-	 * @return A {@link ConnectedIterativeDataStream}.
+	 * @return A {@link ConnectedIterativeDataStreams}.
 	 */
-	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(String feedbackTypeString) {
+	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
 		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
 	}
 
 	/**
 	 * Changes the feedback type of the iteration and allows the user to apply
 	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedDataStream}.
+	 * {@link ConnectedStreams}.
 	 *
 	 * <p>
 	 * For type safety the user needs to define the feedback type
 	 * 
 	 * @param feedbackTypeClass
 	 *            Class of the elements in the feedback stream.
-	 * @return A {@link ConnectedIterativeDataStream}.
+	 * @return A {@link ConnectedIterativeDataStreams}.
 	 */
-	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
+	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
 		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
 	}
 
 	/**
 	 * Changes the feedback type of the iteration and allows the user to apply
 	 * co-transformations on the input and feedback stream, as in a
-	 * {@link ConnectedDataStream}.
+	 * {@link ConnectedStreams}.
 	 *
 	 * <p>
 	 * For type safety the user needs to define the feedback type
 	 * 
 	 * @param feedbackType
 	 *            The type information of the feedback stream.
-	 * @return A {@link ConnectedIterativeDataStream}.
+	 * @return A {@link ConnectedIterativeDataStreams}.
 	 */
-	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
-		return new ConnectedIterativeDataStream<T, F>(originalInput, feedbackType, maxWaitTime);
+	public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
+		return new ConnectedIterativeDataStreams<T, F>(originalInput, feedbackType, maxWaitTime);
 	}
 	
 	/**
-	 * The {@link ConnectedIterativeDataStream} represent a start of an
+	 * The {@link ConnectedIterativeDataStreams} represents the start of an
 	 * iterative part of a streaming program, where the original input of the
 	 * iteration and the feedback of the iteration are connected as in a
-	 * {@link ConnectedDataStream}.
+	 * {@link ConnectedStreams}.
 	 *
 	 * <p>
 	 * The user can distinguish between the two inputs using co-transformation,
@@ -142,24 +142,26 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, Iterat
 	 * @param <F>
 	 *            Type of the feedback of the iteration
 	 */
-	public static class ConnectedIterativeDataStream<I, F> extends ConnectedDataStream<I, F>{
+	public static class ConnectedIterativeDataStreams<I, F> extends ConnectedStreams<I, F> {
 
 		private CoFeedbackTransformation<F> coFeedbackTransformation;
 
-		public ConnectedIterativeDataStream(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
+		public ConnectedIterativeDataStreams(DataStream<I> input,
+				TypeInformation<F> feedbackType,
+				long waitTime) {
 			super(input.getExecutionEnvironment(),
 					input,
 					new DataStream<F>(input.getExecutionEnvironment(),
 							new CoFeedbackTransformation<F>(input.getParallelism(),
 									feedbackType,
 									waitTime)));
-			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecond().getTransformation();
+			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
 		}
 
 		/**
 		 * Closes the iteration. This method defines the end of the iterative
 		 * program part that will be fed back to the start of the iteration as
-		 * the second input in the {@link ConnectedDataStream}.
+		 * the second input in the {@link ConnectedStreams}.
 		 * 
 		 * @param feedbackStream
 		 *            {@link DataStream} that will be used as second input to
@@ -186,34 +188,34 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, Iterat
 				"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
 		
 		@Override
-		public ConnectedDataStream<I, F> groupBy(int keyPosition1, int keyPosition2) {throw groupingException;}
+		public ConnectedStreams<I, F> groupBy(int keyPosition1, int keyPosition2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> groupBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+		public ConnectedStreams<I, F> groupBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> groupBy(String field1, String field2) {throw groupingException;}
+		public ConnectedStreams<I, F> groupBy(String field1, String field2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> groupBy(String[] fields1, String[] fields2) {throw groupingException;}
+		public ConnectedStreams<I, F> groupBy(String[] fields1, String[] fields2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> groupBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+		public ConnectedStreams<I, F> groupBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
+		public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+		public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
+		public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
+		public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
 		
 		@Override
-		public ConnectedDataStream<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
+		public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
 		
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
deleted file mode 100644
index e0aafb7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.operators.CrossOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.co.CrossWindowFunction;
-
-public class StreamCrossOperator<I1, I2> extends
-		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2, Tuple2<I1, I2>>> {
-
-	public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
-		super(input1, input2);
-	}
-
-	protected <F> F clean(F f) {
-		if (input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
-			ClosureCleaner.clean(f, true);
-		}
-		ClosureCleaner.ensureSerializable(f);
-		return f;
-	}
-
-	@Override
-	protected CrossWindow<I1, I2, Tuple2<I1, I2>> createNextWindowOperator() {
-
-		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
-				clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
-
-		return new CrossWindow<I1, I2, Tuple2<I1, I2>>(this, input1.connect(input2).addGeneralWindowCombine(
-				crossWindowFunction,
-				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
-				slideInterval, timeStamp1, timeStamp2));
-	}
-
-	public static class CrossWindow<I1, I2, R> extends
-			SingleOutputStreamOperator<R, CrossWindow<I1, I2, R>> implements
-			TemporalWindow<CrossWindow<I1, I2, R>> {
-
-		private StreamCrossOperator<I1, I2> op;
-
-		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<R> ds) {
-			super(ds.getExecutionEnvironment(), ds.getTransformation());
-			this.op = op;
-		}
-
-		public CrossWindow<I1, I2, R> every(long length, TimeUnit timeUnit) {
-			return every(timeUnit.toMillis(length));
-		}
-
-		@SuppressWarnings("unchecked")
-		public CrossWindow<I1, I2, R> every(long length) {
-
-			CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
-					clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
-
-			return (CrossWindow<I1, I2, R>) new CrossWindow<I1, I2, Tuple2<I1, I2>>(op, op.input1.connect(op.input2).addGeneralWindowCombine(
-					crossWindowFunction,
-					new TupleTypeInfo<Tuple2<I1, I2>>(op.input1.getType(), op.input2.getType()), op.windowSize,
-					length, op.timeStamp1, op.timeStamp2));
-		}
-
-		/**
-		 * Finalizes a temporal Cross transformation by applying a
-		 * {@link CrossFunction} to each pair of crossed elements.<br/>
-		 * Each CrossFunction call returns exactly one element.
-		 * 
-		 * @param function
-		 *            The CrossFunction that is called for each pair of crossed
-		 *            elements.
-		 * @return The crossed data streams
-		 * 
-		 */
-		@SuppressWarnings("unchecked")
-		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
-			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
-					op.input1.getType(), op.input2.getType());
-
-			CrossWindowFunction<I1, I2, R> crossWindowFunction = new CrossWindowFunction<I1, I2, R>(clean(function));
-
-			return new CrossWindow<I1, I2, R>(op, op.input1.connect(op.input2).addGeneralWindowCombine(
-					crossWindowFunction,
-					outTypeInfo, op.windowSize,
-					op.slideInterval, op.timeStamp1, op.timeStamp2));
-
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
index e48d707..999d197 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
@@ -26,11 +26,9 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.co.JoinWindowFunction;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
 public class StreamJoinOperator<I1, I2> extends
@@ -204,19 +202,20 @@ public class StreamJoinOperator<I1, I2> extends
 
 		private JoinedStream<I1, I2, Tuple2<I1, I2>> createJoinOperator() {
 
-			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
-
-			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
-					joinFunction, this);
-
-			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
-					op.input1.getType(), op.input2.getType());
-
-			return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
-					.groupBy(keys1)
-					.connect(op.input2.groupBy(keys2))
-					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
-							op.slideInterval, op.timeStamp1, op.timeStamp2));
+//			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
+//
+//			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
+//					joinFunction, this);
+//
+//			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
+//					op.input1.getType(), op.input2.getType());
+
+//			return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
+//					.groupBy(keys1)
+//					.connect(op.input2.groupBy(keys2))
+//					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
+//							op.slideInterval, op.timeStamp1, op.timeStamp2));
+			return null;
 		}
 
 		public static class JoinedStream<I1, I2, R> extends
@@ -240,15 +239,16 @@ public class StreamJoinOperator<I1, I2> extends
 				TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
 						predicate.op.input1.getType(), predicate.op.input2.getType());
 
-				JoinWindowFunction<I1, I2, OUT> joinWindowFunction = getJoinWindowFunction(joinFunction, predicate);
+//				JoinWindowFunction<I1, I2, OUT> joinWindowFunction = getJoinWindowFunction(joinFunction, predicate);
+//
 
-
-				return new JoinedStream<I1, I2, OUT>(
-						predicate, predicate.op.input1
-						.groupBy(predicate.keys1)
-						.connect(predicate.op.input2.groupBy(predicate.keys2))
-						.addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
-								predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
+//				return new JoinedStream<I1, I2, OUT>(
+//						predicate, predicate.op.input1
+//						.groupBy(predicate.keys1)
+//						.connect(predicate.op.input2.groupBy(predicate.keys2))
+//						.addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
+//								predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
+				return null;
 			}
 		}
 	}
@@ -267,8 +267,8 @@ public class StreamJoinOperator<I1, I2> extends
 		}
 	}
 
-	private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
-			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
-		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
-	}
+//	private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
+//			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
+//		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
+//	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
deleted file mode 100644
index 9518d4b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * The CoReduceFunction interface represents a Reduce transformation with two
- * different input streams. The reduce1 function combine groups of elements of
- * the first input with the same key to a single value, while reduce2 combine
- * groups of elements of the second input with the same key to a single value.
- * Each produced values are mapped to the same type by map1 and map2,
- * respectively, to form one output stream.
- * 
- * The basic syntax for using a grouped ReduceFunction is as follows:
- * 
- * <pre>
- * <blockquote>
- * ConnectedDataStream<X> input = ...;
- * 
- * ConnectedDataStream<X> result = input.groupBy(keyPosition1, keyPosition2)
- *          .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
- * </blockquote>
- * </pre>
- * <p>
- * 
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	/**
-	 * The core method of CoReduceFunction, combining two values of the first
-	 * input into one value of the same type. The reduce1 function is
-	 * consecutively applied to all values of a group until only a single value
-	 * remains.
-	 *
-	 * @param value1
-	 *            The first value to combine.
-	 * @param value2
-	 *            The second value to combine.
-	 * @return The combined value of both input values.
-	 *
-	 * @throws Exception
-	 *             This method may throw exceptions. Throwing an exception will
-	 *             cause the operation to fail and may trigger recovery.
-	 */
-	IN1 reduce1(IN1 value1, IN1 value2) throws Exception;
-
-	/**
-	 * The core method of ReduceFunction, combining two values of the second
-	 * input into one value of the same type. The reduce2 function is
-	 * consecutively applied to all values of a group until only a single value
-	 * remains.
-	 *
-	 * @param value1
-	 *            The first value to combine.
-	 * @param value2
-	 *            The second value to combine.
-	 * @return The combined value of both input values.
-	 *
-	 * @throws Exception
-	 *             This method may throw exceptions. Throwing an exception will
-	 *             cause the operation to fail and may trigger recovery.
-	 */
-	IN2 reduce2(IN2 value1, IN2 value2) throws Exception;
-
-	/**
-	 * Maps the reduced first input to the output type.
-	 * 
-	 * @param value
-	 *            Type of the first input.
-	 * @return the output type.
-	 */
-	OUT map1(IN1 value) throws Exception;
-
-	/**
-	 * Maps the reduced second input to the output type.
-	 * 
-	 * @param value
-	 *            Type of the second input.
-	 * @return the output type.
-	 */
-	OUT map2(IN2 value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
deleted file mode 100644
index 1ae5137..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable {
-
-	void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
deleted file mode 100644
index e9c0169..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.util.Collector;
-
-public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CrossFunction<IN1, IN2, OUT> crossFunction;
-
-	public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
-		this.crossFunction = crossFunction;
-	}
-
-	@Override
-	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
-		for (IN1 firstValue : first) {
-			for (IN2 secondValue : second) {
-				out.collect(crossFunction.cross(firstValue, secondValue));
-			}
-		}
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
deleted file mode 100644
index 6f658c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<IN1, ?> keySelector1;
-	private KeySelector<IN2, ?> keySelector2;
-	private JoinFunction<IN1, IN2, OUT> joinFunction;
-
-	public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
-			JoinFunction<IN1, IN2, OUT> joinFunction) {
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		this.joinFunction = joinFunction;
-	}
-
-	@Override
-	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
-
-		Map<Object, List<IN1>> map = build(first);
-
-		for (IN2 record : second) {
-			Object key = keySelector2.getKey(record);
-			List<IN1> match = map.get(key);
-			if (match != null) {
-				for (IN1 matching : match) {
-					out.collect(joinFunction.join(matching, record));
-				}
-			}
-		}
-
-	}
-
-	private Map<Object, List<IN1>> build(List<IN1> records) throws Exception {
-
-		Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>();
-
-		for (IN1 record : records) {
-			Object key = keySelector1.getKey(record);
-			List<IN1> current = map.get(key);
-			if (current == null) {
-				current = new LinkedList<IN1>();
-				map.put(key, current);
-			}
-			current.add(record);
-		}
-
-		return map;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
deleted file mode 100644
index d3e6f3a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoReduceFunction represents a Reduce transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoReduceFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
deleted file mode 100644
index e317065..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-
-public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
-		CoWindowFunction<IN1, IN2, O> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
-			throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
deleted file mode 100644
index b46a929..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected KeySelector<IN1, ?> keySelector1;
-	protected KeySelector<IN2, ?> keySelector2;
-	private Map<Object, IN1> values1;
-	private Map<Object, IN2> values2;
-	IN1 reduced1;
-	IN2 reduced2;
-
-	public CoStreamGroupedReduce(CoReduceFunction<IN1, IN2, OUT> coReducer,
-			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		super(coReducer);
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		values1 = new HashMap<Object, IN1>();
-		values2 = new HashMap<Object, IN2>();
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> elementRecord) throws Exception {
-		IN1 element = elementRecord.getValue();
-		Object key = keySelector1.getKey(element);
-		currentValue1 = values1.get(key);
-		if (currentValue1 != null) {
-			reduced1 = userFunction.reduce1(currentValue1, element);
-			values1.put(key, reduced1);
-			output.collect(elementRecord.replace(userFunction.map1(reduced1)));
-		} else {
-			values1.put(key, element);
-			output.collect(elementRecord.replace(userFunction.map1(element)));
-		}
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> elementRecord) throws Exception {
-		IN2 element = elementRecord.getValue();
-
-		Object key = keySelector2.getKey(element);
-		currentValue2 = values2.get(key);
-		if (currentValue2 != null) {
-			reduced2 = userFunction.reduce2(currentValue2, element);
-			values2.put(key, reduced2);
-			output.collect(elementRecord.replace(userFunction.map2(reduced2)));
-		} else {
-			values2.put(key, element);
-			output.collect(elementRecord.replace(userFunction.map2(element)));
-		}
-	}
-}