You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:54 UTC

[5/8] flink git commit: [FLINK-2819] Add Windowed Join/CoGroup Operator Based on Tagged Union

[FLINK-2819] Add Windowed Join/CoGroup Operator Based on Tagged Union

Right now, this does everything in memory, so the JVM will blow if data
for one key becomes too large.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8634dbbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8634dbbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8634dbbe

Branch: refs/heads/master
Commit: 8634dbbe998af53471bed2f3a6557c722bb37b87
Parents: 47b5cb7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 16:33:04 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/CoGroupedStreams.java        | 563 +++++++++++++++++++
 .../streaming/api/datastream/DataStream.java    |  36 +-
 .../streaming/api/datastream/JoinedStreams.java | 328 +++++++++++
 .../datastream/temporal/StreamJoinOperator.java | 274 ---------
 .../datastream/temporal/TemporalOperator.java   | 124 ----
 .../api/datastream/temporal/TemporalWindow.java |  45 --
 .../windowing/ReduceWindowFunction.java         |  32 +-
 .../flink/streaming/api/CoGroupJoinITCase.java  | 372 ++++++++++++
 .../streaming/api/WindowCrossJoinTest.java      | 143 -----
 .../api/operators/co/SelfConnectionTest.java    |  61 --
 .../streaming/examples/join/WindowJoin.java     | 128 +++--
 .../examples/join/util/WindowJoinData.java      |  70 +--
 .../scala/examples/join/WindowJoin.scala        |  43 +-
 .../join/WindowJoinITCase.java                  | 101 ++--
 .../join/WindowJoinITCase.java                  | 101 ++--
 .../streaming/api/scala/CoGroupedStreams.scala  | 294 ++++++++++
 .../flink/streaming/api/scala/DataStream.scala  |  24 +-
 .../streaming/api/scala/JoinedStreams.scala     | 303 ++++++++++
 .../api/scala/StreamJoinOperator.scala          | 203 -------
 .../streaming/api/scala/TemporalOperator.scala  |  51 --
 .../streaming/api/scala/CoGroupJoinITCase.scala | 274 +++++++++
 .../StreamingScalaAPICompletenessTest.scala     |  14 +-
 22 files changed, 2409 insertions(+), 1175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
new file mode 100644
index 0000000..e1f1a96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize the co-group operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.coGroup(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MyFirstKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyCoGroupFunction());
+ * } </pre>
+ */
+public class CoGroupedStreams {
+
+	/**
+	 * A co-group operation that does not yet have its {@link KeySelector KeySelectors} defined.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 */
+	public static class Unspecified<T1, T2> {
+		DataStream<T1> input1;
+		DataStream<T2> input2;
+
+		protected Unspecified(DataStream<T1> input1,
+				DataStream<T2> input2) {
+			this.input1 = input1;
+			this.input2 = input2;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the first input.
+		 */
+		public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
+			return new WithKey<>(input1, input2, input1.clean(keySelector), null);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
+			return new WithKey<>(input1, input2, null, input1.clean(keySelector));
+		}
+	}
+
+	/**
+	 * A co-group operation that has {@link KeySelector KeySelectors} defined for either both or
+	 * one input.
+	 *
+	 * <p>
+	 * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
+	 * and {@link #equalTo(KeySelector)} before you can proceed with specifying a
+	 * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 */
+	public static class WithKey<T1, T2, KEY> {
+		DataStream<T1> input1;
+		DataStream<T2> input2;
+
+		KeySelector<T1, KEY> keySelector1;
+		KeySelector<T2, KEY> keySelector2;
+
+		protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the first input.
+		 */
+		public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
+			return new CoGroupedStreams.WithKey<>(input1, input2, input1.clean(keySelector), keySelector2);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public CoGroupedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
+			return new CoGroupedStreams.WithKey<>(input1, input2, keySelector1, input1.clean(keySelector));
+		}
+
+		/**
+		 * Specifies the window on which the co-group operation works.
+		 */
+		public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+			if (keySelector1 == null || keySelector2 == null) {
+				throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+
+			}
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
+		}
+	}
+
+	/**
+	 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
+	 * well as a {@link WindowAssigner}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * @param <W> Type of {@link Window} on which the co-group operation works.
+	 */
+	public static class WithWindow<T1, T2, KEY, W extends Window> {
+		private final DataStream<T1> input1;
+		private final DataStream<T2> input2;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+
+		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+		protected WithWindow(DataStream<T1> input1,
+				DataStream<T2> input2,
+				KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2,
+				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+
+			this.windowAssigner = windowAssigner;
+			this.trigger = trigger;
+			this.evictor = evictor;
+		}
+
+		/**
+		 * Sets the {@code Trigger} that should be used to trigger window emission.
+		 */
+		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+		}
+
+		/**
+		 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+		 *
+		 * <p>
+		 * Note: When using an evictor window performance will degrade significantly, since
+		 * pre-aggregation of window results cannot be used.
+		 */
+		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
+
+			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+					function,
+					CoGroupFunction.class,
+					true,
+					true,
+					input1.getType(),
+					input2.getType(),
+					"CoGroup",
+					false);
+
+			return apply(function, resultType);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			//clean the closure
+			function = input1.getExecutionEnvironment().clean(function);
+
+			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
+					.map(new Input1Tagger<T1, T2>())
+					.returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
+			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
+					.map(new Input2Tagger<T1, T2>())
+					.returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
+
+			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = taggedInput1
+					.union(taggedInput2)
+					.keyBy(new UnionKeySelector<>(keySelector1, keySelector2))
+					.window(windowAssigner);
+
+			if (trigger != null) {
+				windowOp.trigger(trigger);
+			}
+			if (evictor != null) {
+				windowOp.evictor(evictor);
+			}
+
+			return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
+		}
+	}
+
+	/**
+	 * Creates a new co-group operation from the two given inputs.
+	 */
+	public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> input1, DataStream<T2> input2) {
+		return new Unspecified<>(input1, input2);
+	}
+
+	/**
+	 * Internal class for implementing tagged union co-group.
+	 */
+	public static class TaggedUnion<T1, T2> {
+		private final T1 one;
+		private final T2 two;
+
+		private TaggedUnion(T1 one, T2 two) {
+			this.one = one;
+			this.two = two;
+		}
+
+		public boolean isOne() {
+			return one != null;
+		}
+
+		public boolean isTwo() {
+			return two != null;
+		}
+
+		public T1 getOne() {
+			return one;
+		}
+
+		public T2 getTwo() {
+			return two;
+		}
+
+		public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
+			return new TaggedUnion<>(one, null);
+		}
+
+		public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
+			return new TaggedUnion<>(null, two);
+		}
+	}
+
+	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		TypeInformation<T1> oneType;
+		TypeInformation<T2> twoType;
+
+		public UnionTypeInfo(TypeInformation<T1> oneType,
+				TypeInformation<T2> twoType) {
+			this.oneType = oneType;
+			this.twoType = twoType;
+		}
+
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 2;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 2;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked, rawtypes")
+		public Class<TaggedUnion<T1, T2>> getTypeClass() {
+			return (Class) TaggedUnion.class;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) {
+			return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config));
+		}
+
+		@Override
+		public String toString() {
+			return "TaggedUnion<" + oneType + ", " + twoType + ">";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionTypeInfo) {
+				@SuppressWarnings("unchecked")
+				UnionTypeInfo<T1, T2> unionTypeInfo = (UnionTypeInfo<T1, T2>) obj;
+
+				return unionTypeInfo.canEqual(this) && oneType.equals(unionTypeInfo.oneType) && twoType.equals(unionTypeInfo.twoType);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 *  oneType.hashCode() + twoType.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionTypeInfo;
+		}
+	}
+
+	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeSerializer<T1> oneSerializer;
+		private final TypeSerializer<T2> twoSerializer;
+
+		public UnionSerializer(TypeSerializer<T1> oneSerializer,
+				TypeSerializer<T2> twoSerializer) {
+			this.oneSerializer = oneSerializer;
+			this.twoSerializer = twoSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
+			return new UnionSerializer<>(oneSerializer.duplicate(), twoSerializer.duplicate()); // nested serializers may be stateful, so do not share them
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> createInstance() {
+			return null;
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
+			if (from.isOne()) {
+				return TaggedUnion.one(oneSerializer.copy(from.getOne()));
+			} else {
+				return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) {
+			// TaggedUnion has only final fields, so 'reuse' cannot be recycled;
+			// this overload therefore allocates a fresh copy, exactly like
+			// the single-argument variant it delegates to.
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1; // variable length: a tag byte plus whatever the nested serializer writes
+		}
+
+		@Override
+		public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException {
+			if (record.isOne()) {
+				target.writeByte(1);
+				oneSerializer.serialize(record.getOne(), target);
+			} else {
+				target.writeByte(2);
+				twoSerializer.serialize(record.getTwo(), target);
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException {
+			byte tag = source.readByte();
+			if (tag == 1) {
+				return TaggedUnion.one(oneSerializer.deserialize(source));
+			} else {
+				return TaggedUnion.two(twoSerializer.deserialize(source));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse,
+				DataInputView source) throws IOException {
+			byte tag = source.readByte();
+			if (tag == 1) {
+				return TaggedUnion.one(oneSerializer.deserialize(source));
+			} else {
+				return TaggedUnion.two(twoSerializer.deserialize(source));
+			}
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			byte tag = source.readByte();
+			target.writeByte(tag);
+			if (tag == 1) {
+				oneSerializer.copy(source, target);
+			} else {
+				twoSerializer.copy(source, target);
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneSerializer.hashCode() + twoSerializer.hashCode();
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionSerializer) {
+				UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj;
+
+				return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionSerializer;
+		}
+	}
+
+	private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T1 value) throws Exception {
+			return TaggedUnion.one(value);
+		}
+	}
+
+	private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T2 value) throws Exception {
+			return TaggedUnion.two(value);
+		}
+	}
+
+	private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
+		private static final long serialVersionUID = 1L;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2) {
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		@Override
+		public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
+			if (value.isOne()) {
+				return keySelector1.getKey(value.getOne());
+			} else {
+				return keySelector2.getKey(value.getTwo());
+			}
+		}
+	}
+
+	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
+			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
+			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
+		private static final long serialVersionUID = 1L;
+
+		public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void apply(KEY key,
+				W window,
+				Iterable<TaggedUnion<T1, T2>> values,
+				Collector<T> out) throws Exception {
+			List<T1> oneValues = Lists.newArrayList();
+			List<T2> twoValues = Lists.newArrayList();
+			for (TaggedUnion<T1, T2> val: values) {
+				if (val.isOne()) {
+					oneValues.add(val.getOne());
+				} else {
+					twoValues.add(val.getTwo());
+				}
+			}
+			wrappedFunction.coGroup(oneValues, twoValues, out);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8de1a0d..0be1d56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,7 +44,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
@@ -618,30 +617,19 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Initiates a temporal Join transformation. <br/>
-	 * A temporal Join transformation joins the elements of two
-	 * {@link DataStream}s on key equality over a specified time window.
-	 *
-	 * <p>
-	 * This method returns a {@link StreamJoinOperator} on which the
-	 * {@link StreamJoinOperator#onWindow(long, java.util.concurrent.TimeUnit)}
-	 * should be called to define the window, and then the
-	 * {@link StreamJoinOperator.JoinWindow#where(int...)} and
-	 * {@link StreamJoinOperator.JoinPredicate#equalTo(int...)} can be used to define
-	 * the join keys.
-	 * <p>
-	 * The user can also use the
-	 * {@link org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator.JoinPredicate.JoinedStream#with}
-	 * to apply a custom join function.
-	 * 
-	 * @param dataStreamToJoin
-	 *            The other DataStream with which this DataStream is joined.
-	 * @return A {@link StreamJoinOperator} to continue the definition of the
-	 *         Join transformation.
-	 * 
+	 * Creates a co-group operation. See {@link CoGroupedStreams} for an example of how the keys
+	 * and window can be specified.
+	 */
+	public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> otherStream) {
+		return CoGroupedStreams.createCoGroup(this, otherStream);
+	}
+
+	/**
+	 * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
+	 * and window can be specified.
 	 */
-	public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin) {
-		return new StreamJoinOperator<T, IN2>(this, dataStreamToJoin);
+	public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> otherStream) {
+		return JoinedStreams.createJoin(this, otherStream);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
new file mode 100644
index 0000000..ee848e3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ *{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined.
+ * A streaming join operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize the join operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the join is being evaluated in memory so you need to ensure that the number
+ * of elements per key does not get too high. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.join(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MyFirstKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyJoinFunction());
+ * } </pre>
+ */
+public class JoinedStreams extends CoGroupedStreams{
+
+	/**
+	 * A join operation that does not yet have its {@link KeySelector KeySelectors} defined.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 */
+	public static class Unspecified<T1, T2> {
+		DataStream<T1> input1;
+		DataStream<T2> input2;
+
+		protected Unspecified(DataStream<T1> input1,
+				DataStream<T2> input2) {
+			this.input1 = input1;
+			this.input2 = input2;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the first input.
+		 */
+		public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
+			return new WithKey<>(input1, input2, keySelector, null);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
+			return new WithKey<>(input1, input2, null, keySelector);
+		}
+	}
+
+	/**
+	 * A join operation that has {@link KeySelector KeySelectors} defined for either both or
+	 * one input.
+	 *
+	 * <p>
+	 * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
+	 * and {@link #equalTo(KeySelector)} before you can proceed with specifying a
+	 * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 */
+	public static class WithKey<T1, T2, KEY> {
+		DataStream<T1> input1;
+		DataStream<T2> input2;
+
+		KeySelector<T1, KEY> keySelector1;
+		KeySelector<T2, KEY> keySelector2;
+
+		protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the first input.
+		 */
+		public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
+			return new JoinedStreams.WithKey<>(input1, input2, keySelector, keySelector2);
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 */
+		public JoinedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
+			return new JoinedStreams.WithKey<>(input1, input2, keySelector1, keySelector);
+		}
+
+		/**
+		 * Specifies the window on which the join operation works.
+		 */
+		public <W extends Window> JoinedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+			if (keySelector1 == null || keySelector2 == null) {
+				throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+
+			}
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
+		}
+	}
+
+	/**
+	 * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as
+	 * well as a {@link WindowAssigner}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * @param <W> Type of {@link Window} on which the join operation works.
+	 */
+	public static class WithWindow<T1, T2, KEY, W extends Window> {
+		private final DataStream<T1> input1;
+		private final DataStream<T2> input2;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+		// May be null: window(...) creates this class with neither a trigger nor an evictor set.
+		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+		protected WithWindow(DataStream<T1> input1,
+				DataStream<T2> input2,
+				KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2,
+				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+
+			this.windowAssigner = windowAssigner;
+			this.trigger = trigger;
+			this.evictor = evictor;
+		}
+
+		/**
+		 * Returns a copy of this join that uses the given {@code Trigger} to trigger window emission.
+		 */
+		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+		}
+
+		/**
+		 * Returns a copy of this join that uses the given {@code Evictor} to evict elements before emission.
+		 *
+		 * <p>
+		 * Note: When using an evictor, window performance will degrade significantly, since
+		 * pre-aggregation of window results cannot be used.
+		 */
+		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each combination
+		 * of elements with the same key in a window. The result type is extracted from the function.
+		 */
+		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
+			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+					function,
+					JoinFunction.class,
+					false, // hasIterables: join() receives single elements (see JoinCoGroupFunction)
+					false, // hasCollector: join() returns its result instead of using a Collector
+					input1.getType(),
+					input2.getType(),
+					"Join",
+					false);
+
+			return apply(function, resultType);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each combination
+		 * of elements with the same key in a window, producing elements of the given result type.
+		 */
+		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			//clean the closure
+			function = input1.getExecutionEnvironment().clean(function);
+
+			// NOTE(review): trigger/evictor are null if never set; presumably coGroup treats null as "default" - confirm.
+			return input1.coGroup(input2)
+					.where(keySelector1)
+					.equalTo(keySelector2)
+					.window(windowAssigner)
+					.trigger(trigger)
+					.evictor(evictor)
+					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each combination
+		 * of elements with the same key in a window. The result type is extracted from the function.
+		 */
+		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
+			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+					function,
+					FlatJoinFunction.class, // must match the interface of the function that was passed in
+					false, // hasIterables: join() receives single elements (see FlatJoinCoGroupFunction)
+					true, // hasCollector: join() emits its results through a Collector
+					input1.getType(),
+					input2.getType(),
+					"Join",
+					false);
+
+			return apply(function, resultType);
+		}
+
+		/**
+		 * Completes the join operation with the user function that is executed for each combination
+		 * of elements with the same key in a window, producing elements of the given result type.
+		 */
+		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			//clean the closure
+			function = input1.getExecutionEnvironment().clean(function);
+
+			// NOTE(review): trigger/evictor are null if never set; presumably coGroup treats null as "default" - confirm.
+			return input1.coGroup(input2)
+					.where(keySelector1)
+					.equalTo(keySelector2)
+					.window(windowAssigner)
+					.trigger(trigger)
+					.evictor(evictor)
+					.apply(new JoinCoGroupFunction<>(function), resultType);
+		}
+	}
+
+	/**
+	 * Creates a new join operation from the two given inputs, returned as an {@link Unspecified} instance.
+	 */
+	public static <T1, T2> Unspecified<T1, T2> createJoin(DataStream<T1> input1, DataStream<T2> input2) {
+		return new Unspecified<T1, T2>(input1, input2);
+	}
+
+	/**
+	 * CoGroup function that does a nested-loop join to get the join result.
+	 */
+	private static class JoinCoGroupFunction<T1, T2, T>
+			extends WrappingFunction<JoinFunction<T1, T2, T>>
+			implements CoGroupFunction<T1, T2, T> {
+		private static final long serialVersionUID = 1L;
+
+		public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
+			super(wrappedFunction);
+		}
+		/** Emits one joined element per (left, right) pair; {@code second} is traversed once per left element. */
+		@Override
+		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
+			for (T1 left : first) {
+				for (T2 right : second) {
+					out.collect(wrappedFunction.join(left, right));
+				}
+			}
+		}
+	}
+
+	/**
+	 * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
+	 */
+	private static class FlatJoinCoGroupFunction<T1, T2, T>
+			extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
+			implements CoGroupFunction<T1, T2, T> {
+		private static final long serialVersionUID = 1L;
+
+		public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
+			super(wrappedFunction);
+		}
+		/** Hands every (left, right) pair plus the collector to the wrapped function; {@code second} is traversed once per left element. */
+		@Override
+		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
+			for (T1 left : first) {
+				for (T2 right : second) {
+					wrappedFunction.join(left, right, out);
+				}
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
deleted file mode 100644
index 4a5622d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-public class StreamJoinOperator<I1, I2> extends
-		TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
-
-	public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
-		super(input1, input2);
-	}
-
-	@Override
-	protected JoinWindow<I1, I2> createNextWindowOperator() {
-		return new JoinWindow<I1, I2>(this);
-	}
-
-	public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> {
-
-		private StreamJoinOperator<I1, I2> op;
-		private TypeInformation<I1> type1;
-
-		private JoinWindow(StreamJoinOperator<I1, I2> operator) {
-			this.op = operator;
-			this.type1 = op.input1.getType();
-		}
-
-		/**
-		 * Continues a temporal Join transformation. <br/>
-		 * Defines the {@link Tuple} fields of the first join {@link DataStream}
-		 * that should be used as join keys.<br/>
-		 * <b>Note: Fields can only be selected as join keys on Tuple
-		 * DataStreams.</b><br/>
-		 * 
-		 * @param fields
-		 *            The indexes of the other Tuple fields of the first join
-		 *            DataStreams that should be used as keys.
-		 * @return An incomplete Join transformation. Call
-		 *         {@link JoinPredicate#equalTo} to continue the Join.
-		 */
-		public JoinPredicate<I1, I2> where(int... fields) {
-			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
-					new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig()));
-		}
-
-		/**
-		 * Continues a temporal join transformation. <br/>
-		 * Defines the fields of the first join {@link DataStream} that should
-		 * be used as grouping keys. Fields are the names of member fields of
-		 * the underlying type of the data stream.
-		 * 
-		 * @param fields
-		 *            The fields of the first join DataStream that should be
-		 *            used as keys.
-		 * @return An incomplete Join transformation. Call
-		 *         {@link JoinPredicate#equalTo} to continue the Join.
-		 */
-		public JoinPredicate<I1, I2> where(String... fields) {
-			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
-					new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig()));
-		}
-
-		/**
-		 * Continues a temporal Join transformation and defines a
-		 * {@link KeySelector} function for the first join {@link DataStream}
-		 * .</br> The KeySelector function is called for each element of the
-		 * first DataStream and extracts a single key value on which the
-		 * DataStream is joined. </br>
-		 * 
-		 * @param keySelector
-		 *            The KeySelector function which extracts the key values
-		 *            from the DataStream on which it is joined.
-		 * @return An incomplete Join transformation. Call
-		 *         {@link JoinPredicate#equalTo} to continue the Join.
-		 */
-		public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
-			return new JoinPredicate<I1, I2>(op, keySelector);
-		}
-
-		@Override
-		public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) {
-			return every(timeUnit.toMillis(length));
-		}
-
-		@Override
-		public JoinWindow<I1, I2> every(long length) {
-			op.slideInterval = length;
-			return this;
-		}
-
-		// ----------------------------------------------------------------------------------------
-
-	}
-
-	/**
-	 * Intermediate step of a temporal Join transformation. <br/>
-	 * To continue the Join transformation, select the join key of the second
-	 * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
-	 * 
-	 */
-	public static class JoinPredicate<I1, I2> {
-
-		private StreamJoinOperator<I1, I2> op;
-		private KeySelector<I1, ?> keys1;
-		private KeySelector<I2, ?> keys2;
-		private TypeInformation<I2> type2;
-
-		private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
-			this.op = operator;
-			this.keys1 = keys1;
-			this.type2 = op.input2.getType();
-		}
-
-		/**
-		 * Creates a temporal Join transformation and defines the {@link Tuple}
-		 * fields of the second join {@link DataStream} that should be used as
-		 * join keys.<br/>
-		 * </p> The resulting operator wraps each pair of joining elements in a
-		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
-		 * use {@link JoinedStream#with(JoinFunction)}
-		 * 
-		 * @param fields
-		 *            The indexes of the Tuple fields of the second join
-		 *            DataStream that should be used as keys.
-		 * @return A streaming join operator. Call {@link JoinedStream#with} to
-		 *         apply a custom wrapping
-		 */
-		public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(int... fields) {
-			keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
-					type2, op.input1.getExecutionEnvironment().getConfig());
-			return createJoinOperator();
-		}
-
-		/**
-		 * Creates a temporal Join transformation and defines the fields of the
-		 * second join {@link DataStream} that should be used as join keys. </p>
-		 * The resulting operator wraps each pair of joining elements in a
-		 * Tuple2<I1,I2>(first, second). To use a different wrapping function
-		 * use {@link JoinedStream#with(JoinFunction)}
-		 * 
-		 * @param fields
-		 *            The fields of the second join DataStream that should be
-		 *            used as keys.
-		 * @return A streaming join operator. Call {@link JoinedStream#with} to
-		 *         apply a custom wrapping
-		 */
-		public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(String... fields) {
-			this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
-					type2), type2, op.input1.getExecutionEnvironment().getConfig());
-			return createJoinOperator();
-		}
-
-		/**
-		 * Creates a temporal Join transformation and defines a
-		 * {@link KeySelector} function for the second join {@link DataStream}
-		 * .</br> The KeySelector function is called for each element of the
-		 * second DataStream and extracts a single key value on which the
-		 * DataStream is joined. </p> The resulting operator wraps each pair of
-		 * joining elements in a Tuple2<I1,I2>(first, second). To use a
-		 * different wrapping function use
-		 * {@link JoinedStream#with(JoinFunction)}
-		 * 
-		 * 
-		 * @param keySelector
-		 *            The KeySelector function which extracts the key values
-		 *            from the second DataStream on which it is joined.
-		 * @return A streaming join operator. Call {@link JoinedStream#with} to
-		 *         apply a custom wrapping
-		 */
-		public <K> JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(KeySelector<I2, K> keySelector) {
-			this.keys2 = keySelector;
-			return createJoinOperator();
-		}
-
-		private JoinedStream<I1, I2, Tuple2<I1, I2>> createJoinOperator() {
-
-//			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
-//
-//			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
-//					joinFunction, this);
-//
-//			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
-//					op.input1.getType(), op.input2.getType());
-
-//			return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
-//					.keyBy(keys1)
-//					.connect(op.input2.keyBy(keys2))
-//					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
-//							op.slideInterval, op.timeStamp1, op.timeStamp2));
-			return null;
-		}
-
-		public static class JoinedStream<I1, I2, R> extends
-				SingleOutputStreamOperator<R, JoinedStream<I1, I2, R>> {
-			private final JoinPredicate<I1, I2> predicate;
-
-			private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<R> ds) {
-				super(ds.getExecutionEnvironment(), ds.getTransformation());
-				this.predicate = predicate;
-			}
-
-			/**
-			 * Completes a stream join. </p> The resulting operator wraps each pair
-			 * of joining elements using the user defined {@link JoinFunction}
-			 *
-			 * @return The joined data stream.
-			 */
-			@SuppressWarnings("unchecked")
-			public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
-
-				TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
-						predicate.op.input1.getType(), predicate.op.input2.getType());
-
-//				JoinWindowFunction<I1, I2, OUT> joinWindowFunction = getJoinWindowFunction(joinFunction, predicate);
-//
-
-//				return new JoinedStream<I1, I2, OUT>(
-//						predicate, predicate.op.input1
-//						.keyBy(predicate.keys1)
-//						.connect(predicate.op.input2.keyBy(predicate.keys2))
-//						.addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
-//								predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
-				return null;
-			}
-		}
-	}
-
-	public static final class DefaultJoinFunction<I1, I2> implements
-			JoinFunction<I1, I2, Tuple2<I1, I2>> {
-
-		private static final long serialVersionUID = 1L;
-		private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
-
-		@Override
-		public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
-			outTuple.f0 = first;
-			outTuple.f1 = second;
-			return outTuple;
-		}
-	}
-
-//	private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
-//			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
-//		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
-//	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
deleted file mode 100644
index 9da00f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
-
-	public final DataStream<I1> input1;
-	public final DataStream<I2> input2;
-
-	public long windowSize;
-	public long slideInterval;
-
-	public TimestampWrapper<I1> timeStamp1;
-	public TimestampWrapper<I2> timeStamp2;
-
-	public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
-		if (input1 == null || input2 == null) {
-			throw new NullPointerException();
-		}
-		this.input1 = input1;
-		this.input2 = input2;
-	}
-
-	/**
-	 * Continues a temporal transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be transformed.
-	 * To define sliding windows call {@link TemporalWindow#every} on the
-	 * resulting operator.
-	 * 
-	 * @param length
-	 *            The size of the window in milliseconds.
-	 * @param timeUnit
-	 *            The unit if time to be used
-	 * @return An incomplete temporal transformation.
-	 */
-	@SuppressWarnings("unchecked")
-	public OP onWindow(long length, TimeUnit timeUnit) {
-		return onWindow(timeUnit.toMillis(length),
-				(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
-				(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
-	}
-
-	/**
-	 * Continues a temporal transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be
-	 * transformed.To define sliding windows call {@link TemporalWindow#every}
-	 * on the resulting operator.
-	 * 
-	 * @param length
-	 *            The size of the window in milliseconds.
-	 * @param timeStamp1
-	 *            The timestamp used to extract time from the elements of the
-	 *            first data stream.
-	 * @param timeStamp2
-	 *            The timestamp used to extract time from the elements of the
-	 *            second data stream.
-	 * @return An incomplete temporal transformation.
-	 */
-	public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) {
-		return onWindow(length, timeStamp1, timeStamp2, 0);
-	}
-
-	/**
-	 * Continues a temporal transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be
-	 * transformed.To define sliding windows call {@link TemporalWindow#every}
-	 * on the resulting operator.
-	 * 
-	 * @param length
-	 *            The size of the window in milliseconds.
-	 * @param timeStamp1
-	 *            The timestamp used to extract time from the elements of the
-	 *            first data stream.
-	 * @param timeStamp2
-	 *            The timestamp used to extract time from the elements of the
-	 *            second data stream.
-	 * @param startTime
-	 *            The start time to measure the first window
-	 * @return An incomplete temporal transformation.
-	 */
-	public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2,
-			long startTime) {
-		return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime),
-				new TimestampWrapper<I2>(timeStamp2, startTime));
-	}
-
-	private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
-			TimestampWrapper<I2> timeStamp2) {
-
-		this.windowSize = length;
-		this.slideInterval = length;
-
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-
-		return createNextWindowOperator();
-	}
-
-	protected abstract OP createNextWindowOperator();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
deleted file mode 100644
index 49c75c4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-public interface TemporalWindow<T> {
-
-	/**
-	 * Defines the slide interval for this temporal operator
-	 * 
-	 * @param length
-	 *            Length of the window
-	 * @param timeUnit
-	 *            Unit of time
-	 * @return The temporal operator with slide interval specified
-	 */
-	public T every(long length, TimeUnit timeUnit);
-
-	/**
-	 * Defines the slide interval for this temporal operator
-	 * 
-	 * @param length
-	 *            Length of the window
-	 * @return The temporal operator with slide interval specified
-	 */
-	public T every(long length);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index 042fe18..edd8a34 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -18,37 +18,17 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunction<T, T, K, W> {
+public class ReduceWindowFunction<K, W extends Window, T>
+		extends WrappingFunction<ReduceFunction<T>>
+		implements WindowFunction<T, T, K, W> {
 	private static final long serialVersionUID = 1L;
 
-	private final ReduceFunction<T> reduceFunction;
-
 	public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void setRuntimeContext(RuntimeContext ctx) {
-		super.setRuntimeContext(ctx);
-		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		FunctionUtils.openFunction(reduceFunction, parameters);
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		FunctionUtils.closeFunction(reduceFunction);
+		super(reduceFunction);
 	}
 
 	@Override
@@ -59,7 +39,7 @@ public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunc
 			if (result == null) {
 				result = v;
 			} else {
-				result = reduceFunction.reduce(result, v);
+				result = wrappedFunction.reduce(result, v);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
new file mode 100644
index 0000000..c06a608
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
@@ -0,0 +1,372 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.api;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+	private static List<String> testResults;
+
+	@Test
+	public void testCoGroup() throws Exception { // co-groups two keyed sources over 3 ms tumbling windows and checks every emitted group
+
+		testResults = Lists.newArrayList(); // fresh list; the sink below appends to this static field
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().enableTimestamps();
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { // first input: keys "a" and "b"
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+			}
+
+			@Override
+			public void cancel() { // nothing to stop: run() returns right after emitting its fixed elements
+			}
+		}).extractTimestamp(new Tuple2TimestampExtractor()); // element timestamp = the tuple's Integer field f1
+
+		DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { // second input: keys "a", "b" and "c"
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+
+				ctx.collect(Tuple2.of("b", 3));
+
+				ctx.collect(Tuple2.of("c", 6));
+				ctx.collect(Tuple2.of("c", 7));
+				ctx.collect(Tuple2.of("c", 8));
+			}
+
+			@Override
+			public void cancel() { // nothing to stop: run() returns right after emitting its fixed elements
+			}
+		}).extractTimestamp(new Tuple2TimestampExtractor()); // element timestamp = the tuple's Integer field f1
+
+
+		source1.coGroup(source2)
+				.where(new Tuple2KeyExtractor()) // key of the first input: String field f0
+				.equalTo(new Tuple2KeyExtractor()) // key of the second input: String field f0
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) // tumbling windows of size 3 ms
+				.apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
+					@Override
+					public void coGroup(Iterable<Tuple2<String, Integer>> first,
+							Iterable<Tuple2<String, Integer>> second,
+							Collector<String> out) throws Exception {
+						StringBuilder result = new StringBuilder();
+						result.append("F:"); // prefix for the elements of the first input
+						for (Tuple2<String, Integer> t: first) {
+							result.append(t.toString());
+						}
+						result.append(" S:"); // prefix for the elements of the second input
+						for (Tuple2<String, Integer> t: second) {
+							result.append(t.toString());
+						}
+						out.collect(result.toString()); // one string per call, e.g. "F:(a,0)(a,1) S:(a,0)"
+					}
+				})
+				.addSink(new SinkFunction<String>() {
+					@Override
+					public void invoke(String value) throws Exception {
+						testResults.add(value); // record every emitted string in the static list
+					}
+				});
+
+		env.execute("CoGroup Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", // key "a", values 0..2 from both inputs
+				"F:(b,3)(b,4)(b,5) S:(b,3)", // key "b", values 3..5 from both inputs
+				"F:(a,6)(a,7)(a,8) S:", // key "a", values 6..8: nothing from the second input
+				"F: S:(c,6)(c,7)(c,8)"); // key "c" only occurs in the second input
+
+		Collections.sort(expectedResult); // sort both lists so the comparison ignores emission order
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
+	public void testJoin() throws Exception { // joins two keyed sources over 3 ms tumbling windows and checks every emitted pair
+
+		testResults = Lists.newArrayList(); // fresh list; the sink below appends to this static field
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().enableTimestamps();
+
+		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { // first input: keys "a" and "b"
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "x", 0));
+				ctx.collect(Tuple3.of("a", "y", 1));
+				ctx.collect(Tuple3.of("a", "z", 2));
+
+				ctx.collect(Tuple3.of("b", "u", 3));
+				ctx.collect(Tuple3.of("b", "w", 5));
+
+				ctx.collect(Tuple3.of("a", "i", 6));
+				ctx.collect(Tuple3.of("a", "j", 7));
+				ctx.collect(Tuple3.of("a", "k", 8));
+			}
+
+			@Override
+			public void cancel() { // nothing to stop: run() returns right after emitting its fixed elements
+			}
+		}).extractTimestamp(new Tuple3TimestampExtractor()); // element timestamp = the tuple's Integer field f2
+
+		DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { // second input: keys "a" and "b"
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "u", 0));
+				ctx.collect(Tuple3.of("a", "w", 1));
+
+				ctx.collect(Tuple3.of("b", "i", 3));
+				ctx.collect(Tuple3.of("b", "k", 5));
+
+				ctx.collect(Tuple3.of("a", "x", 6));
+				ctx.collect(Tuple3.of("a", "z", 8));
+			}
+
+			@Override
+			public void cancel() { // nothing to stop: run() returns right after emitting its fixed elements
+			}
+		}).extractTimestamp(new Tuple3TimestampExtractor()); // element timestamp = the tuple's Integer field f2
+
+
+		source1.join(source2)
+				.where(new Tuple3KeyExtractor()) // key of the first input: String field f0
+				.equalTo(new Tuple3KeyExtractor()) // key of the second input: String field f0
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) // tumbling windows of size 3 ms
+				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+					@Override
+					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+						return first + ":" + second; // "<first tuple>:<second tuple>", e.g. "(a,x,0):(a,u,0)"
+					}
+				})
+				.addSink(new SinkFunction<String>() {
+					@Override
+					public void invoke(String value) throws Exception {
+						testResults.add(value); // record every emitted string in the static list
+					}
+				});
+
+		env.execute("Join Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"(a,x,0):(a,u,0)", // key "a", values 0..2: 3 first-input x 2 second-input elements
+				"(a,x,0):(a,w,1)",
+				"(a,y,1):(a,u,0)",
+				"(a,y,1):(a,w,1)",
+				"(a,z,2):(a,u,0)",
+				"(a,z,2):(a,w,1)",
+				"(b,u,3):(b,i,3)", // key "b", values 3..5: 2 x 2 elements
+				"(b,u,3):(b,k,5)",
+				"(b,w,5):(b,i,3)",
+				"(b,w,5):(b,k,5)",
+				"(a,i,6):(a,x,6)", // key "a", values 6..8: 3 x 2 elements
+				"(a,i,6):(a,z,8)",
+				"(a,j,7):(a,x,6)",
+				"(a,j,7):(a,z,8)",
+				"(a,k,8):(a,x,6)",
+				"(a,k,8):(a,z,8)");
+
+		Collections.sort(expectedResult); // sort both lists so the comparison ignores emission order
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
+	public void testSelfJoin() throws Exception { // joins one keyed source with itself over 3 ms tumbling windows and checks every emitted pair
+
+		testResults = Lists.newArrayList(); // fresh list; the sink below appends to this static field
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().enableTimestamps();
+
+		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { // the only input: keys "a" and "b"
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "x", 0));
+				ctx.collect(Tuple3.of("a", "y", 1));
+				ctx.collect(Tuple3.of("a", "z", 2));
+
+				ctx.collect(Tuple3.of("b", "u", 3));
+				ctx.collect(Tuple3.of("b", "w", 5));
+
+				ctx.collect(Tuple3.of("a", "i", 6));
+				ctx.collect(Tuple3.of("a", "j", 7));
+				ctx.collect(Tuple3.of("a", "k", 8));
+			}
+
+			@Override
+			public void cancel() { // nothing to stop: run() returns right after emitting its fixed elements
+			}
+		}).extractTimestamp(new Tuple3TimestampExtractor()); // element timestamp = the tuple's Integer field f2
+
+		source1.join(source1) // the same stream is used as both join inputs
+				.where(new Tuple3KeyExtractor()) // key of the first input: String field f0
+				.equalTo(new Tuple3KeyExtractor()) // key of the second input: String field f0
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) // tumbling windows of size 3 ms
+				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+					@Override
+					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+						return first + ":" + second; // "<first tuple>:<second tuple>", e.g. "(a,x,0):(a,y,1)"
+					}
+				})
+				.addSink(new SinkFunction<String>() {
+					@Override
+					public void invoke(String value) throws Exception {
+						testResults.add(value); // record every emitted string in the static list
+					}
+				});
+
+		env.execute("Self-Join Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"(a,x,0):(a,x,0)", // key "a", values 0..2: all 3 x 3 ordered pairs, each element also paired with itself
+				"(a,x,0):(a,y,1)",
+				"(a,x,0):(a,z,2)",
+				"(a,y,1):(a,x,0)",
+				"(a,y,1):(a,y,1)",
+				"(a,y,1):(a,z,2)",
+				"(a,z,2):(a,x,0)",
+				"(a,z,2):(a,y,1)",
+				"(a,z,2):(a,z,2)",
+				"(b,u,3):(b,u,3)", // key "b", values 3..5: all 2 x 2 ordered pairs
+				"(b,u,3):(b,w,5)",
+				"(b,w,5):(b,u,3)",
+				"(b,w,5):(b,w,5)",
+				"(a,i,6):(a,i,6)", // key "a", values 6..8: all 3 x 3 ordered pairs
+				"(a,i,6):(a,j,7)",
+				"(a,i,6):(a,k,8)",
+				"(a,j,7):(a,i,6)",
+				"(a,j,7):(a,j,7)",
+				"(a,j,7):(a,k,8)",
+				"(a,k,8):(a,i,6)",
+				"(a,k,8):(a,j,7)",
+				"(a,k,8):(a,k,8)");
+
+		Collections.sort(expectedResult); // sort both lists so the comparison ignores emission order
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> { // derives timestamps and watermarks from the tuple's Integer field
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) { // currentTimestamp is ignored
+			return element.f1; // the Integer field is used directly as the timestamp
+		}
+
+		@Override
+		public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) { // currentTimestamp is ignored
+			return element.f1 - 1; // one less than the timestamp extracted for the same element
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE; // constant; NOTE(review): presumably means "no watermark from this path" - confirm against the TimestampExtractor contract
+		}
+	}
+
+	private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> { // derives timestamps and watermarks from the tuple's Integer field
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) { // currentTimestamp is ignored
+			return element.f2; // the Integer field is used directly as the timestamp
+		}
+
+		@Override
+		public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) { // currentTimestamp is ignored
+			return element.f2 - 1; // one less than the timestamp extracted for the same element
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE; // constant; NOTE(review): presumably means "no watermark from this path" - confirm against the TimestampExtractor contract
+		}
+	}
+
+	private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> { // key selector used by the coGroup test for both inputs
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0; // the key is the tuple's String field
+		}
+	}
+
+	private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> { // key selector used by the join tests for both inputs
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple3<String, String, Integer> value) throws Exception {
+			return value.f0; // the key is the tuple's first String field
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
deleted file mode 100644
index 7d2a131..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
-
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-
-	private static class MyTimestamp<T> implements Timestamp<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(T value) {
-			return 101L;
-		}
-	}
-
-	/**
-	 * TODO: enable once new join operator is ready
-	 * @throws Exception
-	 */
-	@Ignore
-	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		env.setBufferTimeout(1);
-
-		TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink =
-				new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
-		TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> crossResultSink =
-				new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
-
-		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
-		ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
-
-		in1.add(new Tuple2<Integer, String>(10, "a"));
-		in1.add(new Tuple2<Integer, String>(20, "b"));
-		in1.add(new Tuple2<Integer, String>(20, "x"));
-		in1.add(new Tuple2<Integer, String>(0, "y"));
-
-		in2.add(new Tuple1<Integer>(0));
-		in2.add(new Tuple1<Integer>(5));
-		in2.add(new Tuple1<Integer>(20));
-
-		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 20));
-		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 20));
-		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 0));
-
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(10, "a"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(10, "a"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(10, "a"), 20));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 20));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 20));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 20));
-
-		DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
-		DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
-
-		inStream1
-				.join(inStream2)
-				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
-						new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
-				.map(new ResultMap())
-				.addSink(joinResultSink);
-
-		env.execute();
-
-		assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),
-				new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResultSink.getResult()));
-		assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossExpectedResults),
-				new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResultSink.getResult()));
-	}
-
-	private static class ResultMap implements
-			MapFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>,
-					Tuple2<Tuple2<Integer, String>, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Tuple2<Integer, String>, Integer> map(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) throws Exception {
-			return new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index fc9de1d..c116c01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -47,67 +47,6 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
 	private static List<String> expected;
 
 	/**
-	 * TODO: enable once new join operator is implemented
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Ignore
-	@Test
-	public void sameDataStreamTest() {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		Timestamp<Integer> timeStamp = new IntegerTimestamp();
-
-		KeySelector keySelector = new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		};
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		@SuppressWarnings("unused")
-		DataStreamSink<Tuple2<Integer, Integer>> dataStream =
-				src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector)
-						.map(new MapFunction<Tuple2<Integer, Integer>, String>() {
-
-							private static final long serialVersionUID = 1L;
-
-							@Override
-							public String map(Tuple2<Integer, Integer> value) throws Exception {
-								return value.toString();
-							}
-						})
-						.addSink(resultSink);
-
-
-		try {
-			env.execute();
-
-			expected = new ArrayList<String>();
-
-			expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)"));
-
-			List<String> result = resultSink.getResult();
-
-			Collections.sort(expected);
-			Collections.sort(result);
-
-			assertEquals(expected, result);
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail();
-		}
-	}
-
-	/**
 	 * We connect two different data streams in a chain to a CoMap.
 	 */
 	@Test