You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:53 UTC

[4/8] flink git commit: [FLINK-2819] Add Windowed Join/CoGroup Operator Based on Tagged Union

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 5230e9b..8abf9d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -19,31 +19,36 @@ package org.apache.flink.streaming.examples.join;
 
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Example illustrating join over sliding windows of streams in Flink.
- * <p/>
+ *
  * <p>
- * his example will join two streams with a sliding window. One which emits
- * grades and one which emits salaries of people.
- * </p>
- * <p/>
- * <p/>
+ * This example will join two streams with a sliding window. One which emits grades and one which
+ * emits salaries of people. The input format for both sources has an additional timestamp
+ * as field 0. This is used to to event-time windowing. Time timestamps must be
+ * monotonically increasing.
+ *
  * This example shows how to:
  * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
+ *   <li>do windowed joins,
+ *   <li>use tuple data types,
+ *   <li>write a simple streaming program.
+ * </ul>
  */
 public class WindowJoin {
 
@@ -51,9 +56,6 @@ public class WindowJoin {
 	// PROGRAM
 	// *************************************************************************
 
-	private static DataStream<Tuple2<String, Integer>> grades;
-	private static DataStream<Tuple2<String, Integer>> salaries;
-
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {
@@ -62,18 +64,25 @@ public class WindowJoin {
 
 		// obtain execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableTimestamps();
 
 		// connect to the data sources for grades and salaries
-		setInputStreams(env);
+		Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
+		DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
+		DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
+
+		// extract the timestamps
+		grades = grades.extractTimestamp(new MyTimestampExtractor());
+		salaries = salaries.extractTimestamp(new MyTimestampExtractor());
 
 		// apply a temporal join over the two stream based on the names over one
 		// second windows
 		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
 				.join(salaries)
-				.onWindow(1, new MyTimestamp(0), new MyTimestamp(0))
-				.where(0)
-				.equalTo(0)
-				.with(new MyJoinFunction());
+				.where(new NameKeySelector())
+				.equalTo(new NameKeySelector())
+				.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
+				.apply(new MyJoinFunction());
 
 		// emit result
 		if (fileOutput) {
@@ -98,24 +107,25 @@ public class WindowJoin {
 	/**
 	 * Continuously emit tuples with random names and integers (grades).
 	 */
-	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+	public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		private Random rand;
-		private Tuple2<String, Integer> outTuple;
+		private Tuple3<Long, String, Integer> outTuple;
 		private volatile boolean isRunning = true;
 		private int counter;
 
 		public GradeSource() {
 			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
+			outTuple = new Tuple3<>();
 		}
 
 		@Override
-		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
 			while (isRunning && counter < 100) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+				outTuple.f0 = System.currentTimeMillis();
+				outTuple.f1 = names[rand.nextInt(names.length)];
+				outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
 				counter++;
 				ctx.collect(outTuple);
@@ -131,27 +141,28 @@ public class WindowJoin {
 	/**
 	 * Continuously emit tuples with random names and integers (salaries).
 	 */
-	public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
+	public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		private transient Random rand;
-		private transient Tuple2<String, Integer> outTuple;
+		private transient Tuple3<Long, String, Integer> outTuple;
 		private volatile boolean isRunning;
 		private int counter;
 
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
+			outTuple = new Tuple3<Long, String, Integer>();
 			isRunning = true;
 		}
 
 
 		@Override
-		public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
 			while (isRunning && counter < 100) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				outTuple.f0 = System.currentTimeMillis();
+				outTuple.f1 = names[rand.nextInt(names.length)];
+				outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
 				counter++;
 				ctx.collect(outTuple);
@@ -164,7 +175,7 @@ public class WindowJoin {
 		}
 	}
 
-	public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> {
+	public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -175,44 +186,55 @@ public class WindowJoin {
 		}
 
 		@Override
-		public Tuple2<String, Integer> map(String line) throws Exception {
+		public Tuple3<Long, String, Integer> map(String line) throws Exception {
 			record = line.substring(1, line.length() - 1).split(",");
-			return new Tuple2<String, Integer>(record[0], Integer.parseInt(record[1]));
+			return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
 		}
 	}
 
 	public static class MyJoinFunction
 			implements
-			JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
+			JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 
-		private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>();
+		private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
 
 		@Override
-		public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
-				Tuple2<String, Integer> second) throws Exception {
-			joined.f0 = first.f0;
-			joined.f1 = first.f1;
-			joined.f2 = second.f1;
+		public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
+				Tuple3<Long, String, Integer> second) throws Exception {
+			joined.f0 = first.f1;
+			joined.f1 = first.f2;
+			joined.f2 = second.f2;
 			return joined;
 		}
 	}
 
-	public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> {
-
+	private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
-		private int counter;
+		@Override
+		public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+			return element.f0;
+		}
 
-		public MyTimestamp(int starttime) {
-			this.counter = starttime;
+		@Override
+		public long emitWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+			return element.f0 - 1;
 		}
 
 		@Override
-		public long getTimestamp(Tuple2<String, Integer> value) {
-			counter += SLEEP_TIME;
-			return counter;
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+
+	private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
+			return value.f1;
 		}
 	}
 
@@ -253,7 +275,12 @@ public class WindowJoin {
 		return true;
 	}
 
-	private static void setInputStreams(StreamExecutionEnvironment env) {
+	private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
+			StreamExecutionEnvironment env) {
+
+		DataStream<Tuple3<Long, String, Integer>> grades;
+		DataStream<Tuple3<Long, String, Integer>> salaries;
+
 		if (fileInput) {
 			grades = env.readTextFile(gradesPath).map(new MySourceMap());
 			salaries = env.readTextFile(salariesPath).map(new MySourceMap());
@@ -261,5 +288,8 @@ public class WindowJoin {
 			grades = env.addSource(new GradeSource());
 			salaries = env.addSource(new SalarySource());
 		}
+
+		return Tuple2.of(grades, salaries);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
index 23d29b1..15c1280 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
@@ -19,42 +19,42 @@ package org.apache.flink.streaming.examples.join.util;
 
 public class WindowJoinData {
 
-	public static final String GRADES_INPUT = "(john,5)\n" + "(tom,3)\n" + "(alice,1)\n" + "(grace,5)\n" +
-			"(john,4)\n" + "(bob,1)\n" + "(alice,2)\n" + "(alice,3)\n" + "(bob,5)\n" + "(alice,3)\n" + "(tom,5)\n" +
-			"(john,2)\n" + "(john,1)\n" + "(grace,2)\n" + "(jerry,2)\n" + "(tom,4)\n" + "(bob,4)\n" + "(bob,2)\n" +
-			"(tom,2)\n" + "(alice,5)\n" + "(grace,5)\n" + "(grace,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(tom,1)\n" +
-			"(jerry,5)\n" + "(john,3)\n" + "(john,4)\n" + "(john,1)\n" + "(jerry,3)\n" + "(grace,3)\n" + "(bob,3)\n" +
-			"(john,3)\n" + "(jerry,4)\n" + "(tom,5)\n" + "(tom,4)\n" + "(john,2)\n" + "(jerry,1)\n" + "(bob,1)\n" +
-			"(john,5)\n" + "(grace,4)\n" + "(tom,5)\n" + "(john,4)\n" + "(tom,1)\n" + "(grace,1)\n" + "(john,2)\n" +
-			"(jerry,3)\n" + "(jerry,5)\n" + "(tom,2)\n" + "(tom,2)\n" + "(alice,4)\n" + "(tom,4)\n" + "(jerry,4)\n" +
-			"(john,3)\n" + "(grace,4)\n" + "(tom,3)\n" + "(jerry,4)\n" + "(john,5)\n" + "(john,4)\n" + "(jerry,1)\n" +
-			"(john,5)\n" + "(alice,2)\n" + "(tom,1)\n" + "(alice,5)\n" + "(grace,4)\n" + "(bob,4)\n" + "(jerry,1)\n" +
-			"(john,5)\n" + "(tom,4)\n" + "(tom,5)\n" + "(jerry,5)\n" + "(tom,1)\n" + "(grace,3)\n" + "(bob,5)\n" +
-			"(john,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(grace,1)\n" + "(jerry,1)\n" + "(jerry,4)\n" +
-			"(bob,4)\n" + "(alice,3)\n" + "(tom,5)\n" + "(alice,4)\n" + "(alice,4)\n" + "(grace,4)\n" + "(john,5)\n" +
-			"(john,5)\n" + "(grace,4)\n" + "(tom,4)\n" + "(john,4)\n" + "(john,5)\n" + "(alice,5)\n" + "(jerry,5)\n" +
-			"(john,3)\n" + "(tom,5)\n" + "(jerry,4)\n" + "(grace,4)\n" + "(john,3)\n" + "(bob,2)";
+	public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
+			"(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
+			"(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
+			"(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
+			"(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
+			"(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
+			"(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
+			"(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
+			"(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
+			"(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
+			"(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
+			"(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
+			"(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
+			"(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
+			"(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
 
-	public static final String SALARIES_INPUT = "(john,6469)\n" + "(jerry,6760)\n" + "(jerry,8069)\n" +
-			"(tom,3662)\n" + "(grace,8427)\n" + "(john,9425)\n" + "(bob,9018)\n" + "(john,352)\n" + "(tom,3770)\n" +
-			"(grace,7622)\n" + "(jerry,7441)\n" + "(alice,1468)\n" + "(bob,5472)\n" + "(grace,898)\n" +
-			"(tom,3849)\n" + "(grace,1865)\n" + "(alice,5582)\n" + "(john,9511)\n" + "(alice,1541)\n" +
-			"(john,2477)\n" + "(grace,3561)\n" + "(john,1670)\n" + "(grace,7290)\n" + "(grace,6565)\n" +
-			"(tom,6179)\n" + "(tom,1601)\n" + "(john,2940)\n" + "(bob,4685)\n" + "(bob,710)\n" + "(bob,5936)\n" +
-			"(jerry,1412)\n" + "(grace,6515)\n" + "(grace,3321)\n" + "(tom,8088)\n" + "(john,2876)\n" +
-			"(bob,9896)\n" + "(grace,7368)\n" + "(grace,9749)\n" + "(bob,2048)\n" + "(alice,4782)\n" +
-			"(alice,3375)\n" + "(tom,5841)\n" + "(bob,958)\n" + "(bob,5258)\n" + "(tom,3935)\n" + "(jerry,4394)\n" +
-			"(alice,102)\n" + "(alice,4931)\n" + "(alice,5240)\n" + "(jerry,7951)\n" + "(john,5675)\n" +
-			"(bob,609)\n" + "(alice,5997)\n" + "(jerry,9651)\n" + "(alice,1328)\n" + "(bob,1022)\n" +
-			"(grace,2578)\n" + "(jerry,9704)\n" + "(tom,4476)\n" + "(grace,3784)\n" + "(alice,6144)\n" +
-			"(bob,6213)\n" + "(alice,7525)\n" + "(jerry,2908)\n" + "(grace,8464)\n" + "(jerry,9920)\n" +
-			"(bob,3720)\n" + "(bob,7612)\n" + "(alice,7211)\n" + "(jerry,6484)\n" + "(alice,1711)\n" +
-			"(jerry,5994)\n" + "(grace,928)\n" + "(jerry,2492)\n" + "(grace,9080)\n" + "(tom,4330)\n" +
-			"(bob,8302)\n" + "(john,4981)\n" + "(tom,1781)\n" + "(grace,1379)\n" + "(jerry,3700)\n" +
-			"(jerry,3584)\n" + "(jerry,2038)\n" + "(jerry,3902)\n" + "(tom,1336)\n" + "(jerry,7500)\n" +
-			"(tom,3648)\n" + "(alice,2533)\n" + "(tom,8685)\n" + "(bob,3968)\n" + "(tom,3241)\n" + "(bob,7461)\n" +
-			"(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + "(tom,140)\n" + "(john,9802)\n" +
-			"(grace,2977)\n" + "(grace,889)\n" + "(john,1338)";
+	public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
+			"(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
+			"(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
+			"(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
+			"(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
+			"(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
+			"(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
+			"(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
+			"(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
+			"(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
+			"(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
+			"(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
+			"(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
+			"(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
+			"(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
+			"(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
+			"(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
+			"(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
+			"(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
+			"(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
 
 	private WindowJoinData() {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 239f1fa..225dab7 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.scala.examples.join
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
 
 import scala.Stream._
 import scala.language.postfixOps
@@ -32,8 +34,8 @@ object WindowJoin {
   // PROGRAM
   // *************************************************************************
 
-  case class Grade(name: String, grade: Int)
-  case class Salary(name: String, salary: Int)
+  case class Grade(time: Long, name: String, grade: Int)
+  case class Salary(time: Long, name: String, salary: Int)
   case class Person(name: String, grade: Int, salary: Int)
 
   def main(args: Array[String]) {
@@ -43,6 +45,7 @@ object WindowJoin {
     }
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableTimestamps()
 
     //Create streams for grades and salaries by mapping the inputs to the corresponding objects
     val grades = setGradesInput(env)
@@ -50,11 +53,11 @@ object WindowJoin {
 
     //Join the two input streams by name on the last 2 seconds every second and create new
     //Person objects containing both grade and salary
-    val joined =
-      grades.join(salaries).onWindow(2, TimeUnit.SECONDS)
-                      .every(1, TimeUnit.SECONDS)
-                      .where("name")
-                      .equalTo("name") { (g, s) => Person(g.name, g.grade, s.salary) }
+    val joined = grades.join(salaries)
+        .where(_.name)
+        .equalTo(_.name)
+        .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+        .apply { (g, s) => Person(g.name, g.grade, s.salary) }
 
     if (fileOutput) {
       joined.writeAsText(outputPath)
@@ -74,27 +77,27 @@ object WindowJoin {
   val salaryMax = 10000
   val sleepInterval = 100
   
-  def gradeStream(): Stream[(String, Int)] = {
-    def gradeMapper(names: Array[String])(x: Int): (String, Int) =
+  def gradeStream: Stream[(Long, String, Int)] = {
+    def gradeMapper(names: Array[String])(x: Int): (Long, String, Int) =
       {
         if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
-        (names(Random.nextInt(names.length)), Random.nextInt(gradeCount))
+        (System.currentTimeMillis(),names(Random.nextInt(names.length)),Random.nextInt(gradeCount))
       }
     range(1, 100).map(gradeMapper(names))
   }
 
-  def salaryStream(): Stream[(String, Int)] = {
-    def salaryMapper(x: Int): (String, Int) =
+  def salaryStream: Stream[(Long, String, Int)] = {
+    def salaryMapper(x: Int): (Long, String, Int) =
       {
         if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
-        (names(Random.nextInt(names.length)), Random.nextInt(salaryMax))
+        (System.currentTimeMillis(), names(Random.nextInt(names.length)), Random.nextInt(salaryMax))
       }
     range(1, 100).map(salaryMapper)
   }
 
-  def parseMap(line : String): (String, Int) = {
+  def parseMap(line : String): (Long, String, Int) = {
     val record = line.substring(1, line.length - 1).split(",")
-    (record(0), record(1).toInt)
+    (record(0).toLong, record(1), record(2).toInt)
   }
 
   // *************************************************************************
@@ -130,23 +133,23 @@ object WindowJoin {
       System.out.println("  Provide parameter to write to file.")
       System.out.println("  Usage: WindowJoin <result path>")
     }
-    return true
+    true
   }
 
   private def setGradesInput(env: StreamExecutionEnvironment) : DataStream[Grade] = {
     if (fileInput) {
-      env.readTextFile(gradesPath).map(parseMap(_)).map(x => Grade(x._1, x._2))
+      env.readTextFile(gradesPath).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3))
     } else {
-      env.fromCollection(gradeStream).map(x => Grade(x._1, x._2))
+      env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
     }
   }
 
   private def setSalariesInput(env: StreamExecutionEnvironment) : DataStream[Salary] = {
     if (fileInput) {
-      env.readTextFile(salariesPath).map(parseMap(_)).map(x => Salary(x._1, x._2))
+      env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
     }
     else {
-      env.fromCollection(salaryStream).map(x => Salary(x._1, x._2))
+      env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
index aae4b93..e657b67 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
@@ -1,51 +1,50 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-// TODO: reactivate once we have new join implementation
-//package org.apache.flink.streaming.test.exampleJavaPrograms.join;
-//
-//import org.apache.flink.streaming.examples.join.WindowJoin;
-//import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-//import org.apache.flink.streaming.util.StreamingProgramTestBase;
-//
-//public class WindowJoinITCase extends StreamingProgramTestBase {
-//
-//	protected String gradesPath;
-//	protected String salariesPath;
-//	protected String resultPath;
-//
-//	@Override
-//	protected void preSubmit() throws Exception {
-//		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-//		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-//		resultPath = getTempDirPath("result");
-//	}
-//
-//	@Override
-//	protected void postSubmit() throws Exception {
-//		// since the two sides of the join might have different speed
-//		// the exact output can not be checked just whether it is well-formed
-//		// checks that the result lines look like e.g. (bob, 2, 2015)
-//		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
-//	}
-//
-//	@Override
-//	protected void testProgram() throws Exception {
-//		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
-//	}
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.join;
+
+import org.apache.flink.streaming.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+	protected String gradesPath;
+	protected String salariesPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the two sides of the join might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. (bob, 2, 2015)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
index 0aa884f..08ce890 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
@@ -1,51 +1,50 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-// TODO: reactivate once we have new join implementation
-//package org.apache.flink.streaming.test.exampleScalaPrograms.join;
-//
-//import org.apache.flink.streaming.scala.examples.join.WindowJoin;
-//import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-//import org.apache.flink.streaming.util.StreamingProgramTestBase;
-//
-//public class WindowJoinITCase extends StreamingProgramTestBase {
-//
-//	protected String gradesPath;
-//	protected String salariesPath;
-//	protected String resultPath;
-//
-//	@Override
-//	protected void preSubmit() throws Exception {
-//		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-//		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-//		resultPath = getTempDirPath("result");
-//	}
-//
-//	@Override
-//	protected void postSubmit() throws Exception {
-//		// since the two sides of the join might have different speed
-//		// the exact output can not be checked just whether it is well-formed
-//		// checks that the result lines look like e.g. Person(bob, 2, 2015)
-//		checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
-//	}
-//
-//	@Override
-//	protected void testProgram() throws Exception {
-//		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
-//	}
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.join;
+
+import org.apache.flink.streaming.scala.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+	protected String gradesPath;
+	protected String salariesPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the two sides of the join might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. Person(bob, 2, 2015)
+		checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
new file mode 100644
index 0000000..1b16e44
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+
+import scala.reflect.ClassTag
+
+/**
+ * `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * To finalize the co-group operation you also need to specify a [[KeySelector]] for
+ * both the first and second input and a [[WindowAssigner]]
+ *
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * Example:
+ *
+ * {{{
+ * val one: DataStream[(String, Int)]  = ...
+ * val two: DataStream[(String, Int)] = ...
+ *
+ * val result = one.coGroup(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MyFirstKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyCoGroupFunction())
+ * } }}}
+ */
+object CoGroupedStreams {
+
+  /**
+   * A co-group operation that does not yet have its [[KeySelector]]s defined.
+   *
+   * @tparam T1 Type of the elements from the first input
+   * @tparam T2 Type of the elements from the second input
+   */
+  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the first input.
+     */
+    def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T1, KEY] {
+        def getKey(in: T1) = cleanFun(in)
+      }
+      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+    }
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the second input.
+     */
+    def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T2, KEY] {
+        def getKey(in: T2) = cleanFun(in)
+      }
+      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+    }
+
+    /**
+     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+     */
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
+
+  /**
+   * A co-group operation that has [[KeySelector]]s defined for either both or
+   * one input.
+   *
+   * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
+   * before you can proceeed with specifying a [[WindowAssigner]] using [[window()]].
+   *
+   * @tparam T1 Type of the elements from the first input
+   * @tparam T2 Type of the elements from the second input
+   * @tparam KEY Type of the key. This must be the same for both inputs
+   */
+  class WithKey[T1, T2, KEY](
+      input1: DataStream[T1],
+      input2: DataStream[T2],
+      keySelector1: KeySelector[T1, KEY],
+      keySelector2: KeySelector[T2, KEY]) {
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the first input.
+     */
+    def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T1, KEY] {
+        def getKey(in: T1) = cleanFun(in)
+      }
+      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+    }
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the second input.
+     */
+    def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T2, KEY] {
+        def getKey(in: T2) = cleanFun(in)
+      }
+      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+    }
+
+    /**
+     * Specifies the window on which the co-group operation works.
+     */
+    def window[W <: Window](
+        assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
+        : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
+      if (keySelector1 == null || keySelector2 == null) {
+        throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
+          "inputs using where() and equalTo().")
+      }
+      new CoGroupedStreams.WithWindow[T1, T2, KEY, W](
+        input1,
+        input2,
+        keySelector1,
+        keySelector2,
+        clean(assigner),
+        null,
+        null)
+    }
+
+    /**
+     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+     */
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
+
+  /**
+   * A co-group operation that has [[KeySelector]]s defined for both inputs as
+   * well as a [[WindowAssigner]].
+   *
+   * @tparam T1 Type of the elements from the first input
+   * @tparam T2 Type of the elements from the second input
+   * @tparam KEY Type of the key. This must be the same for both inputs
+   * @tparam W Type of { @link Window} on which the co-group operation works.
+   */
+  class WithWindow[T1, T2, KEY, W <: Window](
+      input1: DataStream[T1],
+      input2: DataStream[T2],
+      keySelector1: KeySelector[T1, KEY],
+      keySelector2: KeySelector[T2, KEY],
+      windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
+      trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+      evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+
+
+    /**
+     * Sets the [[Trigger]] that should be used to trigger window emission.
+     */
+    def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+    : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
+      new WithWindow[T1, T2, KEY, W](
+        input1,
+        input2,
+        keySelector1,
+        keySelector2,
+        windowAssigner,
+        newTrigger,
+        evictor)
+    }
+
+    /**
+     * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+     *
+     * Note: When using an evictor window performance will degrade significantly, since
+     * pre-aggregation of window results cannot be used.
+     */
+    def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+    : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
+      new WithWindow[T1, T2, KEY, W](
+        input1,
+        input2,
+        keySelector1,
+        keySelector2,
+        windowAssigner,
+        trigger,
+        newEvictor)
+    }
+
+    /**
+     * Completes the co-group operation with the user function that is executed
+     * for windowed groups.
+     */
+    def apply[O: TypeInformation: ClassTag](
+        fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
+      require(fun != null, "CoGroup function must not be null.")
+
+      val coGrouper = new CoGroupFunction[T1, T2, O] {
+        val cleanFun = clean(fun)
+        def coGroup(
+            left: java.lang.Iterable[T1],
+            right: java.lang.Iterable[T2], out: Collector[O]) = {
+          out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
+        }
+      }
+      apply(coGrouper)
+    }
+
+    /**
+     * Completes the co-group operation with the user function that is executed
+     * for windowed groups.
+     */
+    def apply[O: TypeInformation: ClassTag](
+        fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
+      require(fun != null, "CoGroup function must not be null.")
+
+      val coGrouper = new CoGroupFunction[T1, T2, O] {
+        val cleanFun = clean(fun)
+        def coGroup(
+            left: java.lang.Iterable[T1],
+            right: java.lang.Iterable[T2], out: Collector[O]) = {
+          cleanFun(left.iterator.asScala, right.iterator.asScala, out)
+        }
+      }
+      apply(coGrouper)
+    }
+
+    /**
+     * Completes the co-group operation with the user function that is executed
+     * for windowed groups.
+     */
+    def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
+
+      val coGroup = JavaCoGroupedStreams.createCoGroup(input1.getJavaStream, input2.getJavaStream)
+
+      coGroup
+        .where(keySelector1)
+        .equalTo(keySelector2)
+        .window(windowAssigner)
+        .trigger(trigger)
+        .evictor(evictor)
+        .apply(clean(function), implicitly[TypeInformation[T]])
+    }
+
+    /**
+     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+     */
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
+
+
+  /**
+   * Creates a new co-group operation from the two given inputs.
+   */
+  def createCoGroup[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
+      : CoGroupedStreams.Unspecified[T1, T2] = {
+    new CoGroupedStreams.Unspecified[T1, T2](input1, input2)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 8aeacb4..7babc40 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -751,18 +751,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Initiates a temporal Join transformation that joins the elements of two
-   * data streams on key equality over a specified time window.
-   *
-   * This method returns a StreamJoinOperator on which the
-   * .onWindow(..) should be called to define the
-   * window, and then the .where(..) and .equalTo(..) methods can be used to defin
-   * the join keys.</p> The user can also use the apply method of the returned JoinedStream
-   * to use custom join function.
-   *
+   * Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys
+   * and window can be specified.
    */
-  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
-    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams.Unspecified[T, T2] = {
+    CoGroupedStreams.createCoGroup(this, otherStream)
+  }
+
+  /**
+   * Creates a join operation. See [[JoinedStreams]] for an example of how the keys
+   * and window can be specified.
+   */
+  def join[T2](otherStream: DataStream[T2]): JoinedStreams.Unspecified[T, T2] = {
+    JoinedStreams.createJoin(this, otherStream)
+  }
 
   /**
    * Writes a DataStream to the standard output stream (stdout). For each

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
new file mode 100644
index 0000000..be059b8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+import scala.reflect.ClassTag
+
+/**
+ * `JoinedStreams` represents two [[DataStream]]s that have been joined.
+ * A streaming join operation is evaluated over elements in a window.
+ *
+ * To finalize the join operation you also need to specify a [[KeySelector]] for
+ * both the first and second input and a [[WindowAssigner]]
+ *
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * Example:
+ *
+ * {{{
+ * val one: DataStream[(String, Int)]  = ...
+ * val two: DataStream[(String, Int)] = ...
+ *
+ * val result = one.join(two)
+ *     .where {t => ... }
+ *     .equal {t => ... }
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyJoinFunction())
+ * } }}}
+ */
+object JoinedStreams {
+
+  /**
+   * A join operation that does not yet have its [[KeySelector]]s defined.
+   *
+   * @tparam T1 Type of the elements from the first input
+   * @tparam T2 Type of the elements from the second input
+   */
+  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the first input.
+     */
+    def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T1, KEY] {
+        def getKey(in: T1) = cleanFun(in)
+      }
+      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+    }
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the second input.
+     */
+    def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T2, KEY] {
+        def getKey(in: T2) = cleanFun(in)
+      }
+      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+    }
+
+    /**
+     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+     */
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
+
+  /**
+   * A join operation that has [[KeySelector]]s defined for either both or
+   * one input.
+   *
+   * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
+   * before you can proceeed with specifying a [[WindowAssigner]] using [[window()]].
+   *
+   * @tparam T1 Type of the elements from the first input
+   * @tparam T2 Type of the elements from the second input
+   * @tparam KEY Type of the key. This must be the same for both inputs
+   */
+  class WithKey[T1, T2, KEY](
+      input1: DataStream[T1],
+      input2: DataStream[T2],
+      keySelector1: KeySelector[T1, KEY],
+      keySelector2: KeySelector[T2, KEY]) {
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the first input.
+     */
+    def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T1, KEY] {
+        def getKey(in: T1) = cleanFun(in)
+      }
+      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+    }
+
+    /**
+     * Specifies a [[KeySelector]] for elements from the second input.
+     */
+    def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+      val cleanFun = clean(keySelector)
+      val javaSelector = new KeySelector[T2, KEY] {
+        def getKey(in: T2) = cleanFun(in)
+      }
+      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+    }
+
+    /**
+     * Specifies the window on which the join operation works.
+     */
+    def window[W <: Window](
+        assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
+        : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
+      if (keySelector1 == null || keySelector2 == null) {
+        throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
+          "inputs using where() and equalTo().")
+      }
+      new JoinedStreams.WithWindow[T1, T2, KEY, W](
+        input1,
+        input2,
+        keySelector1,
+        keySelector2,
+        clean(assigner),
+        null,
+        null)
+    }
+
+    /**
+     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+     */
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
+
+  /**
+   * A join operation that has [[KeySelector]]s defined for both inputs as
+   * well as a [[WindowAssigner]].
+   *
+   * @tparam T1 Type of the elements from the first input
+   * @tparam T2 Type of the elements from the second input
+   * @tparam KEY Type of the key. This must be the same for both inputs
+   * @tparam W Type of { @link Window} on which the join operation works.
+   */
+  class WithWindow[T1, T2, KEY, W <: Window](
+      input1: DataStream[T1],
+      input2: DataStream[T2],
+      keySelector1: KeySelector[T1, KEY],
+      keySelector2: KeySelector[T2, KEY],
+      windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
+      trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+      evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+
+
+    /**
+     * Sets the [[Trigger]] that should be used to trigger window emission.
+     */
+    def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+    : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
+      new WithWindow[T1, T2, KEY, W](
+        input1,
+        input2,
+        keySelector1,
+        keySelector2,
+        windowAssigner,
+        newTrigger,
+        evictor)
+    }
+
+    /**
+     * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
+     *
+     * Note: When using an evictor window performance will degrade significantly, since
+     * pre-aggregation of window results cannot be used.
+     */
+    def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
+    : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
+      new WithWindow[T1, T2, KEY, W](
+        input1,
+        input2,
+        keySelector1,
+        keySelector2,
+        windowAssigner,
+        trigger,
+        newEvictor)
+    }
+
+    /**
+     * Completes the join operation with the user function that is executed
+     * for windowed groups.
+     */
+    def apply[O: TypeInformation: ClassTag](fun: (T1, T2) => O): DataStream[O] = {
+      require(fun != null, "Join function must not be null.")
+
+      val joiner = new FlatJoinFunction[T1, T2, O] {
+        val cleanFun = clean(fun)
+        def join(left: T1, right: T2, out: Collector[O]) = {
+          out.collect(cleanFun(left, right))
+        }
+      }
+      apply(joiner)
+    }
+
+    /**
+     * Completes the join operation with the user function that is executed
+     * for windowed groups.
+     */
+
+    def apply[O: TypeInformation: ClassTag](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
+      require(fun != null, "Join function must not be null.")
+
+      val joiner = new FlatJoinFunction[T1, T2, O] {
+        val cleanFun = clean(fun)
+        def join(left: T1, right: T2, out: Collector[O]) = {
+          cleanFun(left, right, out)
+        }
+      }
+      apply(joiner)
+    }
+
+    /**
+     * Completes the join operation with the user function that is executed
+     * for windowed groups.
+     */
+    def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
+
+      val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+
+      join
+        .where(keySelector1)
+        .equalTo(keySelector2)
+        .window(windowAssigner)
+        .trigger(trigger)
+        .evictor(evictor)
+        .apply(clean(function), implicitly[TypeInformation[T]])
+    }
+
+    /**
+     * Completes the join operation with the user function that is executed
+     * for windowed groups.
+     */
+    def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
+
+      val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+
+      join
+        .where(keySelector1)
+        .equalTo(keySelector2)
+        .window(windowAssigner)
+        .trigger(trigger)
+        .evictor(evictor)
+        .apply(clean(function), implicitly[TypeInformation[T]])
+    }
+
+    /**
+     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+     */
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
+
+
+  /**
+   * Creates a new join operation from the two given inputs.
+   */
+  def createJoin[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
+      : JoinedStreams.Unspecified[T1, T2] = {
+    new JoinedStreams.Unspecified[T1, T2](input1, input2)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
deleted file mode 100644
index e0bbaf8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.util.keys.KeySelectorUtil
-
-import scala.Array.canBuildFrom
-import scala.reflect.ClassTag
-
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends 
-TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator() = {
-    new StreamJoinOperator.JoinWindow[I1, I2](this)
-  }
-}
-
-object StreamJoinOperator {
-
-  class JoinWindow[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2]) extends
-  TemporalWindow[JoinWindow[I1, I2]] {
-
-    private[flink] val type1 = op.input1.getType()
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(fields: Int*) = {
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type1),
-        type1,
-        op.input1.getExecutionEnvironment.getConfig))
-    }
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(firstField: String, otherFields: String*) =
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1),
-        type1,
-        op.input1.getExecutionEnvironment.getConfig))
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the keyselector function that will be used to extract keys from the first stream
-     * for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where[K: TypeInformation](fun: (I1) => K) = {
-      val keyType = implicitly[TypeInformation[K]]
-      val cleanFun = op.input1.clean(fun)
-      val keyExtractor = new KeySelector[I1, K] {
-        def getKey(in: I1) = cleanFun(in)
-      }
-      new JoinPredicate[I1, I2](op, keyExtractor)
-    }
-
-    override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = {
-      every(timeUnit.toMillis(length))
-    }
-
-    override def every(length: Long): JoinWindow[I1, I2] = {
-      op.slideInterval = length
-      this
-    }
-
-  }
-
-  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
-    private[flink] val keys1: KeySelector[I1, _]) {
-    private[flink] var keys2: KeySelector[I2, _] = null
-    private[flink] val type2 = op.input2.getType()
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
-      finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type2),
-        type2,
-        op.input1.getExecutionEnvironment.getConfig))
-    }
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
-      finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2),
-        type2,
-        op.input1.getExecutionEnvironment.getConfig))
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
-      val keyType = implicitly[TypeInformation[K]]
-      val cleanFun = op.input1.clean(fun)
-      val keyExtractor = new KeySelector[I2, K] {
-        def getKey(in: I2) = cleanFun(in)
-      }
-      finish(keyExtractor)
-    }
-
-    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
-      this.keys2 = keys2
-      new JoinedStream[I1, I2](this, createJoinOperator())
-    }
-
-    private def createJoinOperator(): JavaStream[(I1, I2)] = {
-
-//      val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
-//      op.input1.keyBy(keys1).connect(op.input2.keyBy(keys2))
-//        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-//          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
-      null
-    }
-  }
-
-  class JoinedStream[I1, I2](
-      jp: JoinPredicate[I1, I2],
-      javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
-
-    private val op = jp.op
-
-    /**
-     * Sets a wrapper for the joined elements. For each joined pair, the result of the
-     * udf call will be emitted.
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val cleanFun = clean(getJoinWindowFunction(jp, fun))
-
-//      op.input1.keyBy(jp.keys1).connect(op.input2.keyBy(jp.keys2))
-//        .addGeneralWindowCombine[R](
-//          cleanFun,
-//          implicitly[TypeInformation[R]],
-//          op.windowSize,
-//          op.slideInterval,
-//          op.timeStamp1,
-//          op.timeStamp2)
-      null
-    }
-  }
-
-  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
-    joinFunction: (I1, I2) => R) = {
-    require(joinFunction != null, "Join function must not be null.")
-
-    val cleanFun = jp.op.input1.clean(joinFunction)
-
-    val joinFun = new JoinFunction[I1, I2, R] {
-      override def join(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-//    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
-    null
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
deleted file mode 100644
index 8357c4d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.datastream.temporal.{ TemporalOperator => JTempOp }
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-
-abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
-  i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
-
-  def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = {
-    val timeStamp1 = getTS(ts1)
-    val timeStamp2 = getTS(ts2)
-    onWindow(length, timeStamp1, timeStamp2, startTime)
-  }
-
-  def getTS[R](ts: R => Long): Timestamp[R] = {
-    val cleanFun = clean(ts)
-    new Timestamp[R] {
-      def getTimestamp(in: R) = cleanFun(in)
-    }
-  }
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
-   */
-  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(i1.getExecutionEnvironment).scalaClean(f)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
new file mode 100644
index 0000000..7232309
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.TimestampExtractor
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.mutable
+
+class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testCoGroup(): Unit = {
+    CoGroupJoinITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.getConfig.enableTimestamps
+    
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+
+    val source2 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("b", 3))
+        ctx.collect(("c", 6))
+        ctx.collect(("c", 7))
+        ctx.collect(("c", 8))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+
+    source1.coGroup(source2)
+      .where(_._1)
+      .equalTo(_._1)
+      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
+          "F:" + first.mkString("") + " S:" + second.mkString("")
+      }
+      .addSink(new SinkFunction[String]() {
+        def invoke(value: String) {
+          CoGroupJoinITCase.testResults += value
+        }
+      })
+
+    env.execute("CoGroup Test")
+
+    val expectedResult = mutable.MutableList(
+      "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
+      "F:(b,3)(b,4)(b,5) S:(b,3)",
+      "F:(a,6)(a,7)(a,8) S:",
+      "F: S:(c,6)(c,7)(c,8)")
+
+    assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+    CoGroupJoinITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.getConfig.enableTimestamps
+
+    val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
+        ctx.collect(("a", "x", 0))
+        ctx.collect(("a", "y", 1))
+        ctx.collect(("a", "z", 2))
+
+        ctx.collect(("b", "u", 3))
+        ctx.collect(("b", "w", 5))
+
+        ctx.collect(("a", "i", 6))
+        ctx.collect(("a", "j", 7))
+        ctx.collect(("a", "k", 8))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+
+    val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
+        ctx.collect(("a", "u", 0))
+        ctx.collect(("a", "w", 1))
+
+        ctx.collect(("b", "i", 3))
+        ctx.collect(("b", "k", 5))
+
+        ctx.collect(("a", "x", 6))
+        ctx.collect(("a", "z", 8))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+
+    source1.join(source2)
+      .where(_._1)
+      .equalTo(_._1)
+      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply( (l, r) => l.toString + ":" + r.toString)
+      .addSink(new SinkFunction[String]() {
+        def invoke(value: String) {
+          CoGroupJoinITCase.testResults += value
+        }
+      })
+
+    env.execute("Join Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,x,0):(a,u,0)",
+      "(a,x,0):(a,w,1)",
+      "(a,y,1):(a,u,0)",
+      "(a,y,1):(a,w,1)",
+      "(a,z,2):(a,u,0)",
+      "(a,z,2):(a,w,1)",
+      "(b,u,3):(b,i,3)",
+      "(b,u,3):(b,k,5)",
+      "(b,w,5):(b,i,3)",
+      "(b,w,5):(b,k,5)",
+      "(a,i,6):(a,x,6)",
+      "(a,i,6):(a,z,8)",
+      "(a,j,7):(a,x,6)",
+      "(a,j,7):(a,z,8)",
+      "(a,k,8):(a,x,6)",
+      "(a,k,8):(a,z,8)")
+
+    assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSelfJoin(): Unit = {
+    CoGroupJoinITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.getConfig.enableTimestamps
+
+    val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
+        ctx.collect(("a", "x", 0))
+        ctx.collect(("a", "y", 1))
+        ctx.collect(("a", "z", 2))
+
+        ctx.collect(("b", "u", 3))
+        ctx.collect(("b", "w", 5))
+
+        ctx.collect(("a", "i", 6))
+        ctx.collect(("a", "j", 7))
+        ctx.collect(("a", "k", 8))
+      }
+
+      def cancel() {
+      }
+    }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+
+    source1.join(source1)
+      .where(_._1)
+      .equalTo(_._1)
+      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply( (l, r) => l.toString + ":" + r.toString)
+      .addSink(new SinkFunction[String]() {
+      def invoke(value: String) {
+        CoGroupJoinITCase.testResults += value
+      }
+    })
+
+    env.execute("Self-Join Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,x,0):(a,x,0)",
+      "(a,x,0):(a,y,1)",
+      "(a,x,0):(a,z,2)",
+      "(a,y,1):(a,x,0)",
+      "(a,y,1):(a,y,1)",
+      "(a,y,1):(a,z,2)",
+      "(a,z,2):(a,x,0)",
+      "(a,z,2):(a,y,1)",
+      "(a,z,2):(a,z,2)",
+      "(b,u,3):(b,u,3)",
+      "(b,u,3):(b,w,5)",
+      "(b,w,5):(b,u,3)",
+      "(b,w,5):(b,w,5)",
+      "(a,i,6):(a,i,6)",
+      "(a,i,6):(a,j,7)",
+      "(a,i,6):(a,k,8)",
+      "(a,j,7):(a,i,6)",
+      "(a,j,7):(a,j,7)",
+      "(a,j,7):(a,k,8)",
+      "(a,k,8):(a,i,6)",
+      "(a,k,8):(a,j,7)",
+      "(a,k,8):(a,k,8)")
+
+    assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
+  }
+
+}
+
+
+object CoGroupJoinITCase {
+  private var testResults: mutable.MutableList[String] = null
+
+  private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
+    def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
+      element._2
+    }
+
+    def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+      element._2 - 1
+    }
+
+    def getCurrentWatermark: Long = {
+      Long.MinValue
+    }
+  }
+
+  private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] {
+    def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = {
+      element._3
+    }
+
+    def emitWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
+      element._3 - 1
+    }
+
+    def getCurrentWatermark: Long = {
+      Long.MinValue
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index d1fd233..c6bd87a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 import java.lang.reflect.Method
 
 import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, JoinedStreams}
 
 import scala.language.existentials
 
@@ -138,14 +138,14 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       classOf[KeyedStream[_, _]])
 
     checkMethods(
-      "StreamJoinOperator", "StreamJoinOperator",
-      classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]],
-      classOf[StreamJoinOperator[_,_]])
+      "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
+      classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
+      classOf[JoinedStreams.WithWindow[_,_,_,_]])
 
     checkMethods(
-      "TemporalOperator", "TemporalOperator",
-      classOf[org.apache.flink.streaming.api.datastream.temporal.TemporalOperator[_,_,_]],
-      classOf[TemporalOperator[_,_,_]])
+      "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
+      classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
+      classOf[CoGroupedStreams.WithWindow[_,_,_,_]])
 
     checkMethods(
       "WindowedDataStream", "WindowedDataStream",