Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:16 UTC

[10/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.

[FLINK-2107] Add hash-based strategies for left and right outer joins.

This closes #1262


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5671c77c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5671c77c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5671c77c

Branch: refs/heads/master
Commit: 5671c77c3afbb84ed89427bbab9b1355d0b1f8cc
Parents: c900577
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 15 10:58:58 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 19 16:17:42 2015 +0200

----------------------------------------------------------------------
 docs/apis/dataset_transformations.md            |  58 +-
 .../java/org/apache/flink/api/java/DataSet.java |  29 +-
 .../operator/FullOuterJoinOperatorTest.java     | 250 +++++
 .../operator/LeftOuterJoinOperatorTest.java     | 251 +++++
 .../operator/RightOuterJoinOperatorTest.java    | 250 +++++
 .../flink/optimizer/costs/CostEstimator.java    |   2 +
 .../flink/optimizer/dag/OuterJoinNode.java      |  97 +-
 .../HashLeftOuterJoinBuildSecondDescriptor.java |  68 ++
 .../HashRightOuterJoinBuildFirstDescriptor.java |  68 ++
 .../AbstractCachedBuildSideJoinDriver.java      |  28 +-
 .../operators/AbstractOuterJoinDriver.java      |  11 +-
 .../flink/runtime/operators/DriverStrategy.java |   8 +-
 .../runtime/operators/FullOuterJoinDriver.java  |   6 +-
 .../flink/runtime/operators/JoinDriver.java     |  20 +-
 .../runtime/operators/LeftOuterJoinDriver.java  |  28 +-
 .../runtime/operators/RightOuterJoinDriver.java |  28 +-
 .../operators/hash/HashJoinIteratorBase.java    |  56 ++
 .../operators/hash/HashMatchIteratorBase.java   |  56 --
 .../NonReusingBuildFirstHashJoinIterator.java   | 166 ++++
 .../NonReusingBuildFirstHashMatchIterator.java  | 152 ---
 ...ingBuildFirstReOpenableHashJoinIterator.java |  87 ++
 ...ngBuildFirstReOpenableHashMatchIterator.java |  86 --
 .../NonReusingBuildSecondHashJoinIterator.java  | 164 ++++
 .../NonReusingBuildSecondHashMatchIterator.java | 150 ---
 ...ngBuildSecondReOpenableHashJoinIterator.java |  88 ++
 ...gBuildSecondReOpenableHashMatchIterator.java |  86 --
 .../hash/ReusingBuildFirstHashJoinIterator.java | 168 ++++
 .../ReusingBuildFirstHashMatchIterator.java     | 154 ---
 ...ingBuildFirstReOpenableHashJoinIterator.java |  87 ++
 ...ngBuildFirstReOpenableHashMatchIterator.java |  86 --
 .../ReusingBuildSecondHashJoinIterator.java     | 166 ++++
 .../ReusingBuildSecondHashMatchIterator.java    | 152 ---
 ...ngBuildSecondReOpenableHashJoinIterator.java |  87 ++
 ...gBuildSecondReOpenableHashMatchIterator.java |  85 --
 .../AbstractOuterJoinTaskExternalITCase.java    |  23 +-
 .../operators/AbstractOuterJoinTaskTest.java    |  35 +-
 .../FullOuterJoinTaskExternalITCase.java        |   7 +-
 .../operators/FullOuterJoinTaskTest.java        |   7 +-
 .../LeftOuterJoinTaskExternalITCase.java        |  42 +-
 .../operators/LeftOuterJoinTaskTest.java        | 230 ++++-
 .../RightOuterJoinTaskExternalITCase.java       |  44 +-
 .../operators/RightOuterJoinTaskTest.java       | 223 ++++-
 .../hash/NonReusingHashJoinIteratorITCase.java  | 947 +++++++++++++++++++
 .../hash/NonReusingHashMatchIteratorITCase.java | 766 ---------------
 .../NonReusingReOpenableHashTableITCase.java    |  12 +-
 .../hash/ReusingHashJoinIteratorITCase.java     | 709 ++++++++++++++
 .../hash/ReusingHashMatchIteratorITCase.java    | 768 ---------------
 .../hash/ReusingReOpenableHashTableITCase.java  |  15 +-
 .../operators/util/HashVsSortMiniBenchmark.java |  16 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  28 +-
 .../test/javaApiOperators/OuterJoinITCase.java  |  61 +-
 51 files changed, 4524 insertions(+), 2687 deletions(-)
----------------------------------------------------------------------
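
For reference, the user-facing effect of this change: hash-based execution strategies can now be requested for left and right outer joins through the existing `JoinHint` parameter. A minimal sketch mirroring the updated documentation below (`SomeType`/`AnotherType` are the docs' placeholder types; the join function simply pairs up the inputs):

~~~
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

// hash table is built from the (inner) second input and probed with the first
input1.leftOuterJoin(input2, JoinHint.REPARTITION_HASH_SECOND)
      .where("id").equalTo("key")
      .with(new JoinFunction<SomeType, AnotherType, Tuple2<SomeType, AnotherType>>() {
          @Override
          public Tuple2<SomeType, AnotherType> join(SomeType first, AnotherType second) {
              // second is null for left-side elements without a match
              return new Tuple2<>(first, second);
          }
      });
~~~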


http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/docs/apis/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/dataset_transformations.md b/docs/apis/dataset_transformations.md
index cc7e742..feed121 100644
--- a/docs/apis/dataset_transformations.md
+++ b/docs/apis/dataset_transformations.md
@@ -1408,25 +1408,25 @@ Not supported.
 
 The following hints are available:
 
-* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+* `OPTIMIZER_CHOOSES`: Equivalent to not giving a hint at all, leaves the choice to the system.
 
-* BROADCAST_HASH_FIRST: Broadcasts the first input and builds a hash table from it, which is
+* `BROADCAST_HASH_FIRST`: Broadcasts the first input and builds a hash table from it, which is
   probed by the second input. A good strategy if the first input is very small.
 
-* BROADCAST_HASH_SECOND: Broadcasts the second input and builds a hash table from it, which is
+* `BROADCAST_HASH_SECOND`: Broadcasts the second input and builds a hash table from it, which is
   probed by the first input. A good strategy if the second input is very small.
 
-* REPARTITION_HASH_FIRST: The system partitions (shuffles) each input (unless the input is already
+* `REPARTITION_HASH_FIRST`: The system partitions (shuffles) each input (unless the input is already
   partitioned) and builds a hash table from the first input. This strategy is good if the first
   input is smaller than the second, but both inputs are still large.
   *Note:* This is the default fallback strategy that the system uses if no size estimates can be made
  and no pre-existing partitionings and sort-orders can be re-used.
 
-* REPARTITION_HASH_SECOND: The system partitions (shuffles) each input (unless the input is already
+* `REPARTITION_HASH_SECOND`: The system partitions (shuffles) each input (unless the input is already
   partitioned) and builds a hash table from the second input. This strategy is good if the second
   input is smaller than the first, but both inputs are still large.
 
-* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+* `REPARTITION_SORT_MERGE`: The system partitions (shuffles) each input (unless the input is already
   partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
   a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
   already sorted.
@@ -1558,9 +1558,13 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
 DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
-DataSet<Tuple2<SomeType, AnotherType> result =
+DataSet<Tuple2<SomeType, AnotherType>> result1 =
       input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
             .where("id").equalTo("key");
+
+DataSet<Tuple2<SomeType, AnotherType>> result2 =
+      input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
+            .where("id").equalTo("key");
 ~~~
 
 </div>
@@ -1573,6 +1577,8 @@ val input2: DataSet[AnotherType] = // [...]
 // hint that the second DataSet is very small
 val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
 
+val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
+
 ~~~
 
 </div>
@@ -1585,15 +1591,47 @@ Not supported.
 </div>
 </div>
 
-**NOTE:** Right now, outer joins can only be executed using the `REPARTITION_SORT_MERGE` strategy. Further execution strategies will be added in the future.
+The following hints are available:
 
-* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+* `OPTIMIZER_CHOOSES`: Equivalent to not giving a hint at all, leaves the choice to the system.
 
-* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+* `BROADCAST_HASH_FIRST`: Broadcasts the first input and builds a hash table from it, which is
+  probed by the second input. A good strategy if the first input is very small.
+
+* `BROADCAST_HASH_SECOND`: Broadcasts the second input and builds a hash table from it, which is
+  probed by the first input. A good strategy if the second input is very small.
+
+* `REPARTITION_HASH_FIRST`: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and builds a hash table from the first input. This strategy is good if the first
+  input is smaller than the second, but both inputs are still large.
+  
+* `REPARTITION_HASH_SECOND`: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and builds a hash table from the second input. This strategy is good if the second
+  input is smaller than the first, but both inputs are still large.
+
+* `REPARTITION_SORT_MERGE`: The system partitions (shuffles) each input (unless the input is already
   partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
   a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
   already sorted.
 
+**NOTE:** Not all execution strategies are supported by every outer join type yet.
+
+* `LeftOuterJoin` supports:
+  * `OPTIMIZER_CHOOSES`
+  * `BROADCAST_HASH_SECOND`
+  * `REPARTITION_HASH_SECOND`
+  * `REPARTITION_SORT_MERGE`
+
+* `RightOuterJoin` supports:
+  * `OPTIMIZER_CHOOSES`
+  * `BROADCAST_HASH_FIRST`
+  * `REPARTITION_HASH_FIRST`
+  * `REPARTITION_SORT_MERGE`
+
+* `FullOuterJoin` supports:
+  * `OPTIMIZER_CHOOSES`
+  * `REPARTITION_SORT_MERGE`
+
 
 ### Cross
 

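The support matrix above is enforced when the outer join operator is constructed (see the `DataSet.java` changes below). A short illustration, reusing the documentation's placeholder types:

~~~
// supported combinations, per the lists above:
input1.leftOuterJoin(input2, JoinHint.REPARTITION_HASH_SECOND).where("id").equalTo("key");
input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key");
input1.fullOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key");

// rejected: there is no hash-based strategy for full outer joins yet
input1.fullOuterJoin(input2, JoinHint.REPARTITION_HASH_FIRST); // throws InvalidProgramException
~~~
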
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 164b4af..3101f35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -841,7 +841,16 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
-		return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
+		switch(strategy) {
+			case OPTIMIZER_CHOOSES:
+			case REPARTITION_SORT_MERGE:
+			case REPARTITION_HASH_SECOND:
+			case BROADCAST_HASH_SECOND:
+				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
+			default:
+				throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: " + strategy);
+		}
+
 	}
 
 	/**
@@ -881,7 +890,15 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
-		return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
+		switch(strategy) {
+			case OPTIMIZER_CHOOSES:
+			case REPARTITION_SORT_MERGE:
+			case REPARTITION_HASH_FIRST:
+			case BROADCAST_HASH_FIRST:
+				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
+			default:
+				throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: " + strategy);
+		}
 	}
 
 	/**
@@ -921,7 +938,13 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
-		return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
+		switch(strategy) {
+			case OPTIMIZER_CHOOSES:
+			case REPARTITION_SORT_MERGE:
+				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
+			default:
+				throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: " + strategy);
+		}
 	}
 
 

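With this change, invalid hints fail eagerly when the operator is created, instead of surfacing later as a `CompilerException` during optimization (see the `OuterJoinNode` changes below). A minimal sketch of the fail-fast behavior, assuming two hypothetical data sets `input1` and `input2`:

~~~
try {
    input1.fullOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST);
} catch (InvalidProgramException e) {
    // prints: Invalid JoinHint for FullOuterJoin: BROADCAST_HASH_FIRST
    System.err.println(e.getMessage());
}
~~~
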
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
new file mode 100644
index 0000000..fe362f0
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FullOuterJoinOperatorTest {
+
+	// TUPLE DATA
+	private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+		new ArrayList<>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+		TupleTypeInfo<>(
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO
+	);
+
+	@Test
+	public void testFullOuter1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2)
+			.where(0).equalTo(4)
+			.with(new DummyJoin());
+	}
+
+	@Test
+	public void testFullOuter2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2)
+				.where("f1").equalTo("f3")
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testFullOuter3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo(new IntKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testFullOuter4() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2)
+				.where(0).equalTo(new IntKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testFullOuter5() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo("f4")
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testFullOuter6() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2)
+				.where("f0").equalTo(4)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testFullOuter7() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// invalid key position
+		ds1.fullOuterJoin(ds2)
+				.where(5).equalTo(0)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	public void testFullOuter8() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// invalid key reference
+		ds1.fullOuterJoin(ds2)
+				.where(1).equalTo("f5")
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFullOuter9() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// key types do not match
+		ds1.fullOuterJoin(ds2)
+				.where(0).equalTo(1)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFullOuter10() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// key types do not match
+		ds1.fullOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo(new LongKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testFullOuterStrategy1() {
+		this.testFullOuterStrategies(JoinHint.OPTIMIZER_CHOOSES);
+	}
+
+	@Test
+	public void testFullOuterStrategy2() {
+		this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFullOuterStrategy3() {
+		this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFullOuterStrategy4() {
+		this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFullOuterStrategy5() {
+		this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFullOuterStrategy6() {
+		this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+
+	private void testFullOuterStrategies(JoinHint hint) {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.fullOuterJoin(ds2, hint)
+				.where(0).equalTo(4)
+				.with(new DummyJoin());
+	}
+
+	
+	/*
+	 * ####################################################################
+	 */
+
+	@SuppressWarnings("serial")
+	public static class DummyJoin implements
+			JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+		@Override
+		public Long join(Tuple5<Integer, Long, String, Long, Integer> v1, Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return 1L;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+
+		@Override
+		public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+			return v.f0;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+		@Override
+		public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+			return v.f1;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
new file mode 100644
index 0000000..06b0c13
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LeftOuterJoinOperatorTest {
+
+	// TUPLE DATA
+	private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+		new ArrayList<>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+		TupleTypeInfo<>(
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO
+	);
+
+	@Test
+	public void testLeftOuter1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2)
+			.where(0).equalTo(4)
+			.with(new DummyJoin());
+	}
+
+	@Test
+	public void testLeftOuter2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2)
+				.where("f1").equalTo("f3")
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testLeftOuter3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo(new IntKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testLeftOuter4() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2)
+				.where(0).equalTo(new IntKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testLeftOuter5() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo("f4")
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testLeftOuter6() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2)
+				.where("f0").equalTo(4)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testLeftOuter7() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// invalid key position
+		ds1.leftOuterJoin(ds2)
+				.where(5).equalTo(0)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	public void testLeftOuter8() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// invalid key reference
+		ds1.leftOuterJoin(ds2)
+				.where(1).equalTo("f5")
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testLeftOuter9() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// key types do not match
+		ds1.leftOuterJoin(ds2)
+				.where(0).equalTo(1)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testLeftOuter10() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// key types do not match
+		ds1.leftOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo(new LongKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testLeftOuterStrategy1() {
+		this.testLeftOuterStrategies(JoinHint.OPTIMIZER_CHOOSES);
+	}
+
+	@Test
+	public void testLeftOuterStrategy2() {
+		this.testLeftOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
+	}
+
+	@Test
+	public void testLeftOuterStrategy3() {
+		this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test
+	public void testLeftOuterStrategy4() {
+		this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testLeftOuterStrategy5() {
+		this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testLeftOuterStrategy6() {
+		this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+
+	private void testLeftOuterStrategies(JoinHint hint) {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.leftOuterJoin(ds2, hint)
+				.where(0).equalTo(4)
+				.with(new DummyJoin());
+	}
+
+	
+	/*
+	 * ####################################################################
+	 */
+
+	@SuppressWarnings("serial")
+	public static class DummyJoin implements
+			JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+		@Override
+		public Long join(Tuple5<Integer, Long, String, Long, Integer> v1, Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return 1L;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+
+		@Override
+		public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+			return v.f0;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+		@Override
+		public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+			return v.f1;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
new file mode 100644
index 0000000..0e407ca
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RightOuterJoinOperatorTest {
+
+	// TUPLE DATA
+	private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+		new ArrayList<>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+		TupleTypeInfo<>(
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO
+	);
+
+	@Test
+	public void testRightOuter1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2)
+			.where(0).equalTo(4)
+			.with(new DummyJoin());
+	}
+
+	@Test
+	public void testRightOuter2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2)
+				.where("f1").equalTo("f3")
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testRightOuter3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo(new IntKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testRightOuter4() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2)
+				.where(0).equalTo(new IntKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testRightOuter5() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo("f4")
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testRightOuter6() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2)
+				.where("f0").equalTo(4)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testRightOuter7() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// invalid key position
+		ds1.rightOuterJoin(ds2)
+				.where(5).equalTo(0)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	public void testRightOuter8() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// invalid key reference
+		ds1.rightOuterJoin(ds2)
+				.where(1).equalTo("f5")
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testRightOuter9() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// key types do not match
+		ds1.rightOuterJoin(ds2)
+				.where(0).equalTo(1)
+				.with(new DummyJoin());
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testRightOuter10() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// key types do not match
+		ds1.rightOuterJoin(ds2)
+				.where(new IntKeySelector()).equalTo(new LongKeySelector())
+				.with(new DummyJoin());
+	}
+
+	@Test
+	public void testRightOuterStrategy1() {
+		this.testRightOuterStrategies(JoinHint.OPTIMIZER_CHOOSES);
+	}
+
+	@Test
+	public void testRightOuterStrategy2() {
+		this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testRightOuterStrategy3() {
+		this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testRightOuterStrategy4() {
+		this.testRightOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
+	@Test
+	public void testRightOuterStrategy5() {
+		this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
+	}
+
+	@Test
+	public void testRightOuterStrategy6() {
+		this.testRightOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+
+	private void testRightOuterStrategies(JoinHint hint) {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		ds1.rightOuterJoin(ds2, hint)
+				.where(0).equalTo(4)
+				.with(new DummyJoin());
+	}
+
+	
+	/*
+	 * ####################################################################
+	 */
+
+	@SuppressWarnings("serial")
+	public static class DummyJoin implements
+			JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+		@Override
+		public Long join(Tuple5<Integer, Long, String, Long, Integer> v1, Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return 1L;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+
+		@Override
+		public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+			return v.f0;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+		@Override
+		public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+			return v.f1;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 553c127..5ff9eaf 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -202,9 +202,11 @@ public abstract class CostEstimator {
 			addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_FIRST:
+		case RIGHT_HYBRIDHASH_BUILD_FIRST:
 			addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_SECOND:
+		case LEFT_HYBRIDHASH_BUILD_SECOND:
 			addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_FIRST_CACHED:

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
index ebdfcc8..0784de3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoi
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor;
+import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
 import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
 import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
@@ -56,20 +58,19 @@ public class OuterJoinNode extends TwoInputNode {
 		JoinHint joinHint = operator.getJoinHint();
 		joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
 
-		List<OperatorDescriptorDual> list = new ArrayList<>();
-		switch (joinHint) {
-			case OPTIMIZER_CHOOSES:
-				list.add(getSortMergeDescriptor(type, true));
+		List<OperatorDescriptorDual> list;
+		switch (type) {
+			case LEFT:
+				list = createLeftOuterJoinDescriptors(joinHint);
 				break;
-			case REPARTITION_SORT_MERGE:
-				list.add(getSortMergeDescriptor(type, false));
+			case RIGHT:
+				list = createRightOuterJoinDescriptors(joinHint);
+				break;
+			case FULL:
+				list = createFullOuterJoinDescriptors(joinHint);
 				break;
-			case REPARTITION_HASH_FIRST:
-			case REPARTITION_HASH_SECOND:
-			case BROADCAST_HASH_FIRST:
-			case BROADCAST_HASH_SECOND:
 			default:
-				throw new CompilerException("Invalid join hint: " + joinHint + " for outer join type: " + type);
+				throw new CompilerException("Unknown outer join type: " + type);
 		}
 
 		Partitioner<?> customPartitioner = operator.getCustomPartitioner();
@@ -81,14 +82,74 @@ public class OuterJoinNode extends TwoInputNode {
 		return list;
 	}
 
-	private OperatorDescriptorDual getSortMergeDescriptor(OuterJoinType type, boolean broadcastAllowed) {
-		if (type == OuterJoinType.FULL) {
-			return new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2);
-		} else if (type == OuterJoinType.LEFT) {
-			return new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
-		} else {
-			return new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+	private List<OperatorDescriptorDual> createLeftOuterJoinDescriptors(JoinHint hint) {
+
+		List<OperatorDescriptorDual> list = new ArrayList<>();
+		switch (hint) {
+			case OPTIMIZER_CHOOSES:
+				list.add(new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, true));
+				list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, true));
+				break;
+			case REPARTITION_SORT_MERGE:
+				list.add(new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, false));
+				break;
+			case REPARTITION_HASH_SECOND:
+				list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true));
+				break;
+			case BROADCAST_HASH_SECOND:
+				list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false));
+				break;
+			case BROADCAST_HASH_FIRST:
+			case REPARTITION_HASH_FIRST:
+			default:
+				throw new CompilerException("Invalid join hint: " + hint + " for left outer join");
 		}
+		return list;
+	}
+
+	private List<OperatorDescriptorDual> createRightOuterJoinDescriptors(JoinHint hint) {
+
+		List<OperatorDescriptorDual> list = new ArrayList<>();
+		switch (hint) {
+			case OPTIMIZER_CHOOSES:
+				list.add(new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, true));
+				list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, true));
+				break;
+			case REPARTITION_SORT_MERGE:
+				list.add(new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, false));
+				break;
+			case REPARTITION_HASH_FIRST:
+				list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true));
+				break;
+			case BROADCAST_HASH_FIRST:
+				list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false));
+				break;
+			case BROADCAST_HASH_SECOND:
+			case REPARTITION_HASH_SECOND:
+			default:
+				throw new CompilerException("Invalid join hint: " + hint + " for right outer join");
+		}
+		return list;
+	}
+
+	private List<OperatorDescriptorDual> createFullOuterJoinDescriptors(JoinHint hint) {
+
+		List<OperatorDescriptorDual> list = new ArrayList<>();
+		switch (hint) {
+			case OPTIMIZER_CHOOSES:
+				list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
+				break;
+			case REPARTITION_SORT_MERGE:
+				list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
+				break;
+			case REPARTITION_HASH_SECOND:
+			case BROADCAST_HASH_SECOND:
+			case BROADCAST_HASH_FIRST:
+			case REPARTITION_HASH_FIRST:
+			default:
+				throw new CompilerException("Invalid join hint: " + hint + " for full outer join");
+		}
+		return list;
 	}
 
 	@Override

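The two boolean flags passed to the hash descriptors above are (`broadcastAllowed`, `repartitionAllowed`): `OPTIMIZER_CHOOSES` enumerates both the sort-merge and the hash candidate with all shipping strategies enabled, while an explicit hint pins exactly one candidate. A small sketch of the left outer join mapping (`FieldList` key sets on fields 0 and 4 are assumed, as in the new operator tests):

~~~
FieldList keys1 = new FieldList(0);
FieldList keys2 = new FieldList(4);

// OPTIMIZER_CHOOSES: two candidates, broadcast and repartitioning both allowed
new SortMergeLeftOuterJoinDescriptor(keys1, keys2, true);
new HashLeftOuterJoinBuildSecondDescriptor(keys1, keys2, true, true);

// BROADCAST_HASH_SECOND: hash only, broadcast allowed, repartitioning not
new HashLeftOuterJoinBuildSecondDescriptor(keys1, keys2, true, false);
~~~
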
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
new file mode 100644
index 0000000..8ed7969
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashLeftOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
+
+	public HashLeftOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
+													boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+		super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND;
+	}
+
+	@Override
+	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+		// all properties are possible
+		return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+	}
+	
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		return true;
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+		String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+	}
+	
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		return new LocalProperties();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
new file mode 100644
index 0000000..5ddba1c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashRightOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {
+
+	public HashRightOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
+													boolean broadcastFirstAllowed, boolean repartitionAllowed) {
+		super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST;
+	}
+
+	@Override
+	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+		// all properties are possible
+		return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+	}
+	
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		return true;
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+		String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+	}
+	
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		return new LocalProperties();
+	}
+}

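The two descriptors are mirror images: each builds the hash table on the inner side, accepts any local properties, and maps to one of the two new `DriverStrategy` constants, which the `CostEstimator` change above prices exactly like the existing hybrid-hash strategies. A brief illustration (same assumed `FieldList`s as in the previous sketch):

~~~
DriverStrategy left = new HashLeftOuterJoinBuildSecondDescriptor(keys1, keys2, true, true)
        .getStrategy(); // DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND

DriverStrategy right = new HashRightOuterJoinBuildFirstDescriptor(keys1, keys2, true, true)
        .getStrategy(); // DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST
~~~
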
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 8f72754..9ac2ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -25,10 +25,10 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -85,7 +85,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 		if (objectReuseEnabled) {
 			if (buildSideIndex == 0 && probeSideIndex == 1) {
 
-				matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+				matchIterator = new ReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>(
 						input1, input2,
 						serializer1, comparator1,
 						serializer2, comparator2,
@@ -94,12 +94,13 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
 						availableMemory,
+						false,
 						hashJoinUseBitMaps);
 
 
 			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
 
-				matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+				matchIterator = new ReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>(
 						input1, input2,
 						serializer1, comparator1,
 						serializer2, comparator2,
@@ -108,6 +109,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
 						availableMemory,
+						false,
 						hashJoinUseBitMaps);
 
 			} else {
@@ -116,7 +118,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 		} else {
 			if (buildSideIndex == 0 && probeSideIndex == 1) {
 
-				matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+				matchIterator = new NonReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>(
 						input1, input2,
 						serializer1, comparator1,
 						serializer2, comparator2,
@@ -125,12 +127,13 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
 						availableMemory,
+						false,
 						hashJoinUseBitMaps);
 
 
 			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
 
-				matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+				matchIterator = new NonReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>(
 						input1, input2,
 						serializer1, comparator1,
 						serializer2, comparator2,
@@ -139,6 +142,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
 						availableMemory,
+						false,
 						hashJoinUseBitMaps);
 
 			} else {
@@ -173,20 +177,20 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 
 		if (objectReuseEnabled) {
 			if (buildSideIndex == 0 && probeSideIndex == 1) {
-				final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+				final ReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
 
 				matchIterator.reopenProbe(input2);
 			} else {
-				final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+				final ReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
 				matchIterator.reopenProbe(input1);
 			}
 		} else {
 			if (buildSideIndex == 0 && probeSideIndex == 1) {
-				final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+				final NonReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
 
 				matchIterator.reopenProbe(input2);
 			} else {
-				final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+				final NonReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
 				matchIterator.reopenProbe(input1);
 			}
 		}
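A note on the `false` argument added at all four call sites above: it is inserted immediately before hashJoinUseBitMaps in the renamed iterator constructors. The parameter names do not appear in this diff, so the following tail is an assumption, inferred from the outer-join drivers further down, which pass `true` in the same position:

    // assumed constructor tail of the *ReOpenableHashJoinIterator classes:
    ...,
    double memoryFraction,
    boolean probeSideOuterJoin,  // false here: the cached drivers run inner joins only
    boolean useBitmapFilters)    // 'hashJoinUseBitMaps' at these call sites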

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 8c964d4..2589ca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -81,8 +81,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 		final IOManager ioManager = this.taskContext.getIOManager();
 		
 		// set up memory and I/O parameters
-		final double fractionAvailableMemory = config.getRelativeMemoryDriver();
-		final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
+		final double driverMemFraction = config.getRelativeMemoryDriver();
 		
 		final DriverStrategy ls = config.getDriverStrategy();
 		
@@ -121,7 +120,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 					pairComparatorFactory,
 					memoryManager,
 					ioManager,
-					numPages
+					driverMemFraction
 			);
 		} else {
 			this.outerJoinIterator = getNonReusingOuterJoinIterator(
@@ -135,7 +134,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 					pairComparatorFactory,
 					memoryManager,
 					ioManager,
-					numPages
+					driverMemFraction
 			);
 		}
 		
@@ -183,7 +182,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception;
 	
 	protected abstract JoinTaskIterator<IT1, IT2, OT> getNonReusingOuterJoinIterator(
@@ -197,6 +196,6 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception;
 }
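Why the signature change: the merge-based strategies need a page count up front for their sort-merge iterators, while the new hash-based strategies pass the raw memory fraction down to the hash iterator, which computes and allocates its pages internally (see HashJoinIteratorBase.getHashJoin below). Each concrete driver therefore converts only in the merge case:

    int numPages = memoryManager.computeNumberOfPages(driverMemFraction);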

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index bc7bee5..b069f12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -73,11 +73,8 @@ public enum DriverStrategy {
 
 	// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
 	INNER_MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
-
 	LEFT_OUTER_MERGE(LeftOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
-
 	RIGHT_OUTER_MERGE(RightOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
-
 	FULL_OUTER_MERGE(FullOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
 
 	// co-grouping inputs
@@ -94,6 +91,11 @@ public enum DriverStrategy {
 	HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
 	//  cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
 	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+
+	// right outer join, the first input is the build side, the second input is the probe side of a hybrid hash table
+	RIGHT_HYBRIDHASH_BUILD_FIRST(RightOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	// left outer join, the second input is the build side, the first input is the probe side of a hybrid hash table
+	LEFT_HYBRIDHASH_BUILD_SECOND(LeftOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
 	
 	// the second input is inner loop, the first input is outer loop and block-wise processed
 	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
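From the API side, these strategies are selected through join hints. A minimal usage sketch, assuming the JoinHint-based selection that this commit's DataSet.java changes enable (hint names as in Flink's Java API; REPARTITION_HASH_SECOND should map to LEFT_HYBRIDHASH_BUILD_SECOND here):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class LeftOuterHashHintExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> left = env.fromElements(
                    new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));
            DataSet<Tuple2<Integer, String>> right = env.fromElements(
                    new Tuple2<>(1, "x"));

            left.leftOuterJoin(right, JoinHint.REPARTITION_HASH_SECOND)
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public String join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                        // r is null for left records without a right-side match
                        return l.f1 + "," + (r == null ? "null" : r.f1);
                    }
                })
                .print();
        }
    }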

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index d942b72..2c01fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -47,10 +47,11 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception {
 		switch (driverStrategy) {
 			case FULL_OUTER_MERGE:
+				int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
 				return new ReusingMergeOuterJoinIterator<>(
 						OuterJoinType.FULL,
 						in1,
@@ -82,10 +83,11 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception {
 		switch (driverStrategy) {
 			case FULL_OUTER_MERGE:
+				int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
 				return new NonReusingMergeOuterJoinIterator<>(
 						OuterJoinType.FULL,
 						in1,

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index 7a9c8e6..c55843a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -26,10 +26,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -133,23 +133,25 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
-					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
+					this.joinIterator = new ReusingBuildFirstHashJoinIterator<>(in1, in2,
 							serializer1, comparator1,
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator21(comparator1, comparator2),
 							memoryManager, ioManager,
 							this.taskContext.getOwningNepheleTask(),
 							fractionAvailableMemory,
+							false,
 							hashJoinUseBitMaps);
 					break;
 				case HYBRIDHASH_BUILD_SECOND:
-					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
+					this.joinIterator = new ReusingBuildSecondHashJoinIterator<>(in1, in2,
 							serializer1, comparator1,
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator12(comparator1, comparator2),
 							memoryManager, ioManager,
 							this.taskContext.getOwningNepheleTask(),
 							fractionAvailableMemory,
+							false,
 							hashJoinUseBitMaps);
 					break;
 				default:
@@ -166,23 +168,25 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
-					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
+					this.joinIterator = new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
 							serializer1, comparator1,
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator21(comparator1, comparator2),
 							memoryManager, ioManager,
 							this.taskContext.getOwningNepheleTask(),
 							fractionAvailableMemory,
+							false,
 							hashJoinUseBitMaps);
 					break;
 				case HYBRIDHASH_BUILD_SECOND:
-					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
+					this.joinIterator = new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
 							serializer1, comparator1,
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator12(comparator1, comparator2),
 							memoryManager, ioManager,
 							this.taskContext.getOwningNepheleTask(),
 							fractionAvailableMemory,
+							false,
 							hashJoinUseBitMaps);
 					break;
 				default:

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
index ae05d1e..49d3648 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -47,10 +49,11 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception {
 		switch (driverStrategy) {
 			case LEFT_OUTER_MERGE:
+				int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
 				return new ReusingMergeOuterJoinIterator<>(
 						OuterJoinType.LEFT,
 						in1,
@@ -65,6 +68,16 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case LEFT_HYBRIDHASH_BUILD_SECOND:
+				return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator12(comparator1, comparator2),
+						memoryManager, ioManager,
+						this.taskContext.getOwningNepheleTask(),
+						driverMemFraction,
+						true,
+						false);
 			default:
 				throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
 		}
@@ -82,10 +95,11 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception {
 		switch (driverStrategy) {
 			case LEFT_OUTER_MERGE:
+				int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
 				return new NonReusingMergeOuterJoinIterator<>(
 						OuterJoinType.LEFT,
 						in1,
@@ -100,6 +114,16 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case LEFT_HYBRIDHASH_BUILD_SECOND:
+				return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator12(comparator1, comparator2),
+						memoryManager, ioManager,
+						this.taskContext.getOwningNepheleTask(),
+						driverMemFraction,
+						true,
+						false);
 			default:
 				throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
 		}
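The two booleans appended in the hash cases above are, judging from the call sites (`true, false` here versus `false, hashJoinUseBitMaps` in JoinDriver), a probe-side-outer-join flag and the bitmap-filter switch: the probe side is the outer side and must emit null-padded results for unmatched records, and the bloom/bitmap filters stay off because a filtered probe record would be dropped instead of producing an outer result. Conceptually, the probe-side behavior works like this self-contained sketch (plain Java collections, not Flink's MutableHashTable):

    import java.util.*;

    public class ProbeSideOuterSketch {
        public static void main(String[] args) {
            // build side (the non-outer input), keyed by join key
            Map<Integer, List<String>> build = new HashMap<>();
            build.computeIfAbsent(1, k -> new ArrayList<>()).add("b1");

            // probe side (the outer input); every probe record produces output
            List<Map.Entry<Integer, String>> probe = Arrays.asList(
                    new AbstractMap.SimpleEntry<>(1, "p1"),
                    new AbstractMap.SimpleEntry<>(2, "p2"));

            for (Map.Entry<Integer, String> p : probe) {
                List<String> matches = build.get(p.getKey());
                if (matches != null) {
                    for (String b : matches) {
                        System.out.println(p.getValue() + " , " + b);
                    }
                } else {
                    // probeSideOuterJoin: emit the probe record with a null build side
                    System.out.println(p.getValue() + " , null");
                }
            }
        }
    }

RightOuterJoinDriver below mirrors this with build-first, so that the second input becomes the probe (outer) side.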

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
index 6fc8abd..1b67397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -47,10 +49,11 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception {
 		switch (driverStrategy) {
 			case RIGHT_OUTER_MERGE:
+				int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
 				return new ReusingMergeOuterJoinIterator<>(
 						OuterJoinType.RIGHT,
 						in1,
@@ -65,6 +68,16 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case RIGHT_HYBRIDHASH_BUILD_FIRST:
+				return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator21(comparator1, comparator2),
+						memoryManager, ioManager,
+						this.taskContext.getOwningNepheleTask(),
+						driverMemFraction,
+						true,
+						false);
 			default:
 				throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
 		}
@@ -82,10 +95,11 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 			TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
 			MemoryManager memoryManager,
 			IOManager ioManager,
-			int numPages
+			double driverMemFraction
 	) throws Exception {
 		switch (driverStrategy) {
 			case RIGHT_OUTER_MERGE:
+				int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
 				return new NonReusingMergeOuterJoinIterator<>(
 						OuterJoinType.RIGHT,
 						in1,
@@ -100,6 +114,16 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case RIGHT_HYBRIDHASH_BUILD_FIRST:
+				return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator21(comparator1, comparator2),
+						memoryManager, ioManager,
+						this.taskContext.getOwningNepheleTask(),
+						driverMemFraction,
+						true,
+						false);
 			default:
 				throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java
new file mode 100644
index 0000000..30b1df2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+
+import java.util.List;
+
+/**
+ * Common methods for all Hash Join Iterators.
+ */
+public class HashJoinIteratorBase {
+	
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer,
+			TypeComparator<BT> buildSideComparator,
+			TypeSerializer<PT> probeSideSerializer,
+			TypeComparator<PT> probeSideComparator,
+			TypePairComparator<PT, BT> pairComparator,
+			MemoryManager memManager,
+			IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBloomFilters) throws MemoryAllocationException {
+
+		final int numPages = memManager.computeNumberOfPages(memoryFraction);
+		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+		
+		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager,
+				useBloomFilters);
+	}
+}
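This class is the former HashMatchIteratorBase (deleted at the end of this section), carried over verbatim under the 'join' naming. For orientation, a subclass would obtain its hash table roughly like this (a sketch only; the field names are assumptions, the actual call sites live in the *HashJoinIterator classes added by this commit):

    // e.g. inside an iterator's open(), assuming fields of the matching types:
    this.hashJoin = getHashJoin(
            buildSideSerializer, buildSideComparator,
            probeSideSerializer, probeSideComparator,
            pairComparator,
            memManager, ioManager, ownerTask,
            memoryFraction,
            useBitmapFilters);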

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
deleted file mode 100644
index 3b12c68..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-
-import java.util.List;
-
-/**
- * Common methods for all Hash Join Iterators.
- */
-public class HashMatchIteratorBase {
-	
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
-			TypeSerializer<BT> buildSideSerializer,
-			TypeComparator<BT> buildSideComparator,
-			TypeSerializer<PT> probeSideSerializer,
-			TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBloomFilters) throws MemoryAllocationException {
-
-		final int numPages = memManager.computeNumberOfPages(memoryFraction);
-		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		
-		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
-				buildSideComparator, probeSideComparator, pairComparator,
-				memorySegments, ioManager,
-				useBloomFilters);
-	}
-}