You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:53 UTC

[4/6] flink git commit: [FLINK-2576] [javaAPI] [scalaAPI] [optimizer] Add outerJoin to DataSet API (Java, Scala) and optimizer.

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e76e3c9..7f15f9e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.security.InvalidParameterException;
 import java.util.Arrays;
 
 import com.google.common.base.Preconditions;
@@ -34,22 +33,25 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.java.operators.join.JoinType;
+import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingJoinOperator;
-import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingJoinOperator;
-import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingJoinOperator;
+import org.apache.flink.api.java.operators.translation.TupleRightUnwrappingJoiner;
+import org.apache.flink.api.java.operators.translation.TupleLeftUnwrappingJoiner;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingJoiner;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -69,18 +71,19 @@ import org.apache.flink.api.java.tuple.*;
  * @see DataSet
  */
 public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, JoinOperator<I1, I2, OUT>> {
-	
+
 	protected final Keys<I1> keys1;
 	protected final Keys<I2> keys2;
 	
 	private final JoinHint joinHint;
-	
+	protected final JoinType joinType;
+
 	private Partitioner<?> customPartitioner;
 	
 	
-	protected JoinOperator(DataSet<I1> input1, DataSet<I2> input2, 
+	protected JoinOperator(DataSet<I1> input1, DataSet<I2> input2,
 			Keys<I1> keys1, Keys<I2> keys2,
-			TypeInformation<OUT> returnType, JoinHint hint)
+			TypeInformation<OUT> returnType, JoinHint hint, JoinType type)
 	{
 		super(input1, input2, returnType);
 		
@@ -116,7 +119,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		this.keys1 = keys1;
 		this.keys2 = keys2;
-		this.joinHint = hint == null ? JoinHint.OPTIMIZER_CHOOSES : hint;
+		this.joinHint = hint == null ? InnerJoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES : hint;
+		this.joinType = type;
 	}
 	
 	protected Keys<I1> getKeys1() {
@@ -135,6 +139,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	public JoinHint getJoinHint() {
 		return this.joinHint;
 	}
+
+	/**
+	 * Gets the JoinType that describes this join operation (e.g. inner, outer)
+	 *
+	 * @return The JoinType
+	 */
+	public JoinType getJoinType() {
+		return this.joinType;
+	}
 	
 	/**
 	 * Sets a custom partitioner for this join. The partitioner will be called on the join keys to determine
@@ -189,12 +202,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		private boolean preserve2;
 		
 		private final String joinLocationName;
-		
+
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
-				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
-		{
-			super(input1, input2, keys1, keys2, returnType, hint);
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName) {
+			this(input1, input2, keys1, keys2, function, returnType, hint, joinLocationName, JoinType.INNER);
+		}
+
+		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName) {
+			this(input1, input2, keys1, keys2, generatedFunction, function, returnType, hint, joinLocationName, JoinType.INNER);
+		}
+
+		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
+			super(input1, input2, keys1, keys2, returnType, hint, type);
 			
 			if (function == null) {
 				throw new NullPointerException();
@@ -208,9 +232,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
-				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
-		{
-			super(input1, input2, keys1, keys2, returnType, hint);
+				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
+			super(input1, input2, keys1, keys2, returnType, hint, type);
 			
 			this.joinLocationName = joinLocationName;
 
@@ -282,232 +305,220 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		@Override
 		protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+			String name = getName() != null ? getName() : "Join at " + joinLocationName;
+
+			JoinOperatorBaseBuilder<OUT> builder = new JoinOperatorBaseBuilder<OUT>(name, joinType)
+					.withParallelism(getParallelism())
+					.withPartitioner(getPartitioner())
+					.withJoinHint(getJoinHint())
+					.withResultType(getResultType());
+
+			final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys;
+			if (requiresTupleUnwrapping) {
+				if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) {
+					// Both join sides have a key selector function, so we need to do the
+					// tuple wrapping/unwrapping on both sides.
+
+					@SuppressWarnings("unchecked")
+					Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+					@SuppressWarnings("unchecked")
+					Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+					builder = builder
+							.withUdf(new TupleUnwrappingJoiner<>(function))
+							.withWrappedInput1(input1, selectorKeys1, getInput1Type())
+							.withWrappedInput2(input2, selectorKeys2, getInput2Type());
+				} else if (keys2 instanceof Keys.SelectorFunctionKeys) {
+					// The right side of the join needs the tuple wrapping/unwrapping
+
+					@SuppressWarnings("unchecked")
+					Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+					builder = builder
+							.withUdf(new TupleRightUnwrappingJoiner<>(function))
+							.withInput1(input1, getInput1Type(), keys1)
+							.withWrappedInput2(input2, selectorKeys2, getInput2Type());
+				} else {
+					// The left side of the join needs the tuple wrapping/unwrapping
 
-			String name = getName() != null ? getName() : "Join at "+joinLocationName;
+					@SuppressWarnings("unchecked")
+					Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
 
-			final JoinOperatorBase<?, ?, OUT, ?> translated;
-			
-			if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) {
-				// Both join sides have a key selector function, so we need to do the
-				// tuple wrapping/unwrapping on both sides.
+					builder = builder
+							.withUdf(new TupleLeftUnwrappingJoiner<>(function))
+							.withWrappedInput1(input1, selectorKeys1, getInput1Type())
+							.withInput2(input2, getInput2Type(), keys2);
+				}
+			} else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) {
+				// Neither side needs the tuple wrapping/unwrapping
 
-				@SuppressWarnings("unchecked")
-				Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
-				@SuppressWarnings("unchecked")
-				Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
-				
-				PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> po =
-						translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, 
-						getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
-				
-				// set parallelism
-				po.setParallelism(this.getParallelism());
-				
-				translated = po;
+				builder = builder
+						.withUdf(function)
+						.withInput1(input1, getInput1Type(), keys1)
+						.withInput2(input2, getInput2Type(), keys2);
+			} else {
+				throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
 			}
-			else if (keys2 instanceof Keys.SelectorFunctionKeys) {
-				// The right side of the join needs the tuple wrapping/unwrapping
 
-				int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions();
+			return builder.build();
+		}
+
 
-				@SuppressWarnings("unchecked")
-				Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
-						(Keys.SelectorFunctionKeys<I2, ?>) keys2;
+		private static final class JoinOperatorBaseBuilder<OUT> {
+
+			private final String name;
+			private final JoinType joinType;
 
-				PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> po =
-						translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
-								function, getInput1Type(), getInput2Type(), getResultType(), name,
-								input1, input2);
+			private int parallelism;
+			private FlatJoinFunction<?, ?, OUT> udf;
+			private TypeInformation<OUT> resultType;
 
-				// set parallelism
-				po.setParallelism(this.getParallelism());
+			private Operator input1;
+			private TypeInformation<?> input1Type;
+			private Keys<?> keys1;
 
-				translated = po;
+			private Operator input2;
+			private TypeInformation<?> input2Type;
+			private Keys<?> keys2;
+
+			private Partitioner<?> partitioner;
+			private JoinHint joinHint;
+
+			public JoinOperatorBaseBuilder(String name, JoinType joinType) {
+				this.name = name;
+				this.joinType = joinType;
 			}
-			else if (keys1 instanceof Keys.SelectorFunctionKeys) {
-				// The left side of the join needs the tuple wrapping/unwrapping
 
+			public <I1, K> JoinOperatorBaseBuilder<OUT> withWrappedInput1(
+					Operator<I1> input1,
+					Keys.SelectorFunctionKeys<I1, ?> rawKeys1,
+					TypeInformation<I1> inputType1) {
+				TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<>(rawKeys1.getKeyType(), inputType1);
 
-				@SuppressWarnings("unchecked")
-				Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
-						(Keys.SelectorFunctionKeys<I1, ?>) keys1;
+				MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+						createKeyMapper(rawKeys1, inputType1, input1, "Key Extractor 1");
 
-				int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
+				return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1);
+			}
 
-				PlanLeftUnwrappingJoinOperator<I1, I2, OUT, ?> po =
-						translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function,
-								getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+			public <I2, K> JoinOperatorBaseBuilder<OUT> withWrappedInput2(
+					Operator<I2> input2,
+					Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
+					TypeInformation<I2> inputType2) {
+				TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = new TupleTypeInfo<>(rawKeys2.getKeyType(), inputType2);
 
-				// set parallelism
-				po.setParallelism(this.getParallelism());
+				MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+						createKeyMapper(rawKeys2, inputType2, input2, "Key Extractor 2");
 
-				translated = po;
+				return withInput2(keyMapper2, typeInfoWithKey2, rawKeys2);
 			}
-			else if (super.keys1 instanceof Keys.ExpressionKeys && super.keys2 instanceof Keys.ExpressionKeys)
-			{
-				// Neither side needs the tuple wrapping/unwrapping
 
-				int[] logicalKeyPositions1 = super.keys1.computeLogicalKeyPositions();
-				int[] logicalKeyPositions2 = super.keys2.computeLogicalKeyPositions();
-				
-				JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>> po =
-						new JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>>(function,
-								new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
-								logicalKeyPositions1, logicalKeyPositions2,
-								name);
-				
-				// set inputs
-				po.setFirstInput(input1);
-				po.setSecondInput(input2);
-				// set parallelism
-				po.setParallelism(this.getParallelism());
-				
-				translated = po;
+			public <I1> JoinOperatorBaseBuilder<OUT> withInput1(
+					Operator<I1> input1,
+					TypeInformation<I1> input1Type,
+					Keys<?> keys1) {
+				this.input1 = input1;
+				this.input1Type = input1Type;
+				this.keys1 = keys1;
+				return this;
 			}
-			else {
-				throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
+
+			public <I2> JoinOperatorBaseBuilder<OUT> withInput2(
+					Operator<I2> input2,
+					TypeInformation<I2> input2Type,
+					Keys<?> keys2) {
+				this.input2 = input2;
+				this.input2Type = input2Type;
+				this.keys2 = keys2;
+				return this;
 			}
-			
-			translated.setJoinHint(getJoinHint());
-			translated.setCustomPartitioner(getPartitioner());
-			
-			return translated;
-		}
-		
-		private static <I1, I2, K, OUT> PlanBothUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoin(
-				Keys.SelectorFunctionKeys<I1, ?> rawKeys1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2, 
-				FlatJoinFunction<I1, I2, OUT> function,
-				TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
-				Operator<I1> input1, Operator<I2> input2)
-		{
-			@SuppressWarnings("unchecked")
-			final Keys.SelectorFunctionKeys<I1, K> keys1 = (Keys.SelectorFunctionKeys<I1, K>) rawKeys1;
-			@SuppressWarnings("unchecked")
-			final Keys.SelectorFunctionKeys<I2, K> keys2 = (Keys.SelectorFunctionKeys<I2, K>) rawKeys2;
-			
-			final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<Tuple2<K, I1>>(keys1.getKeyType(), inputType1);
-			final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = new TupleTypeInfo<Tuple2<K, I2>>(keys2.getKeyType(), inputType2);
-			
-			final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
-			final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
-
-			final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
-					new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-			final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
-					new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
-			final PlanBothUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanBothUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
-			
-			join.setFirstInput(keyMapper1);
-			join.setSecondInput(keyMapper2);
-			
-			keyMapper1.setInput(input1);
-			keyMapper2.setInput(input2);
-			// set parallelism
-			keyMapper1.setParallelism(input1.getParallelism());
-			keyMapper2.setParallelism(input2.getParallelism());
-			
-			return join;
-		}
-		
-		private static <I1, I2, K, OUT> PlanRightUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinRight(
-				int[] logicalKeyPositions1,
-				Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
-				FlatJoinFunction<I1, I2, OUT> function,
-				TypeInformation<I1> inputType1,
-				TypeInformation<I2> inputType2,
-				TypeInformation<OUT> outputType,
-				String name,
-				Operator<I1> input1,
-				Operator<I2> input2) {
-
-			if(!inputType1.isTupleType()) {
-				throw new InvalidParameterException("Should not happen.");
+
+			public JoinOperatorBaseBuilder<OUT> withParallelism(int parallelism) {
+				this.parallelism = parallelism;
+				return this;
 			}
-			
-			@SuppressWarnings("unchecked")
-			final Keys.SelectorFunctionKeys<I2, K> keys2 =
-					(Keys.SelectorFunctionKeys<I2, K>) rawKeys2;
-			
-			final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 =
-					new TupleTypeInfo<Tuple2<K, I2>>(keys2.getKeyType(), inputType2);
-			
-			final KeyExtractingMapper<I2, K> extractor2 =
-					new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
-
-			final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
-					new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(
-							extractor2,
-							new UnaryOperatorInformation<I2,Tuple2<K, I2>>(inputType2, typeInfoWithKey2),
-							"Key Extractor 2");
-			
-			final PlanRightUnwrappingJoinOperator<I1, I2, OUT, K> join =
-					new PlanRightUnwrappingJoinOperator<I1, I2, OUT, K>(
-							function,
-							logicalKeyPositions1,
-							keys2,
-							name,
-							outputType,
-							inputType1,
-							typeInfoWithKey2);
-			
-			join.setFirstInput(input1);
-			join.setSecondInput(keyMapper2);
-			
-			keyMapper2.setInput(input2);
-			// set parallelism
-			keyMapper2.setParallelism(input2.getParallelism());
-			
-			return join;
-		}
-		
-		private static <I1, I2, K, OUT> PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinLeft(
-				Keys.SelectorFunctionKeys<I1, ?> rawKeys1,
-				int[] logicalKeyPositions2,
-				FlatJoinFunction<I1, I2, OUT> function,
-				TypeInformation<I1> inputType1,
-				TypeInformation<I2> inputType2,
-				TypeInformation<OUT> outputType,
-				String name,
-				Operator<I1> input1,
-				Operator<I2> input2) {
-
-			if(!inputType2.isTupleType()) {
-				throw new InvalidParameterException("Should not happen.");
+
+			public JoinOperatorBaseBuilder<OUT> withPartitioner(Partitioner<?> partitioner) {
+				this.partitioner = partitioner;
+				return this;
 			}
-			
+
+			public JoinOperatorBaseBuilder<OUT> withJoinHint(JoinHint joinHint) {
+				this.joinHint = joinHint;
+				return this;
+			}
+
+			public JoinOperatorBaseBuilder<OUT> withUdf(FlatJoinFunction<?, ?, OUT> udf) {
+				this.udf = udf;
+				return this;
+			}
+
+			public JoinOperatorBaseBuilder<OUT> withResultType(TypeInformation<OUT> resultType) {
+				this.resultType = resultType;
+				return this;
+			}
+
 			@SuppressWarnings("unchecked")
-			final Keys.SelectorFunctionKeys<I1, K> keys1 = (Keys.SelectorFunctionKeys<I1, K>) rawKeys1;
-			
-			final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 =
-					new TupleTypeInfo<Tuple2<K, I1>>(keys1.getKeyType(), inputType1);
-
-			final KeyExtractingMapper<I1, K> extractor1 =
-					new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
-
-			final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
-					new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(
-							extractor1,
-							new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1),
-							"Key Extractor 1");
-
-			final PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K> join =
-					new PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K>(
-							function,
-							keys1,
-							logicalKeyPositions2,
-							name,
-							outputType,
-							typeInfoWithKey1,
-							inputType2);
-			
-			join.setFirstInput(keyMapper1);
-			join.setSecondInput(input2);
-			
-			keyMapper1.setInput(input1);
-			// set parallelism
-			keyMapper1.setParallelism(input1.getParallelism());
+			public JoinOperatorBase<?, ?, OUT, ?> build() {
+				JoinOperatorBase<?, ?, OUT, ?> operator;
+				if (joinType.isOuter()) {
+					operator = new OuterJoinOperatorBase<>(
+							udf,
+							new BinaryOperatorInformation(input1Type, input2Type, resultType),
+							this.keys1.computeLogicalKeyPositions(),
+							this.keys2.computeLogicalKeyPositions(),
+							this.name,
+							getOuterJoinType());
+				} else {
+					operator = new InnerJoinOperatorBase<>(
+							udf,
+							new BinaryOperatorInformation(input1Type, input2Type, resultType),
+							this.keys1.computeLogicalKeyPositions(),
+							this.keys2.computeLogicalKeyPositions(),
+							this.name);
+				}
+
+				operator.setFirstInput(input1);
+				operator.setSecondInput(input2);
+				operator.setParallelism(parallelism);
+				operator.setCustomPartitioner(partitioner);
+				operator.setJoinHint(joinHint);
+				return operator;
+			}
 
-			return join;
+			private OuterJoinOperatorBase.OuterJoinType getOuterJoinType() {
+				switch (joinType) {
+					case LEFT_OUTER:
+						return OuterJoinOperatorBase.OuterJoinType.LEFT;
+					case RIGHT_OUTER:
+						return OuterJoinOperatorBase.OuterJoinType.RIGHT;
+					case FULL_OUTER:
+						return OuterJoinOperatorBase.OuterJoinType.FULL;
+					default:
+						throw new UnsupportedOperationException();
+				}
+			}
+
+			private static <I, K> MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>> createKeyMapper(
+					Keys.SelectorFunctionKeys<I, ?> rawKeys,
+					TypeInformation<I> inputType,
+					Operator<I> input,
+					String mapperName) {
+
+				@SuppressWarnings("unchecked")
+				final Keys.SelectorFunctionKeys<I, K> keys = (Keys.SelectorFunctionKeys<I, K>) rawKeys;
+				final TypeInformation<Tuple2<K, I>> typeInfoWithKey = new TupleTypeInfo<>(keys.getKeyType(), inputType);
+				final KeyExtractingMapper<I, K> extractor = new KeyExtractingMapper<>(keys.getKeyExtractor());
+
+				final MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>> keyMapper =
+						new MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>>(
+								extractor,
+								new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+								mapperName);
+				keyMapper.setInput(input);
+				keyMapper.setParallelism(input.getParallelism());
+				return keyMapper;
+			}
 		}
 	}
 	
@@ -521,16 +532,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	 * @see Tuple2
 	 * @see DataSet
 	 */
-	public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> {
+	public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> implements JoinFunctionAssigner<I1, I2> {
 
-		protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2, 
-				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName)
+		public DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
+				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName, JoinType type)
 		{
-			super(input1, input2, keys1, keys2, 
-				(RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
-				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName);
+			super(input1, input2, keys1, keys2,
+				new DefaultFlatJoinFunction<I1, I2>(),
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName, type);
 		}
-		
+
 		/**
 		 * Finalizes a Join transformation by applying a {@link org.apache.flink.api.common.functions.RichFlatJoinFunction} to each pair of joined elements.<br/>
 		 * Each JoinFunction call returns exactly one element. 
@@ -547,16 +558,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				throw new NullPointerException("Join function must not be null.");
 			}
 			TypeInformation<R> returnType = TypeExtractor.getFlatJoinReturnTypes(function, getInput1Type(), getInput2Type());
-			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), clean(function), returnType, getJoinHint(), Utils.getCallLocationName());
+			return new EquiJoin<>(getInput1(), getInput2(), getKeys1(), getKeys2(), clean(function), returnType, getJoinHint(), Utils.getCallLocationName(), joinType);
 		}
 
 		public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
 			if (function == null) {
 				throw new NullPointerException("Join function must not be null.");
 			}
-			FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(clean(function));
+			FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<>(clean(function));
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
-			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName());
+			return new EquiJoin<>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName(), joinType);
 		}
 
 		public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
@@ -582,7 +593,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
@@ -595,7 +606,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 */
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
 			JoinProjection<I1, I2> joinProjection = new JoinProjection<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), firstFieldIndexes, null);
-			
+
 			return joinProjection.projectTupleX();
 		}
 		
@@ -608,7 +619,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields. 
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
@@ -624,7 +635,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			return joinProjection.projectTupleX();
 		}
-		
+
 //		public JoinOperator<I1, I2, I1> leftSemiJoin() {
 //			return new LeftSemiJoin<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint());
 //		}
@@ -659,22 +670,21 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		private JoinProjection<I1, I2> joinProj;
 		
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
-			super(input1, input2, keys1, keys2, 
+			super(input1, input2, keys1, keys2,
 					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
 					returnType, hint, Utils.getCallLocationName(4)); // We need to use the 4th element in the stack because the call comes through .types().
 
-			
 			joinProj = null;
 		}
 		
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType, JoinProjection<I1, I2> joinProj) {
-			super(input1, input2, keys1, keys2, 
+			super(input1, input2, keys1, keys2,
 					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
 					returnType, hint, Utils.getCallLocationName(4));
-			
+
 			this.joinProj = joinProj;
 		}
-		
+
 		@Override
 		protected ProjectFlatJoinFunction<I1, I2, OUT> getFunction() {
 			return (ProjectFlatJoinFunction<I1, I2, OUT>) super.getFunction();
@@ -689,7 +699,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
@@ -716,7 +726,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
@@ -834,188 +844,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 //	}
 	
 	// --------------------------------------------------------------------------------------------
-	// Builder classes for incremental construction
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Intermediate step of a Join transformation. <br/>
-	 * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling 
-	 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(int...)} or
-	 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(KeySelector)}.
-	 *
-	 * @param <I1> The type of the first input DataSet of the Join transformation.
-	 * @param <I2> The type of the second input DataSet of the Join transformation.
-	 */
-	public static final class JoinOperatorSets<I1, I2> {
-		
-		private final DataSet<I1> input1;
-		private final DataSet<I2> input2;
-		
-		private final JoinHint joinHint;
-		
-		public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
-			this(input1, input2, JoinHint.OPTIMIZER_CHOOSES);
-		}
-		
-		public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
-			if (input1 == null || input2 == null) {
-				throw new NullPointerException();
-			}
-			
-			this.input1 = input1;
-			this.input2 = input2;
-			this.joinHint = hint;
-		}
-		
-		/**
-		 * Continues a Join transformation. <br/>
-		 * Defines the {@link Tuple} fields of the first join {@link DataSet} that should be used as join keys.<br/>
-		 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
-		 *
-		 * @param fields The indexes of the other Tuple fields of the first join DataSets that should be used as keys.
-		 * @return An incomplete Join transformation. 
-		 *           Call {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
-		 *           {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
-		 *           to continue the Join. 
-		 * 
-		 * @see Tuple
-		 * @see DataSet
-		 */
-		public JoinOperatorSetsPredicate where(int... fields) {
-			return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType()));
-		}
-
-		/**
-		 * Continues a Join transformation. <br/>
-		 * Defines the fields of the first join {@link DataSet} that should be used as grouping keys. Fields
-		 * are the names of member fields of the underlying type of the data set.
-		 *
-		 * @param fields The  fields of the first join DataSets that should be used as keys.
-		 * @return An incomplete Join transformation.
-		 *           Call {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
-		 *           {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
-		 *           to continue the Join.
-		 *
-		 * @see Tuple
-		 * @see DataSet
-		 */
-		public JoinOperatorSetsPredicate where(String... fields) {
-			return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType()));
-		}
-		
-		/**
-		 * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.</br>
-		 * The KeySelector function is called for each element of the first DataSet and extracts a single 
-		 * key value on which the DataSet is joined. </br>
-		 * 
-		 * @param keySelector The KeySelector function which extracts the key values from the DataSet on which it is joined.
-		 * @return An incomplete Join transformation. 
-		 *           Call {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
-		 *           {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
-		 *           to continue the Join. 
-		 * 
-		 * @see KeySelector
-		 * @see DataSet
-		 */
-		public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
-			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
-			return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType(), keyType));
-		}
-		
-		// ----------------------------------------------------------------------------------------
-		
-		/**
-		 * Intermediate step of a Join transformation. <br/>
-		 * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling 
-		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
-		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}.
-		 *
-		 */
-		public class JoinOperatorSetsPredicate {
-			
-			private final Keys<I1> keys1;
-			
-			private JoinOperatorSetsPredicate(Keys<I1> keys1) {
-				if (keys1 == null) {
-					throw new NullPointerException();
-				}
-				
-				if (keys1.isEmpty()) {
-					throw new InvalidProgramException("The join keys must not be empty.");
-				}
-				
-				this.keys1 = keys1;
-			}
-			
-			/**
-			 * Continues a Join transformation and defines the {@link Tuple} fields of the second join 
-			 * {@link DataSet} that should be used as join keys.<br/>
-			 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
-			 * 
-			 * The resulting {@link org.apache.flink.api.java.operators.JoinOperator.DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with 
-			 * the element of the first input being the first field of the tuple and the element of the 
-			 * second input being the second field of the tuple. 
-			 *
-			 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
-			 * @return A DefaultJoin that represents the joined DataSet.
-			 */
-			public DefaultJoin<I1, I2> equalTo(int... fields) {
-				return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType()));
-			}
-
-			/**
-			 * Continues a Join transformation and defines the  fields of the second join
-			 * {@link DataSet} that should be used as join keys.<br/>
-			 *
-			 * The resulting {@link org.apache.flink.api.java.operators.JoinOperator.DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
-			 * the element of the first input being the first field of the tuple and the element of the
-			 * second input being the second field of the tuple.
-			 *
-			 * @param fields The fields of the second join DataSet that should be used as keys.
-			 * @return A DefaultJoin that represents the joined DataSet.
-			 */
-			public DefaultJoin<I1, I2> equalTo(String... fields) {
-				return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType()));
-			}
-
-			/**
-			 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.</br>
-			 * The KeySelector function is called for each element of the second DataSet and extracts a single 
-			 * key value on which the DataSet is joined. </br>
-			 * 
-			 * The resulting {@link org.apache.flink.api.java.operators.JoinOperator.DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with 
-			 * the element of the first input being the first field of the tuple and the element of the 
-			 * second input being the second field of the tuple. 
-			 * 
-			 * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
-			 * @return A DefaultJoin that represents the joined DataSet.
-			 */
-			public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
-				TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-				return createJoinOperator(new Keys.SelectorFunctionKeys<I2, K>(keySelector, input2.getType(), keyType));
-			}
-			
-			protected DefaultJoin<I1, I2> createJoinOperator(Keys<I2> keys2) {
-				if (keys2 == null) {
-					throw new NullPointerException("The join keys may not be null.");
-				}
-				
-				if (keys2.isEmpty()) {
-					throw new InvalidProgramException("The join keys may not be empty.");
-				}
-				
-				try {
-					keys1.areCompatible(keys2);
-				} catch (IncompatibleKeysException e) {
-					throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e);
-				}
-
-				return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4));
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
 	//  default join functions
 	// --------------------------------------------------------------------------------------------
 
@@ -1033,7 +861,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			out.collect(outTuple);
 		}
 	}
-	
+
 	public static final class ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> {
 		
 		private static final long serialVersionUID = 1L;
@@ -1052,10 +880,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @param outTupleInstance An instance of an output tuple.
 		 */
 		private ProjectFlatJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
-			
 			if(fields.length != isFromFirst.length) {
 				throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!"); 
 			}
+
 			this.fields = fields;
 			this.isFromFirst = isFromFirst;
 			this.outTuple = outTupleInstance;
@@ -1070,16 +898,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		public void join(T1 in1, T2 in2, Collector<R> out) {
-			for(int i=0; i<fields.length; i++) {
-				if(isFromFirst[i]) {
-					if(fields[i] >= 0 && in1 != null) {
-						outTuple.setField(((Tuple)in1).getField(fields[i]), i);
+			for (int i = 0; i < fields.length; i++) {
+				if (isFromFirst[i]) {
+					if (fields[i] >= 0 && in1 != null) {
+						outTuple.setField(((Tuple) in1).getField(fields[i]), i);
 					} else {
 						outTuple.setField(in1, i);
 					}
 				} else {
-					if(fields[i] >= 0 && in2 != null) {
-						outTuple.setField(((Tuple)in2).getField(fields[i]), i);
+					if (fields[i] >= 0 && in2 != null) {
+						outTuple.setField(((Tuple) in2).getField(fields[i]), i);
 					} else {
 						outTuple.setField(in2, i);
 					}
@@ -1097,7 +925,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		private final Keys<I1> keys1;
 		private final Keys<I2> keys2;
 		private final JoinHint hint;
-		
+
 		private int[] fieldIndexes;
 		private boolean[] isFieldInFirst;
 		
@@ -1105,13 +933,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		private final int numFieldsDs2;
 		
 		public JoinProjection(DataSet<I1> ds1, DataSet<I2> ds2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] firstFieldIndexes, int[] secondFieldIndexes) {
-			
 			this.ds1 = ds1;
 			this.ds2 = ds2;
 			this.keys1 = keys1;
 			this.keys2 = keys2;
 			this.hint = hint;
-			
+
 			boolean isFirstTuple;
 			boolean isSecondTuple;
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java
new file mode 100644
index 0000000..e0e15ca
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Intermediate step of a Join transformation. <br/>
+ * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
+ * {@link InnerJoinOperatorSets#where(int...)} or
+ * {@link InnerJoinOperatorSets#where(KeySelector)}.
+ *
+ * @param <I1> The type of the first input DataSet of the Join transformation.
+ * @param <I2> The type of the second input DataSet of the Join transformation.
+ */
+public final class InnerJoinOperatorSets<I1, I2> extends JoinOperatorSets<I1, I2> {
+
+	public InnerJoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
+		super(input1, input2);
+	}
+
+	public InnerJoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
+		super(input1, input2, hint);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * @return An incomplete Join transformation.
+	 *           Call {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 */
+	@Override
+	public InnerJoinOperatorSetsPredicate where(int... fields) {
+		return new InnerJoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * @return An incomplete Join transformation.
+	 *           Call {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 */
+	@Override
+	public InnerJoinOperatorSetsPredicate where(String... fields) {
+		return new InnerJoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * @return An incomplete Join transformation.
+	 *           Call {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 */
+	@Override
+	public <K> InnerJoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new InnerJoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), keyType));
+	}
+
+
+	/**
+	 * Intermediate step of a Join transformation. <br/>
+	 * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
+	 * {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 * {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}.
+	 */
+	public class InnerJoinOperatorSetsPredicate extends JoinOperatorSetsPredicate {
+
+		private InnerJoinOperatorSetsPredicate(Keys<I1> keys1) {
+			super(keys1);
+		}
+
+		/**
+		 * Continues a Join transformation and defines the {@link Tuple} fields of the second join
+		 * {@link DataSet} that should be used as join keys.<br/>
+		 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
+		 * <p/>
+		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+		 * the element of the first input being the first field of the tuple and the element of the
+		 * second input being the second field of the tuple.
+		 *
+		 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		@Override
+		public DefaultJoin<I1, I2> equalTo(int... fields) {
+			return createDefaultJoin(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines the fields of the second join
+		 * {@link DataSet} that should be used as join keys.<br/>
+		 * <p/>
+		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+		 * the element of the first input being the first field of the tuple and the element of the
+		 * second input being the second field of the tuple.
+		 *
+		 * @param fields The fields of the second join DataSet that should be used as keys.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		@Override
+		public DefaultJoin<I1, I2> equalTo(String... fields) {
+			return createDefaultJoin(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.</br>
+		 * The KeySelector function is called for each element of the second DataSet and extracts a single
+		 * key value on which the DataSet is joined. </br>
+		 * <p/>
+		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+		 * the element of the first input being the first field of the tuple and the element of the
+		 * second input being the second field of the tuple.
+		 *
+		 * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		@Override
+		public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			return createDefaultJoin(new Keys.SelectorFunctionKeys<>(keySelector, input2.getType(), keyType));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java
new file mode 100644
index 0000000..163c5a6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.operators.JoinOperator;
+
+/**
+ * A Join transformation that needs to be finished by specifying either a
+ * {@link JoinFunction} or a {@link FlatJoinFunction} before it can be used as an input
+ * to other operators.
+ *
+ * @param <I1> The type of the first input DataSet of the Join transformation.
+ * @param <I2> The type of the second input DataSet of the Join transformation.
+ */
+public interface JoinFunctionAssigner<I1, I2> {
+
+	<R> JoinOperator<I1, I2, R> with(JoinFunction<I1, I2, R> joinFunction);
+
+	<R> JoinOperator<I1, I2, R> with(FlatJoinFunction<I1, I2, R> joinFunction);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java
new file mode 100644
index 0000000..705952c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Intermediate step of an Outer Join transformation. <br/>
+ * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
+ * {@link JoinOperatorSets#where(int...)} or
+ * {@link JoinOperatorSets#where(KeySelector)}.
+ *
+ * @param <I1> The type of the first input DataSet of the Join transformation.
+ * @param <I2> The type of the second input DataSet of the Join transformation.
+ */
+public class JoinOperatorSets<I1, I2> {
+
+	protected final DataSet<I1> input1;
+	protected final DataSet<I2> input2;
+
+	protected final JoinHint joinHint;
+	protected final JoinType joinType;
+
+	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
+		this(input1, input2, JoinHint.OPTIMIZER_CHOOSES);
+	}
+
+	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
+		this(input1, input2, hint, JoinType.INNER);
+	}
+
+	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint, JoinType type) {
+		if (input1 == null || input2 == null) {
+			throw new NullPointerException();
+		}
+
+		this.input1 = input1;
+		this.input2 = input2;
+		this.joinHint = hint;
+		this.joinType = type;
+	}
+
+	/**
+	 * Continues a Join transformation. <br/>
+	 * Defines the {@link Tuple} fields of the first join {@link DataSet} that should be used as join keys.<br/>
+	 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
+	 *
+	 * @param fields The indexes of the other Tuple fields of the first join DataSets that should be used as keys.
+	 * @return An incomplete Join transformation.
+	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 *
+	 * @see Tuple
+	 * @see DataSet
+	 */
+	public JoinOperatorSetsPredicate where(int... fields) {
+		return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * Continues a Join transformation. <br/>
+	 * Defines the fields of the first join {@link DataSet} that should be used as grouping keys. Fields
+	 * are the names of member fields of the underlying type of the data set.
+	 *
+	 * @param fields The  fields of the first join DataSets that should be used as keys.
+	 * @return An incomplete Join transformation.
+	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 *
+	 * @see Tuple
+	 * @see DataSet
+	 */
+	public JoinOperatorSetsPredicate where(String... fields) {
+		return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.</br>
+	 * The KeySelector function is called for each element of the first DataSet and extracts a single
+	 * key value on which the DataSet is joined. </br>
+	 *
+	 * @param keySelector The KeySelector function which extracts the key values from the DataSet on which it is joined.
+	 * @return An incomplete Join transformation.
+	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 *
+	 * @see KeySelector
+	 * @see DataSet
+	 */
+	public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), keyType));
+	}
+
+
+	/**
+	 * Intermediate step of a Join transformation. <br/>
+	 * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
+	 * {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 * {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}.
+	 *
+	 */
+	public class JoinOperatorSetsPredicate {
+
+		protected final Keys<I1> keys1;
+
+		protected JoinOperatorSetsPredicate(Keys<I1> keys1) {
+			if (keys1 == null) {
+				throw new NullPointerException();
+			}
+
+			if (keys1.isEmpty()) {
+				throw new InvalidProgramException("The join keys must not be empty.");
+			}
+
+			this.keys1 = keys1;
+		}
+
+		/**
+		 * Continues a Join transformation and defines the {@link Tuple} fields of the second join
+		 * {@link DataSet} that should be used as join keys.<br/>
+		 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
+		 *
+		 * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a
+		 * {@link JoinFunction} by calling {@link JoinFunctionAssigner#with(JoinFunction)}
+		 *
+		 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+		 * @return A JoinFunctionAssigner.
+		 */
+		public JoinFunctionAssigner<I1, I2> equalTo(int... fields) {
+			return createJoinFunctionAssigner(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines the fields of the second join
+		 * {@link DataSet} that should be used as join keys.<br/>
+		 *
+		 * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a
+		 * {@link JoinFunction} by calling {@link JoinFunctionAssigner#with(JoinFunction)}
+		 *
+		 * @param fields The fields of the second join DataSet that should be used as keys.
+		 * @return A JoinFunctionAssigner.
+		 */
+		public JoinFunctionAssigner<I1, I2> equalTo(String... fields) {
+			return createJoinFunctionAssigner(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.</br>
+		 * The KeySelector function is called for each element of the second DataSet and extracts a single
+		 * key value on which the DataSet is joined. </br>
+		 *
+		 * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a
+		 * {@link JoinFunction} by calling {@link JoinFunctionAssigner#with(JoinFunction)}
+		 *
+		 * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
+		 * @return A JoinFunctionAssigner.
+		 */
+		public <K> JoinFunctionAssigner<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			return createJoinFunctionAssigner(new Keys.SelectorFunctionKeys<>(keySelector, input2.getType(), keyType));
+		}
+
+		protected JoinFunctionAssigner<I1, I2> createJoinFunctionAssigner(Keys<I2> keys2) {
+			DefaultJoin<I1, I2> join = createDefaultJoin(keys2);
+			return new DefaultJoinFunctionAssigner(join);
+		}
+
+		protected DefaultJoin<I1, I2> createDefaultJoin(Keys<I2> keys2) {
+			if (keys2 == null) {
+				throw new NullPointerException("The join keys may not be null.");
+			}
+
+			if (keys2.isEmpty()) {
+				throw new InvalidProgramException("The join keys may not be empty.");
+			}
+
+			try {
+				keys1.areCompatible(keys2);
+			} catch (Keys.IncompatibleKeysException e) {
+				throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e);
+			}
+			return new DefaultJoin<>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4), joinType);
+		}
+
+		private class DefaultJoinFunctionAssigner implements JoinFunctionAssigner<I1, I2> {
+
+			private final DefaultJoin<I1, I2> defaultJoin;
+
+			public DefaultJoinFunctionAssigner(DefaultJoin<I1, I2> defaultJoin) {
+				this.defaultJoin = defaultJoin;
+			}
+
+			public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> joinFunction) {
+				return defaultJoin.with(joinFunction);
+			}
+
+			public <R> EquiJoin<I1, I2, R> with(FlatJoinFunction<I1, I2, R> joinFunction) {
+				return defaultJoin.with(joinFunction);
+			}
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java
new file mode 100644
index 0000000..9d00fab
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+public enum JoinType {
+
+	INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER;
+
+	public boolean isOuter() {
+		return this == LEFT_OUTER || this == RIGHT_OUTER || this == FULL_OUTER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java
deleted file mode 100644
index e9ded4f..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class PlanBothUnwrappingJoinOperator<I1, I2, OUT, K>
-	extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
-{
-
-	public PlanBothUnwrappingJoinOperator(
-			FlatJoinFunction<I1, I2, OUT> udf,
-			Keys.SelectorFunctionKeys<I1, K> key1,
-			Keys.SelectorFunctionKeys<I2, K> key2, String name,
-			TypeInformation<OUT> resultType,
-			TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
-			TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {
-
-		super(
-				new TupleUnwrappingJoiner<I1, I2, OUT, K>(udf),
-				new BinaryOperatorInformation<Tuple2<K, I1>, Tuple2<K, I2>, OUT>(
-						typeInfoWithKey1,
-						typeInfoWithKey2,
-						resultType),
-				key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name);
-	}
-
-	public static final class TupleUnwrappingJoiner<I1, I2, OUT, K>
-		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
-		implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
-
-		private static final long serialVersionUID = 1L;
-		
-		private TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
-			super(wrapped);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
-			wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
deleted file mode 100644
index c6ee804..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K>
-		extends JoinOperatorBase<Tuple2<K, I1>, I2, OUT, FlatJoinFunction<Tuple2<K, I1>, I2, OUT>> {
-
-	public PlanLeftUnwrappingJoinOperator(
-			FlatJoinFunction<I1, I2, OUT> udf,
-			Keys.SelectorFunctionKeys<I1, K> key1,
-			int[] key2, String name,
-			TypeInformation<OUT> resultType,
-			TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
-			TypeInformation<I2> typeInfo2) {
-		super(
-				new TupleLeftUnwrappingJoiner<I1, I2, OUT, K>(udf),
-				new BinaryOperatorInformation<Tuple2<K, I1>, I2, OUT>(
-						typeInfoWithKey1,
-						typeInfo2,
-						resultType),
-				key1.computeLogicalKeyPositions(), key2, name);
-	}
-
-	public static final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
-			extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
-			implements FlatJoinFunction<Tuple2<K, I1>, I2, OUT> {
-
-		private static final long serialVersionUID = 1L;
-
-		private TupleLeftUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
-			super(wrapped);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void join (Tuple2<K, I1> value1, I2 value2, Collector<OUT> collector) throws Exception {
-			wrappedFunction.join ((I1)(value1.getField(1)), value2, collector);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
deleted file mode 100644
index dc460f8..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class PlanRightUnwrappingJoinOperator<I1, I2, OUT, K>
-		extends JoinOperatorBase<I1, Tuple2<K, I2>, OUT, FlatJoinFunction<I1, Tuple2<K, I2>, OUT>> {
-
-	public PlanRightUnwrappingJoinOperator(
-			FlatJoinFunction<I1, I2, OUT> udf,
-			int[] key1,
-			Keys.SelectorFunctionKeys<I2, K> key2,
-			String name,
-			TypeInformation<OUT> type,
-			TypeInformation<I1> typeInfo1,
-			TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {
-
-		super(
-				new TupleRightUnwrappingJoiner<I1, I2, OUT, K>(udf),
-				new BinaryOperatorInformation<I1, Tuple2<K, I2>, OUT>(
-						typeInfo1,
-						typeInfoWithKey2,
-						type),
-				key1, key2.computeLogicalKeyPositions(), name);
-	}
-
-	public static final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
-			extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
-			implements FlatJoinFunction<I1, Tuple2<K, I2>, OUT> {
-
-		private static final long serialVersionUID = 1L;
-
-		private TupleRightUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
-			super(wrapped);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void join (I1 value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
-			wrappedFunction.join (value1, (I2)(value2.getField(1)), collector);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
new file mode 100644
index 0000000..18d3378
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<Tuple2<K, I1>, I2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public TupleLeftUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
+		super(wrapped);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void join(Tuple2<K, I1> value1, I2 value2, Collector<OUT> collector) throws Exception {
+		wrappedFunction.join(value1 == null ? null : (I1) value1.getField(1), value2, collector);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
new file mode 100644
index 0000000..004a0ae
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<I1, Tuple2<K, I2>, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public TupleRightUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
+		super(wrapped);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void join(I1 value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
+		wrappedFunction.join(value1, value2 == null ? null : (I2) value2.getField(1), collector);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
new file mode 100644
index 0000000..15cc137
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public final class TupleUnwrappingJoiner<I1, I2, OUT, K>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
+		super(wrapped);
+	}
+
+	@Override
+	public void join(Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
+		wrappedFunction.join(unwrap(value1), unwrap(value2), collector);
+	}
+
+	@SuppressWarnings("unchecked")
+	private <V> V unwrap(Tuple2<K, V> t) {
+		return t == null ? null : (V) (t.getField(1));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
index 6dc150e..75744fe 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
@@ -26,7 +26,7 @@ import java.util.Map;
 
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -50,7 +50,7 @@ import com.google.common.base.Preconditions;
  */
 
 @Deprecated
-public class JoinOperator extends JoinOperatorBase<Record, Record, Record, JoinFunction> implements RecordOperator {
+public class JoinOperator extends InnerJoinOperatorBase<Record, Record, Record, JoinFunction> implements RecordOperator {
 	
 	/**
 	 * The types of the keys that the operator operates on.

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
new file mode 100644
index 0000000..8bc29d4
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+@SuppressWarnings({ "unchecked", "serial" })
+public class InnerJoinOperatorBaseTest implements Serializable {
+
+	
+	@Test
+	public void testTupleBaseJoiner(){
+		final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>> joiner =
+					new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>()
+		{
+			@Override
+			public void join(Tuple3<String, Double, Integer> first, Tuple2<Integer, String> second, Collector<Tuple2<Double, String>> out) {
+
+				assertEquals(first.f0, second.f1);
+				assertEquals(first.f2, second.f0);
+
+				out.collect(new Tuple2<>(first.f1, second.f0.toString()));
+			}
+		};
+
+		final TupleTypeInfo<Tuple3<String, Double, Integer>> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo
+				(String.class, Double.class, Integer.class);
+		final TupleTypeInfo<Tuple2<Integer, String>> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
+				String.class);
+		final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
+				String.class);
+
+		final int[] leftKeys = new int[]{0,2};
+		final int[] rightKeys = new int[]{1,0};
+
+		final String taskName = "Collection based tuple joiner";
+
+		final BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double,
+				String>> binaryOpInfo = new BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>>(leftTypeInfo, rightTypeInfo, outTypeInfo);
+
+		final InnerJoinOperatorBase<Tuple3<String, Double, Integer>, Tuple2<Integer,
+						String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
+						String>, Tuple2<Double, String>>> base = new InnerJoinOperatorBase<Tuple3<String, Double, Integer>,
+										Tuple2<Integer, String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>,
+										Tuple2<Integer, String>, Tuple2<Double, String>>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName);
+
+		final List<Tuple3<String, Double, Integer> > inputData1 = new ArrayList<Tuple3<String, Double,
+				Integer>>(Arrays.asList(
+				new Tuple3<>("foo", 42.0, 1),
+				new Tuple3<>("bar", 1.0, 2),
+				new Tuple3<>("bar", 2.0, 3),
+				new Tuple3<>("foobar", 3.0, 4),
+				new Tuple3<>("bar", 3.0, 3)
+		));
+
+		final List<Tuple2<Integer, String>> inputData2 = new ArrayList<Tuple2<Integer, String>>(Arrays.asList(
+				new Tuple2<>(3, "bar"),
+				new Tuple2<>(4, "foobar"),
+				new Tuple2<>(2, "foo")
+		));
+		final Set<Tuple2<Double, String>> expected = new HashSet<Tuple2<Double, String>>(Arrays.asList(
+				new Tuple2<>(2.0, "3"),
+				new Tuple2<>(3.0, "3"),
+				new Tuple2<>(3.0, "4")
+		));
+
+		try {
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.disableObjectReuse();
+			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			executionConfig.enableObjectReuse();
+			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+
+			assertEquals(expected, new HashSet<>(resultSafe));
+			assertEquals(expected, new HashSet<>(resultRegular));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}