You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:29 UTC

[02/24] flink git commit: [FLINK-2550] [streaming] Rework JoinStreams and CoGroupStreams to properly implement operator builder syntax

[FLINK-2550] [streaming] Rework JoinStreams and CoGroupStreams to properly implement operator builder syntax


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69dfc40d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69dfc40d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69dfc40d

Branch: refs/heads/master
Commit: 69dfc40d4b2c9f994d0f828d5f26ed27faaeade0
Parents: c24dca5
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 18:20:02 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/CoGroupedStreams.java        | 176 ++++++++++---------
 .../streaming/api/datastream/DataStream.java    |  10 +-
 .../streaming/api/datastream/JoinedStreams.java | 158 +++++++++--------
 .../streaming/api/scala/CoGroupedStreams.scala  |   2 +-
 .../streaming/api/scala/JoinedStreams.scala     |   4 +-
 5 files changed, 182 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index e1f1a96..d1da783 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -36,8 +35,11 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  *{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
  * A streaming co-group operation is evaluated over elements in a window.
@@ -64,93 +66,87 @@ import java.util.List;
  *     .apply(new MyCoGroupFunction());
  * } </pre>
  */
-public class CoGroupedStreams {
+public class CoGroupedStreams<T1, T2> {
 
-	/**
-	 * A co-group operation that does not yet have its {@link KeySelector KeySelectors} defined.
-	 *
-	 * @param <T1> Type of the elements from the first input
-	 * @param <T2> Type of the elements from the second input
-	 */
-	public static class Unspecified<T1, T2> {
-		DataStream<T1> input1;
-		DataStream<T2> input2;
+	/** The first input stream */
+	private final DataStream<T1> input1;
 
-		protected Unspecified(DataStream<T1> input1,
-				DataStream<T2> input2) {
-			this.input1 = input1;
-			this.input2 = input2;
-		}
+	/** The second input stream */
+	private final DataStream<T2> input2;
 
-		/**
-		 * Specifies a {@link KeySelector} for elements from the first input.
-		 */
-		public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
-			return new WithKey<>(input1, input2, input1.clean(keySelector), null);
-		}
-
-		/**
-		 * Specifies a {@link KeySelector} for elements from the second input.
-		 */
-		public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
-			return new WithKey<>(input1, input2, null, input1.clean(keySelector));
-		}
+	/**
+	 * Creates new CoGrouped data streams, which are the first step towards building a streaming co-group.
+	 * 
+	 * @param input1 The first data stream.
+	 * @param input2 The second data stream.
+	 */
+	public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+		this.input1 = requireNonNull(input1);
+		this.input2 = requireNonNull(input2);
 	}
 
 	/**
-	 * A co-group operation that has {@link KeySelector KeySelectors} defined for either both or
-	 * one input.
-	 *
-	 * <p>
-	 * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
-	 * and {@link #equalTo(KeySelector)} before you can proceeed with specifying a
-	 * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
-	 *
-	 * @param <T1> Type of the elements from the first input
-	 * @param <T2> Type of the elements from the second input
-	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * Specifies a {@link KeySelector} for elements from the first input.
 	 */
-	public static class WithKey<T1, T2, KEY> {
-		DataStream<T1> input1;
-		DataStream<T2> input2;
+	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
+		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new Where<>(input1.clean(keySelector), keyType);
+	}
 
-		KeySelector<T1, KEY> keySelector1;
-		KeySelector<T2, KEY> keySelector2;
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * CoGrouped streams that have the key for one side defined.
+	 * 
+	 * @param <KEY> The type of the key.
+	 */
+	public class Where<KEY> {
 
-		protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
-			this.input1 = input1;
-			this.input2 = input2;
+		private final KeySelector<T1, KEY> keySelector1;
+		private final TypeInformation<KEY> keyType;
 
+		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
 			this.keySelector1 = keySelector1;
-			this.keySelector2 = keySelector2;
-		}
-
-		/**
-		 * Specifies a {@link KeySelector} for elements from the first input.
-		 */
-		public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
-			return new CoGroupedStreams.WithKey<>(input1, input2, input1.clean(keySelector), keySelector2);
+			this.keyType = keyType;
 		}
-
+	
 		/**
 		 * Specifies a {@link KeySelector} for elements from the second input.
 		 */
-		public CoGroupedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
-			return new CoGroupedStreams.WithKey<>(input1, input2, keySelector1, input1.clean(keySelector));
+		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
+			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			if (!otherKey.equals(this.keyType)) {
+				throw new IllegalArgumentException("The keys for the two inputs are not equal: " + 
+						"first key = " + this.keyType + " , second key = " + otherKey);
+			}
+			
+			return new EqualTo(input2.clean(keySelector));
 		}
 
+		// --------------------------------------------------------------------
+		
 		/**
-		 * Specifies the window on which the co-group operation works.
+		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
 		 */
-		public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-			if (keySelector1 == null || keySelector2 == null) {
-				throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+		public class EqualTo {
+
+			private final KeySelector<T2, KEY> keySelector2;
+
+			EqualTo(KeySelector<T2, KEY> keySelector2) {
+				this.keySelector2 = requireNonNull(keySelector2);
+			}
 
+			/**
+			 * Specifies the window on which the co-group operation works.
+			 */
+			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
 			}
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
 	 * well as a {@link WindowAssigner}.
@@ -166,6 +162,8 @@ public class CoGroupedStreams {
 
 		private final KeySelector<T1, KEY> keySelector1;
 		private final KeySelector<T2, KEY> keySelector2;
+		
+		private final TypeInformation<KEY> keyType;
 
 		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
 
@@ -177,6 +175,7 @@ public class CoGroupedStreams {
 				DataStream<T2> input2,
 				KeySelector<T1, KEY> keySelector1,
 				KeySelector<T2, KEY> keySelector2,
+				TypeInformation<KEY> keyType,
 				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
 				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
 				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
@@ -185,7 +184,8 @@ public class CoGroupedStreams {
 
 			this.keySelector1 = keySelector1;
 			this.keySelector2 = keySelector2;
-
+			this.keyType = keyType;
+			
 			this.windowAssigner = windowAssigner;
 			this.trigger = trigger;
 			this.evictor = evictor;
@@ -195,7 +195,8 @@ public class CoGroupedStreams {
 		 * Sets the {@code Trigger} that should be used to trigger window emission.
 		 */
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, newTrigger, evictor);
 		}
 
 		/**
@@ -206,7 +207,8 @@ public class CoGroupedStreams {
 		 * pre-aggregation of window results cannot be used.
 		 */
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, trigger, newEvictor);
 		}
 
 		/**
@@ -236,16 +238,21 @@ public class CoGroupedStreams {
 			//clean the closure
 			function = input1.getExecutionEnvironment().clean(function);
 
+			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
+			UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
+			
 			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
 					.map(new Input1Tagger<T1, T2>())
-					.returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
+					.returns(unionType);
 			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
 					.map(new Input2Tagger<T1, T2>())
-					.returns(new UnionTypeInfo<>(input1.getType(), input2.getType()));
+					.returns(unionType);
 
-			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = taggedInput1
-					.union(taggedInput2)
-					.keyBy(new UnionKeySelector<>(keySelector1, keySelector2))
+			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
+			
+			// we explicitly create the keyed stream to manually pass the key type information in
+			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = 
+					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
 					.window(windowAssigner);
 
 			if (trigger != null) {
@@ -259,13 +266,10 @@ public class CoGroupedStreams {
 		}
 	}
 
-	/**
-	 * Creates a new co-group operation from the two given inputs.
-	 */
-	public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> input1, DataStream<T2> input2) {
-		return new Unspecified<>(input1, input2);
-	}
-
+	// ------------------------------------------------------------------------
+	//  Data type and type information for Tagged Union
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Internal class for implementing tagged union co-group.
 	 */
@@ -425,7 +429,7 @@ public class CoGroupedStreams {
 
 		@Override
 		public int getLength() {
-			return 0;
+			return -1;
 		}
 
 		@Override
@@ -494,6 +498,11 @@ public class CoGroupedStreams {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utility functions that implement the CoGroup logic based on the tagged
+	//  union window reduce
+	// ------------------------------------------------------------------------
+	
 	private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
 		private static final long serialVersionUID = 1L;
 
@@ -537,6 +546,7 @@ public class CoGroupedStreams {
 	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
 			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
 			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
+		
 		private static final long serialVersionUID = 1L;
 
 		public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
@@ -548,8 +558,10 @@ public class CoGroupedStreams {
 				W window,
 				Iterable<TaggedUnion<T1, T2>> values,
 				Collector<T> out) throws Exception {
-			List<T1> oneValues = Lists.newArrayList();
-			List<T2> twoValues = Lists.newArrayList();
+			
+			List<T1> oneValues = new ArrayList<>();
+			List<T2> twoValues = new ArrayList<>();
+			
 			for (TaggedUnion<T1, T2> val: values) {
 				if (val.isOne()) {
 					oneValues.add(val.getOne());

http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7e686c7..c15ea9b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -239,7 +239,7 @@ public class DataStream<T> {
 	 *            The KeySelector to be used for extracting the key for partitioning
 	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 	 */
-	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key){
+	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
 		return new KeyedStream<T, K>(this, clean(key));
 	}
 
@@ -622,16 +622,16 @@ public class DataStream<T> {
 	 * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys
 	 * and window can be specified.
 	 */
-	public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> otherStream) {
-		return CoGroupedStreams.createCoGroup(this, otherStream);
+	public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
+		return new CoGroupedStreams<>(this, otherStream);
 	}
 
 	/**
 	 * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
 	 * and window can be specified.
 	 */
-	public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> otherStream) {
-		return JoinedStreams.createJoin(this, otherStream);
+	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
+		return new JoinedStreams<>(this, otherStream);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index ee848e3..cff9355 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  *{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined.
  * A streaming join operation is evaluated over elements in a window.
@@ -56,92 +59,86 @@ import org.apache.flink.util.Collector;
  *     .apply(new MyJoinFunction());
  * } </pre>
  */
-public class JoinedStreams extends CoGroupedStreams{
+public class JoinedStreams<T1, T2> {
+
+	/** The first input stream */
+	private final DataStream<T1> input1;
+
+	/** The second input stream */
+	private final DataStream<T2> input2;
 
 	/**
-	 * A join operation that does not yet have its {@link KeySelector KeySelectors} defined.
+	 * Creates new JoinedStreams data streams, which are the first step towards building a streaming join.
 	 *
-	 * @param <T1> Type of the elements from the first input
-	 * @param <T2> Type of the elements from the second input
+	 * @param input1 The first data stream.
+	 * @param input2 The second data stream.
 	 */
-	public static class Unspecified<T1, T2> {
-		DataStream<T1> input1;
-		DataStream<T2> input2;
-
-		protected Unspecified(DataStream<T1> input1,
-				DataStream<T2> input2) {
-			this.input1 = input1;
-			this.input2 = input2;
-		}
-
-		/**
-		 * Specifies a {@link KeySelector} for elements from the first input.
-		 */
-		public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
-			return new WithKey<>(input1, input2, keySelector, null);
-		}
+	public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+		this.input1 = requireNonNull(input1);
+		this.input2 = requireNonNull(input2);
+	}
 
-		/**
-		 * Specifies a {@link KeySelector} for elements from the second input.
-		 */
-		public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
-			return new WithKey<>(input1, input2, null, keySelector);
-		}
+	/**
+	 * Specifies a {@link KeySelector} for elements from the first input.
+	 */
+	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
+		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new Where<>(input1.clean(keySelector), keyType);
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
-	 * A join operation that has {@link KeySelector KeySelectors} defined for either both or
-	 * one input.
-	 *
-	 * <p>
-	 * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)}
-	 * and {@link #equalTo(KeySelector)} before you can proceeed with specifying a
-	 * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+	 * Joined streams that have the key for one side defined.
 	 *
-	 * @param <T1> Type of the elements from the first input
-	 * @param <T2> Type of the elements from the second input
-	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * @param <KEY> The type of the key.
 	 */
-	public static class WithKey<T1, T2, KEY> {
-		DataStream<T1> input1;
-		DataStream<T2> input2;
-
-		KeySelector<T1, KEY> keySelector1;
-		KeySelector<T2, KEY> keySelector2;
+	public class Where<KEY> {
 
-		protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
-			this.input1 = input1;
-			this.input2 = input2;
+		private final KeySelector<T1, KEY> keySelector1;
+		private final TypeInformation<KEY> keyType;
 
+		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
 			this.keySelector1 = keySelector1;
-			this.keySelector2 = keySelector2;
-		}
-
-		/**
-		 * Specifies a {@link KeySelector} for elements from the first input.
-		 */
-		public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector)  {
-			return new JoinedStreams.WithKey<>(input1, input2, keySelector, keySelector2);
+			this.keyType = keyType;
 		}
 
 		/**
 		 * Specifies a {@link KeySelector} for elements from the second input.
 		 */
-		public JoinedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector)  {
-			return new JoinedStreams.WithKey<>(input1, input2, keySelector1, keySelector);
+		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
+			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			if (!otherKey.equals(this.keyType)) {
+				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
+						"first key = " + this.keyType + " , second key = " + otherKey);
+			}
+
+			return new EqualTo(input2.clean(keySelector));
 		}
 
+		// --------------------------------------------------------------------
+
 		/**
-		 * Specifies the window on which the join operation works.
+		 * A join operation that has {@link KeySelector KeySelectors} defined for both inputs.
 		 */
-		public <W extends Window> JoinedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-			if (keySelector1 == null || keySelector2 == null) {
-				throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo().");
+		public class EqualTo {
 
+			private final KeySelector<T2, KEY> keySelector2;
+
+			EqualTo(KeySelector<T2, KEY> keySelector2) {
+				this.keySelector2 = requireNonNull(keySelector2);
+			}
+
+			/**
+			 * Specifies the window on which the join operation works.
+			 */
+			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
 			}
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null);
 		}
 	}
+	
+	// ------------------------------------------------------------------------
 
 	/**
 	 * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as
@@ -153,11 +150,13 @@ public class JoinedStreams extends CoGroupedStreams{
 	 * @param <W> Type of {@link Window} on which the join operation works.
 	 */
 	public static class WithWindow<T1, T2, KEY, W extends Window> {
+		
 		private final DataStream<T1> input1;
 		private final DataStream<T2> input2;
 
 		private final KeySelector<T1, KEY> keySelector1;
 		private final KeySelector<T2, KEY> keySelector2;
+		private final TypeInformation<KEY> keyType;
 
 		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
 
@@ -169,16 +168,20 @@ public class JoinedStreams extends CoGroupedStreams{
 				DataStream<T2> input2,
 				KeySelector<T1, KEY> keySelector1,
 				KeySelector<T2, KEY> keySelector2,
+				TypeInformation<KEY> keyType,
 				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
 				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
 				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
-			this.input1 = input1;
-			this.input2 = input2;
-
-			this.keySelector1 = keySelector1;
-			this.keySelector2 = keySelector2;
-
-			this.windowAssigner = windowAssigner;
+			
+			this.input1 = requireNonNull(input1);
+			this.input2 = requireNonNull(input2);
+
+			this.keySelector1 = requireNonNull(keySelector1);
+			this.keySelector2 = requireNonNull(keySelector2);
+			this.keyType = requireNonNull(keyType);
+			
+			this.windowAssigner = requireNonNull(windowAssigner);
+			
 			this.trigger = trigger;
 			this.evictor = evictor;
 		}
@@ -187,7 +190,8 @@ public class JoinedStreams extends CoGroupedStreams{
 		 * Sets the {@code Trigger} that should be used to trigger window emission.
 		 */
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor);
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, newTrigger, evictor);
 		}
 
 		/**
@@ -198,7 +202,8 @@ public class JoinedStreams extends CoGroupedStreams{
 		 * pre-aggregation of window results cannot be used.
 		 */
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor);
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, trigger, newEvictor);
 		}
 
 		/**
@@ -213,7 +218,7 @@ public class JoinedStreams extends CoGroupedStreams{
 					true,
 					input1.getType(),
 					input2.getType(),
-					"CoGroup",
+					"Join",
 					false);
 
 			return apply(function, resultType);
@@ -249,7 +254,7 @@ public class JoinedStreams extends CoGroupedStreams{
 					true,
 					input1.getType(),
 					input2.getType(),
-					"CoGroup",
+					"Join",
 					false);
 
 			return apply(function, resultType);
@@ -273,13 +278,10 @@ public class JoinedStreams extends CoGroupedStreams{
 
 		}
 	}
-
-	/**
-	 * Creates a new join operation from the two given inputs.
-	 */
-	public static <T1, T2> Unspecified<T1, T2> createJoin(DataStream<T1> input1, DataStream<T2> input2) {
-		return new Unspecified<>(input1, input2);
-	}
+	
+	// ------------------------------------------------------------------------
+	//  Implementation of the functions
+	// ------------------------------------------------------------------------
 
 	/**
 	 * CoGroup function that does a nested-loop join to get the join result.

http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 0164b92..e676f81 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -270,7 +270,7 @@ object CoGroupedStreams {
      */
     def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
 
-      val coGroup = JavaCoGroupedStreams.createCoGroup(input1.getJavaStream, input2.getJavaStream)
+      val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
 
       coGroup
         .where(keySelector1)

http://git-wip-us.apache.org/repos/asf/flink/blob/69dfc40d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 2fda32d..c259724 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -263,7 +263,7 @@ object JoinedStreams {
      */
     def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
 
-      val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+      val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
 
       join
         .where(keySelector1)
@@ -280,7 +280,7 @@ object JoinedStreams {
      */
     def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
 
-      val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream)
+      val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
 
       join
         .where(keySelector1)