You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:20 UTC

[04/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
new file mode 100644
index 0000000..f25c995
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+public interface OutputSelectorWrapper<OUT> extends Serializable {
+
+	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
+
+	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
new file mode 100644
index 0000000..dca2ede
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.List;
+
+public class OutputSelectorWrapperFactory {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
+		if (outputSelectors.size() == 0) {
+			return new BroadcastOutputSelectorWrapper();
+		} else {
+			return new DirectedOutputSelectorWrapper(outputSelectors);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
new file mode 100644
index 0000000..7191304
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * An {@code AllWindowedStream} represents a data stream where the stream of
+ * elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
+ * used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code AllWindowedStream} is purely an API construct; during runtime
+ * the {@code AllWindowedStream} will be collapsed together with the
+ * operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class AllWindowedStream<T, W extends Window> {
+
+	/** The data stream that is windowed by this stream */
+	private final DataStream<T> input;
+
+	/** The window assigner */
+	private final WindowAssigner<? super T, W> windowAssigner;
+
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation. */
+	private Evictor<? super T, ? super W> evictor;
+
+
+	public AllWindowedStream(DataStream<T> input,
+			WindowAssigner<? super T, W> windowAssigner) {
+		this.input = input;
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
+	}
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 */
+	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor, window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 */
+	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Operations on the windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored per key (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree.
+	 * 
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 */
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "Reduce at " + callLocation;
+
+		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, T> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceAllWindowFunction<W, T>(function),
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			// we need to copy because we need our own instance of the pre aggregator
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceAllWindowFunction<W, T>(function),
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, input.getType(), operator).setParallelism(1);
+	}
+
+	/**
+	 * Applies the given fold function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the fold function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * @param function The fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
+				Utils.getCallLocationName(), true);
+
+		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
+	 * Applies the given fold function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the fold function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * @param function The fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
+	 * Applies a window function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * 
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 *
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		NonKeyedWindowOperator<T, R, W> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(preAggregator, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Aggregations on the windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies an aggregation that sums every window of the data stream at the
+	 * given position.
+	 *
+	 * @param positionToSum The position in the tuple/array to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that sums every window of the pojo data stream at
+	 * the given field for every window.
+	 *
+	 * <p>
+	 * A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
+		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum value of every window
+	 * of the data stream at the given position.
+	 *
+	 * @param positionToMin The position to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum value of the pojo data
+	 * stream at the given field expression for every window.
+	 *
+	 * <p>
+	 * A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMinBy The position to minimize
+	 * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of every window of
+	 * the data stream at the given position.
+	 *
+	 * @param positionToMax The position to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of the pojo data
+	 * stream at the given field expression for every window. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMaxBy The position to maximize by
+	 * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
+		return reduce(aggregator);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+
+	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		// TODO: add once non-parallel fast aligned time windows operator is ready
+		return null;
+	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return input.getExecutionEnvironment();
+	}
+
+	public TypeInformation<T> getInputType() {
+		return input.getType();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
new file mode 100644
index 0000000..d1da783
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize co-group operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.coGroup(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MyFirstKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyCoGroupFunction());
+ * } </pre>
+ */
+public class CoGroupedStreams<T1, T2> {
+
+	/** The first input stream */
+	private final DataStream<T1> input1;
+
+	/** The second input stream */
+	private final DataStream<T2> input2;
+
+	/**
+	 * Creates new co-grouped data streams, which are the first step towards building a streaming co-group.
+	 *
+	 * <p>
+	 * Both inputs are checked with {@code requireNonNull}, so passing {@code null} for either
+	 * of them fails with a {@link NullPointerException}.
+	 * 
+	 * @param input1 The first data stream.
+	 * @param input2 The second data stream.
+	 */
+	public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+		this.input1 = requireNonNull(input1);
+		this.input2 = requireNonNull(input2);
+	}
+
+	/**
+	 * Specifies a {@link KeySelector} for elements from the first input.
+	 *
+	 * <p>
+	 * The key type is extracted from the selector and the type of the first input before the
+	 * selector is passed through {@code input1.clean(...)}.
+	 *
+	 * @param keySelector The selector that extracts the key from elements of the first input.
+	 * @return A {@link Where} object on which the key of the second input can be specified.
+	 */
+	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
+		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new Where<>(input1.clean(keySelector), keyType);
+	}
+
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * CoGrouped streams that have the key for one side defined.
+	 * 
+	 * @param <KEY> The type of the key.
+	 */
+	public class Where<KEY> {
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final TypeInformation<KEY> keyType;
+
+		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
+			this.keySelector1 = keySelector1;
+			this.keyType = keyType;
+		}
+	
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
+			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			if (!otherKey.equals(this.keyType)) {
+				throw new IllegalArgumentException("The keys for the two inputs are not equal: " + 
+						"first key = " + this.keyType + " , second key = " + otherKey);
+			}
+			
+			return new EqualTo(input2.clean(keySelector));
+		}
+
+		// --------------------------------------------------------------------
+		
+		/**
+		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
+		 */
+		public class EqualTo {
+
+			private final KeySelector<T2, KEY> keySelector2;
+
+			EqualTo(KeySelector<T2, KEY> keySelector2) {
+				this.keySelector2 = requireNonNull(keySelector2);
+			}
+
+			/**
+			 * Specifies the window on which the co-group operation works.
+			 */
+			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
+	 * well as a {@link WindowAssigner}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * @param <W> Type of {@link Window} on which the co-group operation works.
+	 */
+	public static class WithWindow<T1, T2, KEY, W extends Window> {
+		private final DataStream<T1> input1;
+		private final DataStream<T2> input2;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+		
+		private final TypeInformation<KEY> keyType;
+
+		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+
+		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+		/**
+		 * Stores all parts of the co-group definition as given; no argument is validated here.
+		 *
+		 * @param input1 The first input stream.
+		 * @param input2 The second input stream.
+		 * @param keySelector1 The key selector for the first input.
+		 * @param keySelector2 The key selector for the second input.
+		 * @param keyType The type of the key shared by both inputs.
+		 * @param windowAssigner The window assigner for the tagged union of both inputs.
+		 * @param trigger The trigger to use, or {@code null} if none was set.
+		 * @param evictor The evictor to use, or {@code null} if none was set.
+		 */
+		protected WithWindow(DataStream<T1> input1,
+				DataStream<T2> input2,
+				KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2,
+				TypeInformation<KEY> keyType,
+				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+			this.keyType = keyType;
+			
+			this.windowAssigner = windowAssigner;
+			this.trigger = trigger;
+			this.evictor = evictor;
+		}
+
+		/**
+		 * Sets the {@code Trigger} that should be used to trigger window emission.
+		 *
+		 * <p>
+		 * This object is not modified; a new {@code WithWindow} carrying the trigger is returned.
+		 *
+		 * @param newTrigger The trigger to use.
+		 * @return A copy of this co-group definition that uses {@code newTrigger}.
+		 */
+		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+			return new WithWindow<T1, T2, KEY, W>(
+					input1,
+					input2,
+					keySelector1,
+					keySelector2,
+					keyType,
+					windowAssigner,
+					newTrigger,
+					evictor);
+		}
+
+		/**
+		 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+		 *
+		 * <p>
+		 * Note: When using an evictor window performance will degrade significantly, since
+		 * pre-aggregation of window results cannot be used.
+		 *
+		 * <p>
+		 * This object is not modified; a new {@code WithWindow} carrying the evictor is returned.
+		 *
+		 * @param newEvictor The evictor to use.
+		 * @return A copy of this co-group definition that uses {@code newEvictor}.
+		 */
+		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+			return new WithWindow<T1, T2, KEY, W>(
+					input1,
+					input2,
+					keySelector1,
+					keySelector2,
+					keyType,
+					windowAssigner,
+					trigger,
+					newEvictor);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
+		 *
+		 * <p>
+		 * The result type is extracted from the function and the two input types, then the call
+		 * is forwarded to {@link #apply(CoGroupFunction, TypeInformation)}.
+		 *
+		 * @param function The co-group function to execute per key and window.
+		 * @return The stream of results produced by the function.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
+			TypeInformation<T1> firstType = input1.getType();
+			TypeInformation<T2> secondType = input2.getType();
+
+			TypeInformation<T> extractedType = TypeExtractor.getBinaryOperatorReturnType(
+					function, CoGroupFunction.class, true, true, firstType, secondType, "CoGroup", false);
+
+			return apply(function, extractedType);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
+		 *
+		 * <p>
+		 * Both inputs are mapped to {@link TaggedUnion} elements, unioned into one stream, keyed
+		 * with a selector that dispatches to the key selector of the respective input, and
+		 * windowed. The user function is invoked through a window function that splits the
+		 * window contents back into the two groups.
+		 *
+		 * @param function The co-group function to execute per key and window.
+		 * @param resultType The type information of the function's results.
+		 * @return The stream of results produced by the function.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			// clean the closure
+			CoGroupFunction<T1, T2, T> cleanedFunction = input1.getExecutionEnvironment().clean(function);
+
+			UnionTypeInfo<T1, T2> taggedType = new UnionTypeInfo<>(input1.getType(), input2.getType());
+			UnionKeySelector<T1, T2, KEY> taggedKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
+
+			// wrap the elements of both inputs so that they can travel in one stream
+			DataStream<TaggedUnion<T1, T2>> firstTagged = input1
+					.map(new Input1Tagger<T1, T2>())
+					.returns(taggedType);
+			DataStream<TaggedUnion<T1, T2>> secondTagged = input2
+					.map(new Input2Tagger<T1, T2>())
+					.returns(taggedType);
+
+			DataStream<TaggedUnion<T1, T2>> merged = firstTagged.union(secondTagged);
+
+			// we explicitly create the keyed stream to manually pass the key type information in
+			KeyedStream<TaggedUnion<T1, T2>, KEY> keyed =
+					new KeyedStream<TaggedUnion<T1, T2>, KEY>(merged, taggedKeySelector, keyType);
+			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowed = keyed.window(windowAssigner);
+
+			// trigger and evictor are optional, only forward them when they were set
+			if (trigger != null) {
+				windowed.trigger(trigger);
+			}
+			if (evictor != null) {
+				windowed.evictor(evictor);
+			}
+
+			CoGroupWindowFunction<T1, T2, T, KEY, W> windowFunction =
+					new CoGroupWindowFunction<T1, T2, T, KEY, W>(cleanedFunction);
+			return windowed.apply(windowFunction, resultType);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Data type and type information for Tagged Union
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Internal class for implementing tagged union co-group.
+	 *
+	 * <p>
+	 * An instance carries either an element of the first input or an element of the second
+	 * input. Which side is set is decided by a {@code null} check, so an instance created
+	 * with a {@code null} payload reports neither {@link #isOne()} nor {@link #isTwo()}.
+	 */
+	public static class TaggedUnion<T1, T2> {
+
+		/** Payload from the first input, {@code null} if this is an element of the second input. */
+		private final T1 one;
+
+		/** Payload from the second input, {@code null} if this is an element of the first input. */
+		private final T2 two;
+
+		private TaggedUnion(T1 one, T2 two) {
+			this.one = one;
+			this.two = two;
+		}
+
+		/** Wraps an element of the first input. */
+		public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
+			return new TaggedUnion<>(one, null);
+		}
+
+		/** Wraps an element of the second input. */
+		public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
+			return new TaggedUnion<>(null, two);
+		}
+
+		/** Returns {@code true} if the first-input payload is non-null. */
+		public boolean isOne() {
+			return one != null;
+		}
+
+		/** Returns {@code true} if the second-input payload is non-null. */
+		public boolean isTwo() {
+			return two != null;
+		}
+
+		/** Returns the first-input payload, possibly {@code null}. */
+		public T1 getOne() {
+			return one;
+		}
+
+		/** Returns the second-input payload, possibly {@code null}. */
+		public T2 getTwo() {
+			return two;
+		}
+	}
+
+	/**
+	 * Type information for {@link TaggedUnion}, composed of the type information of the
+	 * two inputs. Serializers are created from the serializers of the two input types.
+	 */
+	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		/** Type information of the first input. */
+		private final TypeInformation<T1> oneType;
+
+		/** Type information of the second input. */
+		private final TypeInformation<T2> twoType;
+
+		public UnionTypeInfo(TypeInformation<T1> oneType,
+				TypeInformation<T2> twoType) {
+			this.oneType = oneType;
+			this.twoType = twoType;
+		}
+
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 2;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 2;
+		}
+
+		@Override
+		// Two separate warning names: a single "unchecked, rawtypes" string is not a
+		// recognized name and therefore suppresses nothing.
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		public Class<TaggedUnion<T1, T2>> getTypeClass() {
+			return (Class) TaggedUnion.class;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) {
+			return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config));
+		}
+
+		@Override
+		public String toString() {
+			return "TaggedUnion<" + oneType + ", " + twoType + ">";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (!(obj instanceof UnionTypeInfo)) {
+				return false;
+			}
+
+			@SuppressWarnings("unchecked")
+			UnionTypeInfo<T1, T2> other = (UnionTypeInfo<T1, T2>) obj;
+
+			return other.canEqual(this)
+					&& oneType.equals(other.oneType)
+					&& twoType.equals(other.twoType);
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneType.hashCode() + twoType.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionTypeInfo;
+		}
+	}
+
+	/**
+	 * Serializer for {@link TaggedUnion}.
+	 *
+	 * <p>
+	 * Format: one tag byte ({@code 1} for an element of the first input, {@code 2} for an
+	 * element of the second input) followed by the element as written by the serializer of
+	 * the respective input. When reading, every tag other than {@code 1} is treated as the
+	 * second input.
+	 */
+	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeSerializer<T1> oneSerializer;
+		private final TypeSerializer<T2> twoSerializer;
+
+		public UnionSerializer(TypeSerializer<T1> oneSerializer,
+				TypeSerializer<T2> twoSerializer) {
+			this.oneSerializer = oneSerializer;
+			this.twoSerializer = twoSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		/**
+		 * Duplicates the nested serializers and wraps the duplicates in a new instance.
+		 * This instance is only returned when both nested serializers returned themselves,
+		 * so that nested serializers which hand out a fresh copy are never shared.
+		 */
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
+			TypeSerializer<T1> duplicateOne = oneSerializer.duplicate();
+			TypeSerializer<T2> duplicateTwo = twoSerializer.duplicate();
+
+			if (duplicateOne == oneSerializer && duplicateTwo == twoSerializer) {
+				return this;
+			}
+			return new UnionSerializer<>(duplicateOne, duplicateTwo);
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> createInstance() {
+			return null;
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
+			if (from.isOne()) {
+				return TaggedUnion.one(oneSerializer.copy(from.getOne()));
+			} else {
+				return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) {
+			// the reuse object is never used, a new instance is always created
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException {
+			if (record.isOne()) {
+				target.writeByte(1);
+				oneSerializer.serialize(record.getOne(), target);
+			} else {
+				target.writeByte(2);
+				twoSerializer.serialize(record.getTwo(), target);
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException {
+			byte tag = source.readByte();
+			if (tag == 1) {
+				return TaggedUnion.one(oneSerializer.deserialize(source));
+			} else {
+				return TaggedUnion.two(twoSerializer.deserialize(source));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse,
+				DataInputView source) throws IOException {
+			// the reuse object is never used, a new instance is always created
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			byte tag = source.readByte();
+			target.writeByte(tag);
+			if (tag == 1) {
+				oneSerializer.copy(source, target);
+			} else {
+				twoSerializer.copy(source, target);
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneSerializer.hashCode() + twoSerializer.hashCode();
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionSerializer) {
+				UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj;
+
+				return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionSerializer;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utility functions that implement the CoGroup logic based on the tagged
+	//  union window reduce
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Map function that wraps every element of the first input into a {@link TaggedUnion}
+	 * via {@link TaggedUnion#one(Object)}.
+	 */
+	private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T1 value) throws Exception {
+			return TaggedUnion.one(value);
+		}
+	}
+
+	/**
+	 * Map function that wraps every element of the second input into a {@link TaggedUnion}
+	 * via {@link TaggedUnion#two(Object)}.
+	 */
+	private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T2 value) throws Exception {
+			return TaggedUnion.two(value);
+		}
+	}
+
+	/**
+	 * Key selector for {@link TaggedUnion} elements that dispatches to the key selector of
+	 * the input the wrapped element came from.
+	 */
+	private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
+		private static final long serialVersionUID = 1L;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2) {
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		@Override
+		public KEY getKey(TaggedUnion<T1, T2> value) throws Exception {
+			// everything that is not tagged as first input is handled by the second selector
+			return value.isOne()
+					? keySelector1.getKey(value.getOne())
+					: keySelector2.getKey(value.getTwo());
+		}
+	}
+
+	/**
+	 * Window function that splits the {@link TaggedUnion} elements of a window into the
+	 * elements of the first and of the second input and passes both lists to the wrapped
+	 * {@link CoGroupFunction}.
+	 */
+	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
+			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
+			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
+
+		private static final long serialVersionUID = 1L;
+
+		public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void apply(KEY key,
+				W window,
+				Iterable<TaggedUnion<T1, T2>> values,
+				Collector<T> out) throws Exception {
+
+			List<T1> firstGroup = new ArrayList<>();
+			List<T2> secondGroup = new ArrayList<>();
+
+			// both groups are materialized in memory before the user function is called
+			for (TaggedUnion<T1, T2> element : values) {
+				if (!element.isOne()) {
+					secondGroup.add(element.getTwo());
+					continue;
+				}
+				firstGroup.add(element.getOne());
+			}
+
+			wrappedFunction.coGroup(firstGroup, secondGroup, out);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
new file mode 100644
index 0000000..4074a1d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+
+/**
+ * {@code ConnectedStreams} represents two connected streams of (possibly) different data types. It
+ * can be used to apply transformations such as {@link CoMapFunction} on two
+ * {@link DataStream DataStreams}.
+ * 
+ * @param <IN1> Type of the first input data stream.
+ * @param <IN2> Type of the second input data stream.
+ */
+public class ConnectedStreams<IN1, IN2> {
+
+	protected StreamExecutionEnvironment environment;
+	protected DataStream<IN1> inputStream1;
+	protected DataStream<IN2> inputStream2;
+
+	/**
+	 * Creates connected streams from the given environment and the two inputs.
+	 *
+	 * <p>
+	 * The inputs are not validated: a {@code null} input leaves the respective field {@code null}.
+	 *
+	 * @param env The execution environment.
+	 * @param input1 The first input stream.
+	 * @param input2 The second input stream.
+	 */
+	protected ConnectedStreams(StreamExecutionEnvironment env,
+			DataStream<IN1> input1,
+			DataStream<IN2> input2) {
+		this.environment = env;
+		// The fields default to null, so a plain assignment covers null inputs as well.
+		this.inputStream1 = input1;
+		this.inputStream2 = input2;
+	}
+
+	/**
+	 * Returns the execution environment these connected streams were created with.
+	 *
+	 * @return The {@link StreamExecutionEnvironment}.
+	 */
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return environment;
+	}
+
+	/**
+	 * Returns the first {@link DataStream}.
+	 *
+	 * @return The first DataStream, or {@code null} if none was passed to the constructor.
+	 */
+	public DataStream<IN1> getFirstInput() {
+		return inputStream1;
+	}
+
+	/**
+	 * Returns the second {@link DataStream}.
+	 *
+	 * @return The second DataStream, or {@code null} if none was passed to the constructor.
+	 */
+	public DataStream<IN2> getSecondInput() {
+		return inputStream2;
+	}
+
+	/**
+	 * Gets the type of the first input, as reported by the first input stream.
+	 *
+	 * @return The type of the first input.
+	 */
+	public TypeInformation<IN1> getType1() {
+		return inputStream1.getType();
+	}
+
+	/**
+	 * Gets the type of the second input, as reported by the second input stream.
+	 *
+	 * @return The type of the second input.
+	 */
+	public TypeInformation<IN2> getType2() {
+		return inputStream2.getType();
+	}
+
+	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
+		DataStream<IN1> keyedFirst = inputStream1.keyBy(keyPosition1);
+		DataStream<IN2> keyedSecond = inputStream2.keyBy(keyPosition2);
+		return new ConnectedStreams<>(environment, keyedFirst, keyedSecond);
+	}
+
+	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to group the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to group the second input stream.
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
+		DataStream<IN1> keyedFirst = inputStream1.keyBy(keyPositions1);
+		DataStream<IN2> keyedSecond = inputStream2.keyBy(keyPositions2);
+		return new ConnectedStreams<>(environment, keyedFirst, keyedSecond);
+	}
+
+	/**
+	 * KeyBy operation for connected data stream using key expressions. Assigns keys to
+	 * the elements of input1 and input2 according to field1 and field2. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field1
+	 *            The grouping expression for the first input
+	 * @param field2
+	 *            The grouping expression for the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
+		DataStream<IN1> keyedFirst = inputStream1.keyBy(field1);
+		DataStream<IN2> keyedSecond = inputStream2.keyBy(field2);
+		return new ConnectedStreams<>(environment, keyedFirst, keyedSecond);
+	}
+
+	/**
+	 * KeyBy operation for connected data stream using key expressions. Assigns keys to
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param fields1
+	 *            The grouping expressions for the first input
+	 * @param fields2
+	 *            The grouping expressions for the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
+		DataStream<IN1> keyedFirst = inputStream1.keyBy(fields1);
+		DataStream<IN2> keyedSecond = inputStream2.keyBy(fields2);
+		return new ConnectedStreams<>(environment, keyedFirst, keyedSecond);
+	}
+
+	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for grouping the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for grouping the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		DataStream<IN1> keyedFirst = inputStream1.keyBy(keySelector1);
+		DataStream<IN2> keyedSecond = inputStream2.keyBy(keySelector2);
+		return new ConnectedStreams<>(environment, keyedFirst, keyedSecond);
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
+		DataStream<IN1> partitionedFirst = inputStream1.partitionByHash(keyPosition1);
+		DataStream<IN2> partitionedSecond = inputStream2.partitionByHash(keyPosition2);
+		return new ConnectedStreams<>(environment, partitionedFirst, partitionedSecond);
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to group the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to group the second input stream.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
+		DataStream<IN1> partitionedFirst = inputStream1.partitionByHash(keyPositions1);
+		DataStream<IN2> partitionedSecond = inputStream2.partitionByHash(keyPositions2);
+		return new ConnectedStreams<>(environment, partitionedFirst, partitionedSecond);
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to field1 and field2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field1
+	 *            The partitioning expression for the first input
+	 * @param field2
+	 *            The partitioning expression for the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(String field1, String field2) {
+		DataStream<IN1> partitionedFirst = inputStream1.partitionByHash(field1);
+		DataStream<IN2> partitionedSecond = inputStream2.partitionByHash(field2);
+		return new ConnectedStreams<>(environment, partitionedFirst, partitionedSecond);
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param fields1
+	 *            The partitioning expressions for the first input
+	 * @param fields2
+	 *            The partitioning expressions for the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
+		DataStream<IN1> partitionedFirst = inputStream1.partitionByHash(fields1);
+		DataStream<IN2> partitionedSecond = inputStream2.partitionByHash(fields2);
+		return new ConnectedStreams<>(environment, partitionedFirst, partitionedSecond);
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for partitioning the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for partitioning the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		DataStream<IN1> partitionedFirst = inputStream1.partitionByHash(keySelector1);
+		DataStream<IN2> partitionedSecond = inputStream2.partitionByHash(keySelector2);
+		return new ConnectedStreams<>(environment, partitionedFirst, partitionedSecond);
+	}
+
+	/**
+	 * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+	 * the output to a common type. The transformation calls a
+	 * {@link CoMapFunction#map1} for each element of the first input and
+	 * {@link CoMapFunction#map2} for each element of the second input. Each
+	 * CoMapFunction call returns exactly one element.
+	 * 
+	 * @param coMapper The CoMapFunction used to jointly transform the two input DataStreams
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
+				CoMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
+
+	}
+
+	/**
+	 * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+	 * maps the output to a common type. The transformation calls a
+	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+	 * input. Each CoFlatMapFunction call returns any number of elements
+	 * including none.
+	 * 
+	 * @param coFlatMapper
+	 *            The CoFlatMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
+			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
+
+		// NOTE(review): Utils.getCallLocationName() presumably depends on the call stack,
+		// so the call is kept directly in this method - confirm before extracting a helper.
+		TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
+				coFlatMapper,
+				CoFlatMapFunction.class,
+				false,
+				true,
+				getType1(),
+				getType2(),
+				Utils.getCallLocationName(),
+				true);
+
+		TwoInputStreamOperator<IN1, IN2, OUT> flatMapOperator =
+				new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper));
+
+		return transform("Co-Flat Map", resultType, flatMapOperator);
+	}
+
+	/**
+	 * Applies the given two-input operator to the connected streams.
+	 *
+	 * <p>
+	 * A {@link TwoInputTransformation} is created from the transformations of the two inputs
+	 * using the parallelism of the environment, wrapped into a
+	 * {@link SingleOutputStreamOperator} and added to the execution environment.
+	 *
+	 * @param functionName The name passed to the created transformation.
+	 * @param outTypeInfo The type information of the operator's output.
+	 * @param operator The two-input operator to apply.
+	 * @return The stream that represents the output of the operator.
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
+			TypeInformation<OUT> outTypeInfo,
+			TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+
+		// read the output type of the input Transforms to coax out errors about MissingTypeInfo
+		inputStream1.getType();
+		inputStream2.getType();
+
+		TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
+				inputStream1.getTransformation(),
+				inputStream2.getTransformation(),
+				functionName,
+				operator,
+				outTypeInfo,
+				environment.getParallelism());
+
+		// raw constructor call, hence the suppressed unchecked/rawtypes warnings
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(environment, transform);
+
+		getExecutionEnvironment().addOperator(transform);
+
+		return returnStream;
+	}
+}