You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:54 UTC

[5/6] flink git commit: [FLINK-2576] [javaAPI] [scalaAPI] [optimizer] Add outerJoin to DataSet API (Java, Scala) and optimizer.

[FLINK-2576] [javaAPI] [scalaAPI] [optimizer] Add outerJoin to DataSet API (Java, Scala) and optimizer.

This closes #1138


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b00c1d7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b00c1d7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b00c1d7e

Branch: refs/heads/master
Commit: b00c1d7e7b83a3f2e42223fe97dd369755f884c3
Parents: 6b22227
Author: Johann Kovacs <me...@jkovacs.de>
Authored: Tue Sep 8 18:23:54 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 9 16:19:21 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/ReplicatingInputFormat.java   |   5 +-
 .../operators/base/InnerJoinOperatorBase.java   | 148 ++++
 .../common/operators/base/JoinOperatorBase.java | 155 +----
 .../operators/base/OuterJoinOperatorBase.java   |   2 +-
 .../base/InnerJoinOperatorBaseTest.java         | 141 ++++
 .../operators/base/JoinOperatorBaseTest.java    | 141 ----
 .../java/org/apache/flink/api/java/DataSet.java | 197 +++++-
 .../flink/api/java/operators/JoinOperator.java  | 693 +++++++------------
 .../operators/join/InnerJoinOperatorSets.java   | 154 +++++
 .../operators/join/JoinFunctionAssigner.java    |  39 ++
 .../java/operators/join/JoinOperatorSets.java   | 235 +++++++
 .../flink/api/java/operators/join/JoinType.java |  28 +
 .../PlanBothUnwrappingJoinOperator.java         |  66 --
 .../PlanLeftUnwrappingJoinOperator.java         |  64 --
 .../PlanRightUnwrappingJoinOperator.java        |  66 --
 .../translation/TupleLeftUnwrappingJoiner.java  |  40 ++
 .../translation/TupleRightUnwrappingJoiner.java |  40 ++
 .../translation/TupleUnwrappingJoiner.java      |  45 ++
 .../api/java/record/operators/JoinOperator.java |   4 +-
 .../base/InnerJoinOperatorBaseTest.java         | 120 ++++
 .../operators/base/JoinOperatorBaseTest.java    | 122 ----
 .../SemanticPropertiesProjectionTest.java       |   6 +-
 .../SemanticPropertiesTranslationTest.java      |  16 +-
 .../flink/api/java/operators/NamesTest.java     |   4 +-
 .../DeltaIterationTranslationTest.java          |   6 +-
 .../flink/optimizer/costs/CostEstimator.java    |   3 +
 .../apache/flink/optimizer/dag/JoinNode.java    |  18 +-
 .../apache/flink/optimizer/dag/MatchNode.java   | 167 -----
 .../flink/optimizer/dag/OuterJoinNode.java      | 130 ++++
 .../AbstractSortMergeJoinDescriptor.java        |  81 +++
 .../optimizer/operators/CoGroupDescriptor.java  |  25 +-
 .../operators/OperatorDescriptorDual.java       |  24 +
 .../SortMergeFullOuterJoinDescriptor.java       |  39 ++
 .../operators/SortMergeInnerJoinDescriptor.java |  44 ++
 .../operators/SortMergeJoinDescriptor.java      | 110 ---
 .../SortMergeLeftOuterJoinDescriptor.java       |  43 ++
 .../SortMergeRightOuterJoinDescriptor.java      |  43 ++
 .../traversals/GraphCreatingVisitor.java        |  15 +-
 .../optimizer/FeedbackPropertiesMatchTest.java  |   4 +-
 .../SemanticPropertiesAPIToPlanTest.java        |   4 +-
 .../JoinGlobalPropertiesCompatibilityTest.java  |   6 +-
 .../org/apache/flink/api/scala/DataSet.scala    | 117 +++-
 .../apache/flink/api/scala/joinDataSet.scala    | 139 +++-
 .../apache/flink/test/util/TestEnvironment.java |  21 +-
 .../test/javaApiOperators/OuterJoinITCase.java  | 605 ++++++++++++++++
 .../SemanticPropertiesTranslationTest.scala     |  10 +-
 .../api/scala/operators/OuterJoinITCase.scala   | 214 ++++++
 .../DeltaIterationTranslationTest.scala         |  10 +-
 48 files changed, 2946 insertions(+), 1463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
index 3d0ea99..5c8eb27 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -33,7 +34,7 @@ import java.io.IOException;
  * This is done by assigning all {@link org.apache.flink.core.io.InputSplit}s generated by the
  * replicated InputFormat to each parallel instance.
  *
- * Replicated data can only be used as input for a {@link org.apache.flink.api.common.operators.base.JoinOperatorBase} or
+ * Replicated data can only be used as input for a {@link InnerJoinOperatorBase} or
  * {@link org.apache.flink.api.common.operators.base.CrossOperatorBase} with the same parallelism as the DataSource.
  * Before being used as an input to a Join or Cross operator, replicated data might be processed in local pipelines by
  * by Map-based operators with the same parallelism as the source. Map-based operators are
@@ -54,7 +55,7 @@ import java.io.IOException;
  *
  * @see org.apache.flink.api.common.io.InputFormat
  * @see org.apache.flink.api.common.io.RichInputFormat
- * @see org.apache.flink.api.common.operators.base.JoinOperatorBase
+ * @see InnerJoinOperatorBase
  * @see org.apache.flink.api.common.operators.base.CrossOperatorBase
  * @see org.apache.flink.api.common.operators.base.MapOperatorBase
  * @see org.apache.flink.api.common.operators.base.FlatMapOperatorBase

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java
new file mode 100644
index 0000000..8ec0c9e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
+ */
+public class InnerJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends JoinOperatorBase<IN1, IN2, OUT, FT> {
+
+	public InnerJoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+			int[] keyPositions1, int[] keyPositions2, String name) {
+		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+	}
+
+	public InnerJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1,
+			int[] keyPositions2, String name) {
+		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+	}
+
+	public InnerJoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+			int[] keyPositions1, int[] keyPositions2, String name) {
+		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext,
+			ExecutionConfig executionConfig) throws Exception {
+		FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
+
+		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
+		FunctionUtils.openFunction(function, this.parameters);
+
+		TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
+		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
+		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
+
+		TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
+		TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
+
+		TypeComparator<IN1> leftComparator;
+		TypeComparator<IN2> rightComparator;
+
+		if (leftInformation instanceof AtomicType) {
+			leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true, executionConfig);
+		} else if (leftInformation instanceof CompositeType) {
+			int[] keyPositions = getKeyColumns(0);
+			boolean[] orders = new boolean[keyPositions.length];
+			Arrays.fill(orders, true);
+
+			leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0, executionConfig);
+		} else {
+			throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
+					.getCanonicalName() + " is not supported. Could not generate a comparator.");
+		}
+
+		if (rightInformation instanceof AtomicType) {
+			rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true, executionConfig);
+		} else if (rightInformation instanceof CompositeType) {
+			int[] keyPositions = getKeyColumns(1);
+			boolean[] orders = new boolean[keyPositions.length];
+			Arrays.fill(orders, true);
+
+			rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0, executionConfig);
+		} else {
+			throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
+					.getCanonicalName() + " is not supported. Could not generate a comparator.");
+		}
+
+		TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
+
+		List<OUT> result = new ArrayList<OUT>();
+		Collector<OUT> collector = new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig));
+
+		Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
+
+		//Build hash table
+		for (IN2 element : inputData2) {
+			List<IN2> list = probeTable.get(rightComparator.hash(element));
+			if (list == null) {
+				list = new ArrayList<IN2>();
+				probeTable.put(rightComparator.hash(element), list);
+			}
+
+			list.add(element);
+		}
+
+		//Probing
+		for (IN1 left : inputData1) {
+			List<IN2> matchingHashes = probeTable.get(leftComparator.hash(left));
+
+			if (matchingHashes != null) {
+				pairComparator.setReference(left);
+				for (IN2 right : matchingHashes) {
+					if (pairComparator.equalToReference(right)) {
+						function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
+					}
+				}
+			}
+		}
+
+		FunctionUtils.closeFunction(function);
+
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 799496c..98194e1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -18,207 +18,96 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.CopyingListCollector;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @see org.apache.flink.api.common.functions.FlatJoinFunction
- */
-public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
-	
+
+public abstract class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+
 	/**
 	 * An enumeration of hints, optionally usable to tell the system how exactly execute the join.
 	 */
 	public static enum JoinHint {
-		
+
 		/**
 		 * Leave the choice how to do the join to the optimizer. If in doubt, the
 		 * optimizer will choose a repartitioning join.
 		 */
 		OPTIMIZER_CHOOSES,
-		
+
 		/**
 		 * Hint that the first join input is much smaller than the second. This results in
 		 * broadcasting and hashing the first input, unless the optimizer infers that
 		 * prior existing partitioning is available that is even cheaper to exploit.
 		 */
 		BROADCAST_HASH_FIRST,
-		
+
 		/**
 		 * Hint that the second join input is much smaller than the first. This results in
 		 * broadcasting and hashing the second input, unless the optimizer infers that
 		 * prior existing partitioning is available that is even cheaper to exploit.
 		 */
 		BROADCAST_HASH_SECOND,
-		
+
 		/**
 		 * Hint that the first join input is a bit smaller than the second. This results in
 		 * repartitioning both inputs and hashing the first input, unless the optimizer infers that
 		 * prior existing partitioning and orders are available that are even cheaper to exploit.
 		 */
 		REPARTITION_HASH_FIRST,
-		
+
 		/**
-		 * Hint that the second join input is a bit smaller than the second. This results in
+		 * Hint that the second join input is a bit smaller than the first. This results in
 		 * repartitioning both inputs and hashing the second input, unless the optimizer infers that
 		 * prior existing partitioning and orders are available that are even cheaper to exploit.
 		 */
 		REPARTITION_HASH_SECOND,
-		
+
 		/**
 		 * Hint that the join should repartitioning both inputs and use sorting and merging
 		 * as the join strategy.
 		 */
 		REPARTITION_SORT_MERGE
-	};
-	
-	// --------------------------------------------------------------------------------------------
-	
-	
+	}
+
 	private JoinHint joinHint = JoinHint.OPTIMIZER_CHOOSES;
-	
 	private Partitioner<?> partitioner;
-	
-	
+
+
 	public JoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
 		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
 	}
 
 	public JoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
-		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+		super(new UserCodeObjectWrapper<>(udf), operatorInfo, keyPositions1, keyPositions2, name);
 	}
-	
+
 	public JoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
-		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+		super(new UserCodeClassWrapper<>(udf), operatorInfo, keyPositions1, keyPositions2, name);
 	}
-	
-	
+
+
 	public void setJoinHint(JoinHint joinHint) {
 		if (joinHint == null) {
 			throw new IllegalArgumentException("Join Hint must not be null.");
 		}
 		this.joinHint = joinHint;
 	}
-	
+
 	public JoinHint getJoinHint() {
 		return joinHint;
 	}
-	
+
 	public void setCustomPartitioner(Partitioner<?> partitioner) {
 		this.partitioner = partitioner;
 	}
-	
+
 	public Partitioner<?> getCustomPartitioner() {
 		return partitioner;
 	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception {
-		FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
-
-		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
-		FunctionUtils.openFunction(function, this.parameters);
-
-		TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
-		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
-		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
 
-		TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
-		TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
-		
-		TypeComparator<IN1> leftComparator;
-		TypeComparator<IN2> rightComparator;
-
-		if (leftInformation instanceof AtomicType) {
-			leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true, executionConfig);
-		}
-		else if (leftInformation instanceof CompositeType) {
-			int[] keyPositions = getKeyColumns(0);
-			boolean[] orders = new boolean[keyPositions.length];
-			Arrays.fill(orders, true);
-
-			leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0, executionConfig);
-		}
-		else {
-			throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
-					.getCanonicalName() + " is not supported. Could not generate a comparator.");
-		}
-
-		if (rightInformation instanceof AtomicType) {
-			rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true, executionConfig);
-		}
-		else if (rightInformation instanceof CompositeType) {
-			int[] keyPositions = getKeyColumns(1);
-			boolean[] orders = new boolean[keyPositions.length];
-			Arrays.fill(orders, true);
-
-			rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0, executionConfig);
-		}
-		else {
-			throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
-					.getCanonicalName() + " is not supported. Could not generate a comparator.");
-		}
-
-		TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
-
-		List<OUT> result = new ArrayList<OUT>();
-		Collector<OUT> collector = new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig));
-
-		Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
-
-		//Build hash table
-		for (IN2 element: inputData2){
-			List<IN2> list = probeTable.get(rightComparator.hash(element));
-			if(list == null){
-				list = new ArrayList<IN2>();
-				probeTable.put(rightComparator.hash(element), list);
-			}
-
-			list.add(element);
-		}
-
-		//Probing
-		for (IN1 left: inputData1) {
-			List<IN2> matchingHashes = probeTable.get(leftComparator.hash(left));
-
-			if (matchingHashes != null) {
-				pairComparator.setReference(left);
-				for (IN2 right : matchingHashes) {
-					if (pairComparator.equalToReference(right)) {
-						function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
-					}
-				}
-			}
-		}
-
-		FunctionUtils.closeFunction(function);
-
-		return result;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
index 7666d10..02c8981 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -45,7 +45,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, OUT, FT> {
+public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends JoinOperatorBase<IN1, IN2, OUT, FT> {
 
 	public static enum OuterJoinType {LEFT, RIGHT, FULL}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
new file mode 100644
index 0000000..04505d0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@SuppressWarnings("serial")
+public class InnerJoinOperatorBaseTest implements Serializable {
+
+	@Test
+	public void testJoinPlain(){
+		final FlatJoinFunction<String, String, Integer> joiner = new FlatJoinFunction<String, String, Integer>() {
+
+			@Override
+			public void join(String first, String second, Collector<Integer> out) throws Exception {
+				out.collect(first.length());
+				out.collect(second.length());
+			}
+		};
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		InnerJoinOperatorBase<String, String, Integer,
+						FlatJoinFunction<String, String,Integer> > base = new InnerJoinOperatorBase(joiner,
+				new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], "TestJoiner");
+
+		List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
+		List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
+		List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6));
+
+		try {
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.disableObjectReuse();
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
+			executionConfig.enableObjectReuse();
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
+
+			assertEquals(expected, resultSafe);
+			assertEquals(expected, resultRegular);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinRich(){
+		final AtomicBoolean opened = new AtomicBoolean(false);
+		final AtomicBoolean closed = new AtomicBoolean(false);
+		final String taskName = "Test rich join function";
+
+		final RichFlatJoinFunction<String, String, Integer> joiner = new RichFlatJoinFunction<String, String, Integer>() {
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				opened.compareAndSet(false, true);
+				assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
+				assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+			}
+
+			@Override
+			public void close() throws Exception{
+				closed.compareAndSet(false, true);
+			}
+
+			@Override
+			public void join(String first, String second, Collector<Integer> out) throws Exception {
+				out.collect(first.length());
+				out.collect(second.length());
+			}
+		};
+
+		InnerJoinOperatorBase<String, String, Integer,
+						RichFlatJoinFunction<String, String, Integer>> base = new InnerJoinOperatorBase<String, String, Integer,
+										RichFlatJoinFunction<String, String, Integer>>(joiner, new BinaryOperatorInformation<String, String,
+				Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], taskName);
+
+		final List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
+		final List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
+		final List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6, 6));
+
+
+		try {
+			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
+			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.disableObjectReuse();
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+			executionConfig.enableObjectReuse();
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+
+			assertEquals(expected, resultSafe);
+			assertEquals(expected, resultRegular);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		assertTrue(opened.get());
+		assertTrue(closed.get());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
deleted file mode 100644
index 6d4ff33..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.operators.base;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-@SuppressWarnings("serial")
-public class JoinOperatorBaseTest implements Serializable {
-
-	@Test
-	public void testJoinPlain(){
-		final FlatJoinFunction<String, String, Integer> joiner = new FlatJoinFunction<String, String, Integer>() {
-
-			@Override
-			public void join(String first, String second, Collector<Integer> out) throws Exception {
-				out.collect(first.length());
-				out.collect(second.length());
-			}
-		};
-
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		JoinOperatorBase<String, String, Integer,
-				FlatJoinFunction<String, String,Integer> > base = new JoinOperatorBase(joiner,
-				new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], "TestJoiner");
-
-		List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
-		List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
-		List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6));
-
-		try {
-			ExecutionConfig executionConfig = new ExecutionConfig();
-			executionConfig.disableObjectReuse();
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
-			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
-
-			assertEquals(expected, resultSafe);
-			assertEquals(expected, resultRegular);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testJoinRich(){
-		final AtomicBoolean opened = new AtomicBoolean(false);
-		final AtomicBoolean closed = new AtomicBoolean(false);
-		final String taskName = "Test rich join function";
-
-		final RichFlatJoinFunction<String, String, Integer> joiner = new RichFlatJoinFunction<String, String, Integer>() {
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				opened.compareAndSet(false, true);
-				assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
-				assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-			}
-
-			@Override
-			public void close() throws Exception{
-				closed.compareAndSet(false, true);
-			}
-
-			@Override
-			public void join(String first, String second, Collector<Integer> out) throws Exception {
-				out.collect(first.length());
-				out.collect(second.length());
-			}
-		};
-
-		JoinOperatorBase<String, String, Integer,
-				RichFlatJoinFunction<String, String, Integer>> base = new JoinOperatorBase<String, String, Integer,
-				RichFlatJoinFunction<String, String, Integer>>(joiner, new BinaryOperatorInformation<String, String,
-				Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], taskName);
-
-		final List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
-		final List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
-		final List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6, 6));
-
-
-		try {
-			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
-			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-
-			ExecutionConfig executionConfig = new ExecutionConfig();
-			executionConfig.disableObjectReuse();
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
-			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
-
-			assertEquals(expected, resultSafe);
-			assertEquals(expected, resultRegular);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-
-		assertTrue(opened.get());
-		assertTrue(closed.get());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 98a94c6..6c8df21 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -63,7 +63,7 @@ import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
+import org.apache.flink.api.java.operators.join.InnerJoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
@@ -75,6 +75,8 @@ import org.apache.flink.api.java.operators.SortPartitionOperator;
 import org.apache.flink.api.java.operators.SortedGrouping;
 import org.apache.flink.api.java.operators.UnionOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.operators.join.JoinOperatorSets;
+import org.apache.flink.api.java.operators.join.JoinType;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -290,8 +292,8 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Applies a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
-	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
-	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
+	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b><br/>
+	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.<br/>
 	 * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
 	 *
 	 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
@@ -638,7 +640,7 @@ public abstract class DataSet<T> {
 	/**
 	 * Groups a {@link DataSet} using a {@link KeySelector} function. 
 	 * The KeySelector function is called for each element of the DataSet and extracts a single 
-	 *   key value on which the DataSet is grouped. </br>
+	 *   key value on which the DataSet is grouped. <br/>
 	 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation 
 	 *   can be applied. 
 	 * <ul>
@@ -665,7 +667,7 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Groups a {@link Tuple} {@link DataSet} using field position keys.<br/> 
-	 * <b>Note: Field position keys only be specified for Tuple DataSets.</b></br>
+	 * <b>Note: Field position keys only be specified for Tuple DataSets.</b><br/>
 	 * The field position keys specify the fields of Tuples on which the DataSet is grouped.
 	 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation 
 	 *   can be applied. 
@@ -725,83 +727,204 @@ public abstract class DataSet<T> {
 	 * Initiates a Join transformation. <br/>
 	 * A Join transformation joins the elements of two 
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.</br>
+	 *   joining elements into one DataSet.<br/>
 	 * 
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 * This method returns an {@link InnerJoinOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
-	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 * @return An InnerJoinOperatorSets to continue the definition of the Join transformation.
 	 * 
-	 * @see JoinOperatorSets
+	 * @see InnerJoinOperatorSets
 	 * @see DataSet
 	 */
-	public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
-		return new JoinOperatorSets<T, R>(this, other);
+	public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other) {
+		return new InnerJoinOperatorSets<T, R>(this, other);
 	}
 	
 	/**
 	 * Initiates a Join transformation. <br/>
 	 * A Join transformation joins the elements of two 
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.</br>
+	 *   joining elements into one DataSet.<br/>
 	 * 
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 * This method returns an {@link InnerJoinOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
-	 * @param strategy The strategy that should be used execute the join. If {@code null} is give, then the
+	 * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
 	 *                 optimizer will pick the join strategy.
-	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 * @return An InnerJoinOperatorSets to continue the definition of the Join transformation.
 	 * 
-	 * @see JoinOperatorSets
+	 * @see InnerJoinOperatorSets
 	 * @see DataSet
 	 */
-	public <R> JoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy) {
-		return new JoinOperatorSets<T, R>(this, other, strategy);
+	public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy) {
+		return new InnerJoinOperatorSets<T, R>(this, other, strategy);
 	}
 
 	/**
 	 * Initiates a Join transformation. <br/>
 	 * A Join transformation joins the elements of two 
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.</br>
+	 *   joining elements into one DataSet.<br/>
 	 * This method also gives the hint to the optimizer that the second DataSet to join is much
-	 *   smaller than the first one.</br>
-	 * This method returns a {@link JoinOperatorSets} on which 
-	 *   {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first 
+	 *   smaller than the first one.<br/>
+	 * This method returns an {@link InnerJoinOperatorSets} on which
+	 *   {@link InnerJoinOperatorSets#where(String...)} needs to be called to define the join key of the first
 	 *   joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
-	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 * @return An InnerJoinOperatorSets to continue the definition of the Join transformation.
 	 * 
-	 * @see JoinOperatorSets
+	 * @see InnerJoinOperatorSets
 	 * @see DataSet
 	 */
-	public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
-		return new JoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_SECOND);
+	public <R> InnerJoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
+		return new InnerJoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_SECOND);
 	}
 	
 	/**
 	 * Initiates a Join transformation.<br/>
 	 * A Join transformation joins the elements of two 
 	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine 
-	 *   joining elements into one DataSet.</br>
+	 *   joining elements into one DataSet.<br/>
 	 * This method also gives the hint to the optimizer that the second DataSet to join is much
-	 *   larger than the first one.</br>
-	 * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+	 *   larger than the first one.<br/>
+	 * This method returns an {@link InnerJoinOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
 	 * @return A JoinOperatorSet to continue the definition of the Join transformation.
 	 * 
+	 * @see InnerJoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> InnerJoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
+		return new InnerJoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+	/**
+	 * Initiates a Left Outer Join transformation.<br/>
+	 * An Outer Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.<br/>
+	 * Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
+	 *   element on the other side are joined with {@code null} and emitted to the
+	 *   resulting DataSet.
+	 *
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 *
 	 * @see JoinOperatorSets
 	 * @see DataSet
 	 */
-	public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
-		return new JoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_FIRST);
+	public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other) {
+		return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.LEFT_OUTER);
 	}
-	
+
+	/**
+	 * Initiates a Left Outer Join transformation.<br/>
+	 * An Outer Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.<br/>
+	 * Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
+	 *   element on the other side are joined with {@code null} and emitted to the
+	 *   resulting DataSet.
+	 *
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+	 *                 optimizer will pick the join strategy.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 *
+	 * @see JoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
+		return new JoinOperatorSets<>(this, other, strategy, JoinType.LEFT_OUTER);
+	}
+
+	/**
+	 * Initiates a Right Outer Join transformation.<br/>
+	 * An Outer Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.<br/>
+	 * Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
+	 *   element on {@code this} side are joined with {@code null} and emitted to the
+	 *   resulting DataSet.
+	 *
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 *
+	 * @see JoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other) {
+		return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.RIGHT_OUTER);
+	}
+
+	/**
+	 * Initiates a Right Outer Join transformation.<br/>
+	 * An Outer Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.<br/>
+	 * Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
+	 *   element on {@code this} side are joined with {@code null} and emitted to the
+	 *   resulting DataSet.
+	 *
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+	 *                 optimizer will pick the join strategy.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 *
+	 * @see JoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
+		return new JoinOperatorSets<>(this, other, strategy, JoinType.RIGHT_OUTER);
+	}
+
+	/**
+	 * Initiates a Full Outer Join transformation.<br/>
+	 * An Outer Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.<br/>
+	 * Elements of <b>both</b> DataSets that do not have a matching
+	 *   element on the opposing side are joined with {@code null} and emitted to the
+	 *   resulting DataSet.
+	 *
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 *
+	 * @see JoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other) {
+		return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.FULL_OUTER);
+	}
+
+	/**
+	 * Initiates a Full Outer Join transformation.<br/>
+	 * An Outer Join transformation joins the elements of two
+	 *   {@link DataSet DataSets} on key equality and provides multiple ways to combine
+	 *   joining elements into one DataSet.<br/>
+	 * Elements of <b>both</b> DataSets that do not have a matching
+	 *   element on the opposing side are joined with {@code null} and emitted to the
+	 *   resulting DataSet.
+	 *
+	 * @param other The other DataSet with which this DataSet is joined.
+	 * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+	 *                 optimizer will pick the join strategy.
+	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
+	 *
+	 * @see JoinOperatorSets
+	 * @see DataSet
+	 */
+	public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
+		return new JoinOperatorSets<>(this, other, strategy, JoinType.FULL_OUTER);
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Co-Grouping
 	// --------------------------------------------------------------------------------------------
@@ -812,9 +935,9 @@ public abstract class DataSet<T> {
 	 *   two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and 
 	 *   gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.common.functions.RichCoGroupFunction}.
 	 *   If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction
-	 *   is called with an empty group for the non-existing group.</br>
+	 *   is called with an empty group for the non-existing group.<br/>
 	 * The CoGroupFunction can iterate over the elements of both groups and return any number 
-	 *   of elements including none.</br>
+	 *   of elements including none.<br/>
 	 * This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods
 	 * can be called to define the join key of the first joining (i.e., this) DataSet.
 	 * 
@@ -860,7 +983,7 @@ public abstract class DataSet<T> {
 	 * <p>
 	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
-	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
+	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br/>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
@@ -890,7 +1013,7 @@ public abstract class DataSet<T> {
 	 * <p>
 	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
-	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
+	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br/>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
@@ -920,7 +1043,7 @@ public abstract class DataSet<T> {
 	 * <p>
 	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
-	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
+	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br/>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.