You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:14 UTC

[08/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskExternalITCase.java
index 7c8d04e..f4fa036 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskExternalITCase.java
@@ -37,9 +37,9 @@ import org.junit.Test;
 
 public abstract class AbstractOuterJoinTaskExternalITCase extends BinaryOperatorTestBase<FlatJoinFunction<Tuple2<Integer, Integer>,
 		Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
-	
-	private static final long HASH_MEM = 4 * 1024 * 1024;
-	
+
+	protected static final long HASH_MEM = 4 * 1024 * 1024;
+
 	private static final long SORT_MEM = 3 * 1024 * 1024;
 	
 	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
@@ -47,33 +47,30 @@ public abstract class AbstractOuterJoinTaskExternalITCase extends BinaryOperator
 	private final double bnljn_frac;
 	
 	@SuppressWarnings("unchecked")
-	private final TypeComparator<Tuple2<Integer, Integer>> comparator1 = new TupleComparator<>(
+	protected final TypeComparator<Tuple2<Integer, Integer>> comparator1 = new TupleComparator<>(
 			new int[]{0},
 			new TypeComparator<?>[]{new IntComparator(true)},
 			new TypeSerializer<?>[]{IntSerializer.INSTANCE}
 	);
 	
 	@SuppressWarnings("unchecked")
-	private final TypeComparator<Tuple2<Integer, Integer>> comparator2 = new TupleComparator<>(
+	protected final TypeComparator<Tuple2<Integer, Integer>> comparator2 = new TupleComparator<>(
 			new int[]{0},
 			new TypeComparator<?>[]{new IntComparator(true)},
 			new TypeSerializer<?>[]{IntSerializer.INSTANCE}
 	);
 	
 	@SuppressWarnings("unchecked")
-	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<>(
+	protected final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<>(
 			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
 			new TypeSerializer<?>[]{IntSerializer.INSTANCE, IntSerializer.INSTANCE}
 	);
 	
-	private final CountingOutputCollector<Tuple2<Integer, Integer>> output = new CountingOutputCollector<>();
-	
-	private final DriverStrategy driverStrategy;
+	protected final CountingOutputCollector<Tuple2<Integer, Integer>> output = new CountingOutputCollector<>();
 	
-	public AbstractOuterJoinTaskExternalITCase(ExecutionConfig config, DriverStrategy driverStrategy) {
+	public AbstractOuterJoinTaskExternalITCase(ExecutionConfig config) {
 		super(config, HASH_MEM, 2, SORT_MEM);
 		bnljn_frac = (double) BNLJN_MEM / this.getMemoryManager().getMemorySize();
-		this.driverStrategy = driverStrategy;
 	}
 	
 	@Test
@@ -90,7 +87,7 @@ public abstract class AbstractOuterJoinTaskExternalITCase extends BinaryOperator
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortStrategy());
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -106,6 +103,8 @@ public abstract class AbstractOuterJoinTaskExternalITCase extends BinaryOperator
 	protected abstract int calculateExpectedCount(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2);
 	
 	protected abstract AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver();
+
+	protected abstract DriverStrategy getSortStrategy(); // supplies the strategy this class's test passes to setDriverStrategy(...)
 	
 	// =================================================================================================
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.java
index ad784b5..b265eae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.java
@@ -60,35 +60,32 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 	
 	private final double bnljn_frac;
 	
-	private final DriverStrategy driverStrategy;
-	
 	@SuppressWarnings("unchecked")
-	private final TypeComparator<Tuple2<Integer, Integer>> comparator1 = new TupleComparator<>(
+	protected final TypeComparator<Tuple2<Integer, Integer>> comparator1 = new TupleComparator<>(
 			new int[]{0},
 			new TypeComparator<?>[]{new IntComparator(true)},
 			new TypeSerializer<?>[]{IntSerializer.INSTANCE}
 	);
 	
 	@SuppressWarnings("unchecked")
-	private final TypeComparator<Tuple2<Integer, Integer>> comparator2 = new TupleComparator<>(
+	protected final TypeComparator<Tuple2<Integer, Integer>> comparator2 = new TupleComparator<>(
 			new int[]{0},
 			new TypeComparator<?>[]{new IntComparator(true)},
 			new TypeSerializer<?>[]{IntSerializer.INSTANCE}
 	);
 	
-	private final List<Tuple2<Integer, Integer>> outList = new ArrayList<>();
+	protected final List<Tuple2<Integer, Integer>> outList = new ArrayList<>();
 	
 	@SuppressWarnings("unchecked")
-	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<>(
+	protected final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<>(
 			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
 			new TypeSerializer<?>[]{IntSerializer.INSTANCE, IntSerializer.INSTANCE}
 	);
 	
 	
-	public AbstractOuterJoinTaskTest(ExecutionConfig config, DriverStrategy driverStrategy) {
+	public AbstractOuterJoinTaskTest(ExecutionConfig config) {
 		super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
 		bnljn_frac = (double) BNLJN_MEM / this.getMemoryManager().getMemorySize();
-		this.driverStrategy = driverStrategy;
 	}
 	
 	@Test
@@ -162,7 +159,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -191,7 +188,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -220,7 +217,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -249,7 +246,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -266,7 +263,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		
 		this.outList.clear();
 	}
-	
+
 	@Test(expected = ExpectedTestException.class)
 	public void testFailingOuterJoinTask() throws Exception {
 		int keyCnt1 = 20;
@@ -279,7 +276,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -297,7 +294,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -324,7 +321,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		
 		cancel();
 		taskRunner.interrupt();
-		
+
 		taskRunner.join(60000);
 		
 		assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
@@ -341,7 +338,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(this.driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -385,7 +382,7 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 		addDriverComparator(this.comparator1);
 		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
-		getTaskConfig().setDriverStrategy(driverStrategy);
+		getTaskConfig().setDriverStrategy(this.getSortDriverStrategy());
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
@@ -426,6 +423,8 @@ public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<F
 	protected abstract AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver();
 	
 	protected abstract int calculateExpectedCount(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2);
+
+	protected abstract DriverStrategy getSortDriverStrategy(); // supplies the strategy the tests in this class pass to setDriverStrategy(...)
 	
 	// =================================================================================================
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskExternalITCase.java
index d52f5fb..15881f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskExternalITCase.java
@@ -26,7 +26,7 @@ public class FullOuterJoinTaskExternalITCase extends AbstractOuterJoinTaskExtern
 	
 	
 	public FullOuterJoinTaskExternalITCase(ExecutionConfig config) {
-		super(config, DriverStrategy.FULL_OUTER_MERGE);
+		super(config);
 	}
 	
 	@Override
@@ -38,4 +38,9 @@ public class FullOuterJoinTaskExternalITCase extends AbstractOuterJoinTaskExtern
 	protected AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver() {
 		return new FullOuterJoinDriver<>();
 	}
+
+	@Override
+	protected DriverStrategy getSortStrategy() {
+		return DriverStrategy.FULL_OUTER_MERGE; // value the base class test feeds to setDriverStrategy(...)
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskTest.java
index d3296f6..2efedac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FullOuterJoinTaskTest.java
@@ -26,7 +26,12 @@ public class FullOuterJoinTaskTest extends AbstractOuterJoinTaskTest {
 	
 	
 	public FullOuterJoinTaskTest(ExecutionConfig config) {
-		super(config, DriverStrategy.FULL_OUTER_MERGE);
+		super(config);
+	}
+
+	@Override
+	protected DriverStrategy getSortDriverStrategy() {
+		return DriverStrategy.FULL_OUTER_MERGE; // value the base class tests feed to setDriverStrategy(...)
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskExternalITCase.java
index 89d68f2..33552bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskExternalITCase.java
@@ -21,12 +21,18 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class LeftOuterJoinTaskExternalITCase extends AbstractOuterJoinTaskExternalITCase {
-	
+
+	private final double hash_frac;
 	
 	public LeftOuterJoinTaskExternalITCase(ExecutionConfig config) {
-		super(config, DriverStrategy.LEFT_OUTER_MERGE);
+		super(config);
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Override
@@ -38,4 +44,36 @@ public class LeftOuterJoinTaskExternalITCase extends AbstractOuterJoinTaskExtern
 	protected AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver() {
 		return new LeftOuterJoinDriver<>();
 	}
+
+	@Override
+	protected DriverStrategy getSortStrategy() {
+		return DriverStrategy.LEFT_OUTER_MERGE; // value the base class test feeds to setDriverStrategy(...); the hash test below sets its own strategy
+	}
+
+	@Test
+	public void testExternalHashLeftOuterJoinTask() throws Exception {
+		// Generator arguments: first input keyCnt 65536 / valCnt 8, second input keyCnt 32768 / valCnt 8.
+		final int firstKeys = 65536;
+		final int firstVals = 8;
+		final int secondKeys = 32768;
+		final int secondVals = 8;
+
+		// The expected output size is computed up front through calculateExpectedCount.
+		final int expectedRecords = calculateExpectedCount(firstKeys, firstVals, secondKeys, secondVals);
+
+		// Driver setup: counting collector, one comparator per input, hash strategy, memory fraction.
+		setOutput(output);
+		addDriverComparator(comparator1);
+		addDriverComparator(comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
+		addInputSorted(new UniformIntTupleGenerator(firstKeys, firstVals, false), this.serializer, comparator1.duplicate());
+		addInputSorted(new UniformIntTupleGenerator(secondKeys, secondVals, false), this.serializer, comparator2.duplicate());
+		testDriver(driver, MockJoinStub.class);
+
+		Assert.assertEquals("Wrong result set size.", expectedRecords, output.getNumberOfRecords());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
index 9a1ec8f..ad11768 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/LeftOuterJoinTaskTest.java
@@ -19,23 +19,245 @@
 
 package org.apache.flink.runtime.operators;
 
+import com.google.common.base.Throwables;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
+import org.apache.flink.runtime.operators.testutils.DelayingIterator;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
+import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Record;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class LeftOuterJoinTaskTest extends AbstractOuterJoinTaskTest {
-	
-	
+
+	private static final long HASH_MEM = 6*1024*1024;
+
+	private final double hash_frac;
+
 	public LeftOuterJoinTaskTest(ExecutionConfig config) {
-		super(config, DriverStrategy.LEFT_OUTER_MERGE);
+		super(config);
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Override
 	protected int calculateExpectedCount(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2) {
 		return valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2) + (keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : 0);
 	}
-	
+
+	@Override
+	protected DriverStrategy getSortDriverStrategy() {
+		return DriverStrategy.LEFT_OUTER_MERGE; // value the base class tests feed to setDriverStrategy(...); the hash tests below set their own strategy
+	}
+
 	@Override
 	protected AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver() {
 		return new LeftOuterJoinDriver<>();
 	}
+
+	@Test
+	public void testHash1LeftOuterJoinTask() throws Exception {
+		// calculateExpectedCount gives 1 * 2 * 10 + (20 - 10) * 1 = 30 for these arguments.
+		final int firstKeys = 20;
+		final int firstVals = 1;
+		final int secondKeys = 10;
+		final int secondVals = 2;
+
+		testHashLeftOuterJoinTask(firstKeys, firstVals, secondKeys, secondVals);
+	}
+
+	@Test
+	public void testHash2LeftOuterJoinTask() throws Exception {
+		// calculateExpectedCount gives 1 * 1 * 20 = 20 for these arguments (equal key counts).
+		final int firstKeys = 20;
+		final int firstVals = 1;
+		final int secondKeys = 20;
+		final int secondVals = 1;
+
+		testHashLeftOuterJoinTask(firstKeys, firstVals, secondKeys, secondVals);
+	}
+
+	@Test
+	public void testHash3LeftOuterJoinTask() throws Exception {
+		// calculateExpectedCount gives 1 * 20 * 20 = 400 for these arguments (equal key counts).
+		final int firstKeys = 20;
+		final int firstVals = 1;
+		final int secondKeys = 20;
+		final int secondVals = 20;
+
+		testHashLeftOuterJoinTask(firstKeys, firstVals, secondKeys, secondVals);
+	}
+
+	@Test
+	public void testHash4LeftOuterJoinTask() throws Exception {
+		// calculateExpectedCount gives 20 * 1 * 20 = 400 for these arguments (equal key counts).
+		final int firstKeys = 20;
+		final int firstVals = 20;
+		final int secondKeys = 20;
+		final int secondVals = 1;
+
+		testHashLeftOuterJoinTask(firstKeys, firstVals, secondKeys, secondVals);
+	}
+
+	@Test
+	public void testHash5LeftOuterJoinTask() throws Exception {
+		// calculateExpectedCount gives 20 * 20 * 20 = 8000 for these arguments (equal key counts).
+		final int firstKeys = 20;
+		final int firstVals = 20;
+		final int secondKeys = 20;
+		final int secondVals = 20;
+
+		testHashLeftOuterJoinTask(firstKeys, firstVals, secondKeys, secondVals);
+	}
+
+	@Test
+	public void testHash6LeftOuterJoinTask() throws Exception {
+		// calculateExpectedCount gives 1 * 2 * 10 = 20 here; keyCnt1 < keyCnt2, so the extra term is 0.
+		final int firstKeys = 10;
+		final int firstVals = 1;
+		final int secondKeys = 20;
+		final int secondVals = 2;
+
+		testHashLeftOuterJoinTask(firstKeys, firstVals, secondKeys, secondVals);
+	}
+
+	private void testHashLeftOuterJoinTask(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2) throws Exception {
+		// Shared body of the testHashN cases: configure LEFT_HYBRIDHASH_BUILD_SECOND, run the driver, check the output size.
+		setOutput(outList, serializer);
+		addDriverComparator(comparator1);
+		addDriverComparator(comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
+		addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, false), serializer);
+		addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, false), serializer);
+		testDriver(driver, MockJoinStub.class);
+
+		// Compare the number of collected records with the count the formula predicts.
+		final int expected = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);
+		final int actual = this.outList.size();
+		Assert.assertTrue("Result set size was " + actual + ". Expected was " + expected, actual == expected);
+
+		outList.clear();
+	}
+
+	@Test(expected = ExpectedTestException.class)
+	public void testFailingHashLeftOuterJoinTask() throws Exception {
+		// Runs the hash strategy with MockFailingJoinStub; JUnit expects ExpectedTestException to propagate.
+		final int firstKeys = 20;
+		final int firstVals = 20;
+		final int secondKeys = 20;
+		final int secondVals = 20;
+
+		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+		addDriverComparator(comparator1);
+		addDriverComparator(comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
+		// The third generator argument is true here, whereas the size-checking hash tests pass false.
+		addInput(new UniformIntTupleGenerator(firstKeys, firstVals, true), serializer);
+		addInput(new UniformIntTupleGenerator(secondKeys, secondVals, true), serializer);
+
+		testDriver(driver, MockFailingJoinStub.class);
+	}
+
+	@Test
+	public void testCancelLeftOuterJoinTaskWhileBuilding() throws Exception {
+		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
+		// Second input is an InfiniteIntTupleIterator behind a DelayingIterator; with BUILD_SECOND the task is presumably still building when cancelled.
+		addInput(new UniformIntTupleGenerator(100, 100, true), this.serializer);
+		addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		// Run the driver on its own thread so this thread can cancel it; any Throwable is captured and reported below.
+		final Thread taskRunner = new Thread("Task runner for testCancelLeftOuterJoinTaskWhileBuilding()") {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockJoinStub.class);
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		taskRunner.start();
+
+		Thread.sleep(1000);
+		// Cancel after one second, then wait up to 60 seconds for the runner thread to exit.
+		cancel();
+		taskRunner.join(60000);
+
+		assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+		final Throwable taskError = error.get();
+		if (taskError != null) {
+			fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
+		}
+	}
+
+	@Test
+	public void testCancelLeftOuterJoinTaskWhileProbing() throws Exception {
+		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
+		// First input is an InfiniteIntTupleIterator behind a DelayingIterator; the second is tiny, so the task is presumably probing when cancelled.
+		addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
+		addInput(new UniformIntTupleGenerator(1, 1, true), this.serializer);
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		// Run the driver on its own thread so this thread can cancel it; any Throwable is captured and reported below.
+		final Thread taskRunner = new Thread("Task runner for testCancelLeftOuterJoinTaskWhileProbing()") {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockJoinStub.class);
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		taskRunner.start();
+
+		Thread.sleep(1000);
+		// Cancel after one second, then wait up to 60 seconds for the runner thread to exit.
+		cancel();
+		taskRunner.join(60000);
+
+		assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+		final Throwable taskError = error.get();
+		if (taskError != null) {
+			fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskExternalITCase.java
index 4e7df4b..ef725ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskExternalITCase.java
@@ -21,12 +21,18 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class RightOuterJoinTaskExternalITCase extends AbstractOuterJoinTaskExternalITCase {
-	
-	
+
+	private final double hash_frac;
+
 	public RightOuterJoinTaskExternalITCase(ExecutionConfig config) {
-		super(config, DriverStrategy.RIGHT_OUTER_MERGE);
+		super(config);
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
 	}
 	
 	@Override
@@ -38,4 +44,36 @@ public class RightOuterJoinTaskExternalITCase extends AbstractOuterJoinTaskExter
 	protected AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver() {
 		return new RightOuterJoinDriver<>();
 	}
+
+	@Override
+	protected DriverStrategy getSortStrategy() {
+		return DriverStrategy.RIGHT_OUTER_MERGE; // value the base class test feeds to setDriverStrategy(...); the hash test below sets its own strategy
+	}
+
+	@Test
+	public void testExternalHashRightOuterJoinTask() throws Exception {
+		// Generator arguments: first input keyCnt 32768 / valCnt 8, second input keyCnt 65536 / valCnt 8.
+		final int firstKeys = 32768;
+		final int firstVals = 8;
+		final int secondKeys = 65536;
+		final int secondVals = 8;
+
+		// The expected output size is computed up front through calculateExpectedCount.
+		final int expectedRecords = calculateExpectedCount(firstKeys, firstVals, secondKeys, secondVals);
+
+		// Driver setup: counting collector, one comparator per input, hash strategy, memory fraction.
+		setOutput(output);
+		addDriverComparator(comparator1);
+		addDriverComparator(comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
+		addInputSorted(new UniformIntTupleGenerator(firstKeys, firstVals, false), this.serializer, comparator1.duplicate());
+		addInputSorted(new UniformIntTupleGenerator(secondKeys, secondVals, false), this.serializer, comparator2.duplicate());
+		testDriver(driver, MockJoinStub.class);
+
+		Assert.assertEquals("Wrong result set size.", expectedRecords, output.getNumberOfRecords());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskTest.java
index 506e95b..4d41031 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/RightOuterJoinTaskTest.java
@@ -19,14 +19,37 @@
 
 package org.apache.flink.runtime.operators;
 
+import com.google.common.base.Throwables;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DelayingIterator;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class RightOuterJoinTaskTest extends AbstractOuterJoinTaskTest {
-	
+
+	private static final long HASH_MEM = 6*1024*1024;
+
+	private final double hash_frac;
 	
 	public RightOuterJoinTaskTest(ExecutionConfig config) {
-		super(config, DriverStrategy.RIGHT_OUTER_MERGE);
+		super(config);
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+	}
+
+	@Override
+	protected DriverStrategy getSortDriverStrategy() {
+		return DriverStrategy.RIGHT_OUTER_MERGE; // value the base class tests feed to setDriverStrategy(...); the hash tests below set their own strategy
 	}
 	
 	@Override
@@ -38,4 +61,200 @@ public class RightOuterJoinTaskTest extends AbstractOuterJoinTaskTest {
 	protected AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver() {
 		return new RightOuterJoinDriver<>();
 	}
+
+	@Test
+	public void testHash1RightOuterJoinTask() throws Exception {
+		// key count and value count handed to the generator of the first input
+		final int firstKeyCnt = 20;
+		final int firstValCnt = 1;
+		// key count and value count handed to the generator of the second input
+		final int secondKeyCnt = 10;
+		final int secondValCnt = 2;
+		testHashRightOuterJoinTask(firstKeyCnt, firstValCnt, secondKeyCnt, secondValCnt);
+	}
+
+	@Test
+	public void testHash2RightOuterJoinTask() throws Exception {
+		// key count and value count handed to the generator of the first input
+		final int firstKeyCnt = 20;
+		final int firstValCnt = 1;
+		// the second input uses the same key count and value count as the first one
+		final int secondKeyCnt = 20;
+		final int secondValCnt = 1;
+		testHashRightOuterJoinTask(firstKeyCnt, firstValCnt, secondKeyCnt, secondValCnt);
+	}
+
+	@Test
+	public void testHash3RightOuterJoinTask() throws Exception {
+		// key count and value count handed to the generator of the first input
+		final int firstKeyCnt = 20;
+		final int firstValCnt = 1;
+		// same key count for the second input, but a value count of 20
+		final int secondKeyCnt = 20;
+		final int secondValCnt = 20;
+		testHashRightOuterJoinTask(firstKeyCnt, firstValCnt, secondKeyCnt, secondValCnt);
+	}
+
+	@Test
+	public void testHash4RightOuterJoinTask() throws Exception {
+		// key count and value count handed to the generator of the first input
+		final int firstKeyCnt = 20;
+		final int firstValCnt = 20;
+		// same key count for the second input, but a value count of 1
+		final int secondKeyCnt = 20;
+		final int secondValCnt = 1;
+		testHashRightOuterJoinTask(firstKeyCnt, firstValCnt, secondKeyCnt, secondValCnt);
+	}
+
+	@Test
+	public void testHash5RightOuterJoinTask() throws Exception {
+		// key count and value count handed to the generator of the first input
+		final int firstKeyCnt = 20;
+		final int firstValCnt = 20;
+		// the second input uses the same key count and value count as the first one
+		final int secondKeyCnt = 20;
+		final int secondValCnt = 20;
+		testHashRightOuterJoinTask(firstKeyCnt, firstValCnt, secondKeyCnt, secondValCnt);
+	}
+
+	@Test
+	public void testHash6RightOuterJoinTask() throws Exception {
+		// key count and value count handed to the generator of the first input
+		final int firstKeyCnt = 10;
+		final int firstValCnt = 1;
+		// the second input gets twice the key count and a value count of 2
+		final int secondKeyCnt = 20;
+		final int secondValCnt = 2;
+		testHashRightOuterJoinTask(firstKeyCnt, firstValCnt, secondKeyCnt, secondValCnt);
+	}
+
+	/** Runs the driver with RIGHT_HYBRIDHASH_BUILD_FIRST on two generated inputs and checks the number of results. */
+	private void testHashRightOuterJoinTask(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2) throws Exception {
+		setOutput(this.outList, this.serializer);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+		setNumFileHandlesForSort(4);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
+
+		addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, false), this.serializer);
+		addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, false), this.serializer);
+		testDriver(testTask, MockJoinStub.class);
+
+		final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);
+		// assertEquals reports the expected and the actual size itself when the check fails
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+		this.outList.clear();
+	}
+
+	@Test(expected = ExpectedTestException.class)
+	public void testFailingHashRightOuterJoinTask() throws Exception {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		// the output is discarded: this test only passes if an ExpectedTestException leaves the method
+		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+		setNumFileHandlesForSort(4);
+
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
+
+		addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, true), this.serializer);
+		addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, true), this.serializer);
+		// MockFailingJoinStub is declared outside this hunk; presumably it throws the expected exception
+		testDriver(testTask, MockFailingJoinStub.class);
+	}
+
+	/** Cancels the hash right outer join while its first input, the build side of BUILD_FIRST, is still being read. */
+	@Test
+	public void testCancelRightOuterJoinTaskWhileBuilding() throws Exception {
+		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
+		// the first input is a delayed, infinite iterator; presumably the task can only end through cancel()
+		addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
+		addInput(new UniformIntTupleGenerator(100, 100, true), this.serializer);
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final Thread taskRunner = new Thread("Task runner for testCancelRightOuterJoinTaskWhileBuilding()") {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockJoinStub.class);
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		taskRunner.start();
+		// give the task thread time to start working before it is cancelled
+		Thread.sleep(1000);
+
+		cancel();
+		taskRunner.join(60000);
+
+		assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+		final Throwable taskError = error.get();
+		if (taskError != null) {
+			fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
+		}
+	}
+
+	/** Cancels the hash right outer join while its second input, the probe side of BUILD_FIRST, is still being read. */
+	@Test
+	public void testCancelRightOuterJoinTaskWhileProbing() throws Exception {
+		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
+		getTaskConfig().setDriverStrategy(DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
+		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
+		// small first input, then a delayed, infinite second input; presumably only cancel() ends the task
+		addInput(new UniformIntTupleGenerator(1, 1, true), this.serializer);
+		addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final Thread taskRunner = new Thread("Task runner for testCancelRightOuterJoinTaskWhileProbing()") {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockJoinStub.class);
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		taskRunner.start();
+		// give the task thread time to start working before it is cancelled
+		Thread.sleep(1000);
+
+		cancel();
+		taskRunner.join(60000);
+
+		assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+		final Throwable taskError = error.get();
+		if (taskError != null) {
+			fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(taskError));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
new file mode 100644
index 0000000..a766214
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
@@ -0,0 +1,947 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class NonReusingHashJoinIteratorITCase {
+	
+	private static final int MEMORY_SIZE = 16000000;		// total memory
+
+	private static final int INPUT_1_SIZE = 20000;
+	private static final int INPUT_2_SIZE = 1000;
+
+	private static final long SEED1 = 561349061987311L;
+	private static final long SEED2 = 231434613412342L;
+	
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+	
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
+	
+	private TypeSerializer<IntPair> pairSerializer;
+	private TypeComparator<IntPair> pairComparator;
+	private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+	private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
+
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest() {
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
+		
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
+		// GenericPairComparator is created as a raw type, which is what the unchecked suppression covers
+		this.recordPairComparator = new GenericPairComparator(record1Comparator, record2Comparator);
+		// serializer and comparators for the tests that join IntPair records with Tuple2 records
+		this.pairSerializer = new IntPairSerializer();
+		this.pairComparator = new TestData.IntPairComparator();
+		this.pairRecordPairComparator = new IntPairTuplePairComparator();
+		this.recordPairPairComparator = new TupleIntPairPairComparator();
+		// fresh memory manager and I/O manager for every test; afterTest() shuts both down again
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+		// check for memory that was not returned before the memory manager itself is shut down
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+
+	@Test
+	public void testBuildFirst() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data: the inner join of both inputs, computed in memory by joinTuples
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+			
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+	
+			// reset the generators and iterators that were read while collecting the expected data
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashJoinIterator<>(
+						input1, input2, this.recordSerializer, this.record1Comparator, 
+						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+						this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+			
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildFirstWithHighNumberOfCommonKeys()
+	{
+		// the size of the left and right inputs (these locals shadow the class-level constants of the same name)
+		final int INPUT_1_SIZE = 200;
+		final int INPUT_2_SIZE = 100;
+		// sizes of the constant-value iterators that are unioned onto each input; all of them use DUPLICATE_KEY
+		final int INPUT_1_DUPLICATES = 10;
+		final int INPUT_2_DUPLICATES = 2000;
+		final int DUPLICATE_KEY = 13;
+		
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+			
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
+			
+			
+			// collect expected data: the inner join of both unioned inputs, computed in memory by joinTuples
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+			
+			// re-create the whole thing for actual processing
+			
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+			
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+	
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
+			
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashJoinIterator<>(
+						input1, input2, this.recordSerializer, this.record1Comparator, 
+						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+						this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecond() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data: the inner join of both inputs, computed in memory by joinTuples
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+			
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators and iterators that were read while collecting the expected data
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator, 
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondWithHighNumberOfCommonKeys()
+	{
+		// the size of the left and right inputs (these locals shadow the class-level constants of the same name)
+		final int INPUT_1_SIZE = 200;
+		final int INPUT_2_SIZE = 100;
+		// sizes of the constant-value iterators that are unioned onto each input; all of them use DUPLICATE_KEY
+		final int INPUT_1_DUPLICATES = 10;
+		final int INPUT_2_DUPLICATES = 2000;
+		final int DUPLICATE_KEY = 13;
+		
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+			
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
+			
+			
+			// collect expected data: the inner join of both unioned inputs, computed in memory by joinTuples
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+			
+			// re-create the whole thing for actual processing
+			
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+			
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+	
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
+			
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator, 
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+			
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildFirstWithMixedDataTypes() {
+		try {
+			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
+			
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data: the join of the IntPair input with the tuple input, computed by joinIntPairs
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = joinIntPairs(
+					collectIntPairData(input1),
+					collectTupleData(input2));
+			
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the inputs: input1 is re-created, generator2 and input2 are reset
+			input1 = new UniformIntPairGenerator(500, 40, false);
+			generator2.reset();
+			input2.reset();
+			// NOTE(review): the method name says "BuildFirst" but a BuildSecond iterator is used below; confirm intent
+			// compare with iterator values
+			NonReusingBuildSecondHashJoinIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildSecondHashJoinIterator<>(
+						input1, input2, this.pairSerializer, this.pairComparator,
+						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+			
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondWithMixedDataTypes() {
+		try {
+			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
+			
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data: the join of the IntPair input with the tuple input, computed by joinIntPairs
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = joinIntPairs(
+					collectIntPairData(input1),
+					collectTupleData(input2));
+			
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the inputs: input1 is re-created, generator2 and input2 are reset
+			input1 = new UniformIntPairGenerator(500, 40, false);
+			generator2.reset();
+			input2.reset();
+			// NOTE(review): the method name says "BuildSecond" but a BuildFirst iterator is used below; confirm intent
+			// compare with iterator values
+			NonReusingBuildFirstHashJoinIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashJoinIterator<>(
+						input1, input2, this.pairSerializer, this.pairComparator, 
+						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+			
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBuildFirstJoinWithEmptyBuild() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data: right outer join, input2 values without an input1 key are paired with null
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			// reset the generators and iterators that were read while collecting the expected data
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+			// NOTE(review): the trailing "true, false" presumably enables joining with an empty build side; confirm
+			// compare with iterator values
+			NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashJoinIterator<>(
+							input1, input2, this.recordSerializer, this.record1Comparator,
+							this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+							this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
+
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+
+			iterator.close();
+
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBuildSecondJoinWithEmptyBuild() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data: left outer join, input1 values without an input2 key are paired with null
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			// reset the generators and iterators that were read while collecting the expected data
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+			// NOTE(review): the trailing "true, false" presumably enables joining with an empty build side; confirm
+			// compare with iterator values
+			NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildSecondHashJoinIterator<>(
+							input1, input2, this.recordSerializer, this.record1Comparator,
+							this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+							this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
+
+			iterator.open();
+			// drain the join; the loop body is intentionally empty, the matcher is invoked inside callWithNextKey
+			while (iterator.callWithNextKey(matcher, collector));
+
+			iterator.close();
+
+			// assert that each expected match was seen (the matcher presumably removes every match it is called with)
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                    Utilities
+	// --------------------------------------------------------------------------------------------
+
+	
+	
+	/**
+	 * Computes the expected inner join: for every key present in both maps, the cross product
+	 * of its left and right values. Keys without an entry in {@code rightMap} are left out.
+	 */
+	public static Map<Integer, Collection<TupleMatch>> joinTuples(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
+	{
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
+
+		// walk the entries directly instead of looking every left key up a second time
+		for (Entry<Integer, Collection<String>> leftEntry : leftMap.entrySet()) {
+			Collection<String> rightValues = rightMap.get(leftEntry.getKey());
+			if (rightValues == null) {
+				continue;
+			}
+
+			// map keys are unique, so every matching key gets exactly one fresh result collection
+			Collection<TupleMatch> matchedValues = new ArrayList<>();
+			for (String leftValue : leftEntry.getValue()) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new TupleMatch(leftValue, rightValue));
+				}
+			}
+			map.put(leftEntry.getKey(), matchedValues);
+		}
+
+		return map;
+	}
+
+	/**
+	 * Computes the expected left outer join: every left value is paired with each right value
+	 * of the same key, or with {@code null} if {@code rightMap} has no entry for that key.
+	 */
+	public static Map<Integer, Collection<TupleMatch>> leftOuterJoinTuples(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
+	{
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
+
+		// walk the entries directly; map keys are unique, so every key gets one fresh collection
+		for (Entry<Integer, Collection<String>> leftEntry : leftMap.entrySet()) {
+			Collection<String> rightValues = rightMap.get(leftEntry.getKey());
+			Collection<TupleMatch> matchedValues = new ArrayList<>();
+
+			for (String leftValue : leftEntry.getValue()) {
+				if (rightValues == null) {
+					// unmatched left value: the right side of the match stays null
+					matchedValues.add(new TupleMatch(leftValue, null));
+				} else {
+					for (String rightValue : rightValues) {
+						matchedValues.add(new TupleMatch(leftValue, rightValue));
+					}
+				}
+			}
+			map.put(leftEntry.getKey(), matchedValues);
+		}
+
+		return map;
+	}
+
+	/**
+	 * Computes the expected result of a right outer join between two key-grouped inputs.
+	 *
+	 * <p>Every key of {@code rightMap} appears in the result. If {@code leftMap} has an entry
+	 * for the key, each right value is combined with each left value; otherwise each right
+	 * value is paired with {@code null} on the left side.
+	 *
+	 * @param leftMap  values of the left input, grouped by key
+	 * @param rightMap values of the right (outer) input, grouped by key
+	 * @return the expected matches, grouped by key
+	 */
+	public static Map<Integer, Collection<TupleMatch>> rightOuterJoinTuples(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
+	{
+		Map<Integer, Collection<TupleMatch>> result = new HashMap<>();
+
+		for (Entry<Integer, Collection<String>> rightEntry : rightMap.entrySet()) {
+			Integer key = rightEntry.getKey();
+			Collection<String> leftValues = leftMap.get(key);
+
+			Collection<TupleMatch> matches = new ArrayList<>();
+			result.put(key, matches);
+
+			for (String rightValue : rightEntry.getValue()) {
+				if (leftValues == null) {
+					// no partner on the left side: emit the right value on its own
+					matches.add(new TupleMatch(null, rightValue));
+					continue;
+				}
+				for (String leftValue : leftValues) {
+					matches.add(new TupleMatch(leftValue, rightValue));
+				}
+			}
+		}
+
+		return result;
+	}
+	
+	/**
+	 * Computes the expected result of an inner join between key-grouped integer values on the
+	 * left and key-grouped string values on the right.
+	 *
+	 * <p>For every key of {@code leftMap} that also has an entry in {@code rightMap}, the result
+	 * holds one {@link TupleIntPairMatch} per combination of a left value and a right value.
+	 *
+	 * @param leftMap  integer values of the left input, grouped by key
+	 * @param rightMap string values of the right input, grouped by key
+	 * @return the expected matches, grouped by key
+	 */
+	public static Map<Integer, Collection<TupleIntPairMatch>> joinIntPairs(
+			Map<Integer, Collection<Integer>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
+	{
+		final Map<Integer, Collection<TupleIntPairMatch>> result = new HashMap<>();
+
+		for (Entry<Integer, Collection<Integer>> leftEntry : leftMap.entrySet()) {
+			final Integer key = leftEntry.getKey();
+			final Collection<String> rightValues = rightMap.get(key);
+
+			// only keys that occur on both sides contribute to an inner join
+			if (rightValues != null) {
+				final Collection<TupleIntPairMatch> matches = new ArrayList<>();
+				result.put(key, matches);
+
+				for (Integer leftValue : leftEntry.getValue()) {
+					for (String rightValue : rightValues) {
+						matches.add(new TupleIntPairMatch(leftValue, rightValue));
+					}
+				}
+			}
+		}
+
+		return result;
+	}
+
+	
+	/**
+	 * Drains the given iterator and groups the string field of every record by its integer key.
+	 *
+	 * <p>The iterator is read by repeatedly calling {@code next(reuse)} until it returns
+	 * {@code null}; the record returned by one call is handed back in as the reuse object of
+	 * the next call.
+	 *
+	 * @param iter the iterator to consume
+	 * @return all {@code f1} values, grouped by {@code f0}
+	 * @throws Exception if the iterator fails to produce the next record
+	 */
+	public static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+	throws Exception
+	{
+		Map<Integer, Collection<String>> grouped = new HashMap<>();
+
+		for (Tuple2<Integer, String> record = iter.next(new Tuple2<Integer, String>());
+				record != null;
+				record = iter.next(record))
+		{
+			Collection<String> bucket = grouped.get(record.f0);
+			if (bucket == null) {
+				bucket = new ArrayList<>();
+				grouped.put(record.f0, bucket);
+			}
+			bucket.add(record.f1);
+		}
+
+		return grouped;
+	}
+	
+	/**
+	 * Drains the given iterator and groups the value of every {@code IntPair} by its key.
+	 *
+	 * <p>The iterator is read by repeatedly calling {@code next(reuse)} until it returns
+	 * {@code null}; the record returned by one call is handed back in as the reuse object of
+	 * the next call.
+	 *
+	 * @param iter the iterator to consume
+	 * @return all pair values, grouped by pair key
+	 * @throws Exception if the iterator fails to produce the next record
+	 */
+	public static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
+	throws Exception
+	{
+		Map<Integer, Collection<Integer>> grouped = new HashMap<>();
+
+		for (IntPair record = iter.next(new IntPair()); record != null; record = iter.next(record)) {
+			final int key = record.getKey();
+			final int value = record.getValue();
+
+			Collection<Integer> bucket = grouped.get(key);
+			if (bucket == null) {
+				bucket = new ArrayList<>();
+				grouped.put(key, bucket);
+			}
+			bucket.add(value);
+		}
+
+		return grouped;
+	}
+
+	/**
+	 * Class used for storage of the expected matches in a hash-map.
+	 *
+	 * <p>Either side may be {@code null}, which represents the missing side of an outer-join
+	 * result. Two matches are equal if both their left and their right values are equal, where
+	 * {@code null} is only equal to {@code null}.
+	 */
+	public static class TupleMatch {
+
+		private final String left;
+		private final String right;
+
+		/**
+		 * @param left  value of the left record, or {@code null} if there is no left record
+		 * @param right value of the right record, or {@code null} if there is no right record
+		 */
+		public TupleMatch(String left, String right) {
+			this.left = left;
+			this.right = right;
+		}
+
+		/**
+		 * Null-safe, type-safe equality on both sides of the match.
+		 *
+		 * @return {@code false} for {@code null} and for objects of another type
+		 */
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj) {
+				return true;
+			}
+			if (!(obj instanceof TupleMatch)) {
+				return false;
+			}
+
+			TupleMatch o = (TupleMatch) obj;
+
+			// compare each side independently so that any mix of null / non-null values
+			// yields a result instead of a NullPointerException
+			boolean leftEqual = this.left == null ? o.left == null : this.left.equals(o.left);
+			boolean rightEqual = this.right == null ? o.right == null : this.right.equals(o.right);
+			return leftEqual && rightEqual;
+		}
+
+		@Override
+		public int hashCode() {
+			// fixed substitutes keep null sides from collapsing the hash to that of the other side
+			int hc = this.left != null ? this.left.hashCode() : 23;
+			hc = hc ^ (this.right != null ? this.right.hashCode() : 41);
+			return hc;
+		}
+
+		/**
+		 * @return {@code "left, right"}, with {@code <null>} standing in for a missing side
+		 */
+		@Override
+		public String toString() {
+			String leftText = left == null ? "<null>" : left;
+			String rightText = right == null ? "<null>" : right;
+			return leftText + ", " + rightText;
+		}
+	}
+	
+	/**
+	 * Class used for storage of the expected matches between an integer value (left side) and
+	 * a string value (right side) in a hash-map.
+	 */
+	public static class TupleIntPairMatch
+	{
+		private final int left;
+		private final String right;
+
+		/**
+		 * @param left  value of the left record
+		 * @param right value of the right record, must not be {@code null}
+		 * @throws NullPointerException if {@code right} is {@code null}
+		 */
+		public TupleIntPairMatch(int left, String right) {
+			// strings are immutable, so no defensive copy is needed; only reject null early
+			if (right == null) {
+				throw new NullPointerException("right");
+			}
+			this.left = left;
+			this.right = right;
+		}
+
+		/**
+		 * Type-safe equality on both sides of the match.
+		 *
+		 * @return {@code false} for {@code null} and for objects of another type
+		 */
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj) {
+				return true;
+			}
+			if (!(obj instanceof TupleIntPairMatch)) {
+				return false;
+			}
+			TupleIntPairMatch o = (TupleIntPairMatch) obj;
+			return this.left == o.left && this.right.equals(o.right);
+		}
+
+		@Override
+		public int hashCode() {
+			return this.left ^ this.right.hashCode();
+		}
+
+		@Override
+		public String toString() {
+			return left + ", " + right;
+		}
+	}
+	
+	/**
+	 * Join function that checks produced matches against a map of expected matches.
+	 *
+	 * <p>Every call removes the produced pair from the expected matches of its key and fails
+	 * the test if the pair is not found there. A key whose expected matches are used up is
+	 * removed from the map. Either input record may be {@code null}; its value is then treated
+	 * as {@code null} and the key is taken from the other record.
+	 */
+	static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
+	{
+		private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
+
+		protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
+			this.toRemoveFrom = map;
+		}
+
+		@Override
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
+		{
+			final int key;
+			final String value1;
+			if (rec1 != null) {
+				key = rec1.f0;
+				value1 = rec1.f1;
+			}
+			else {
+				key = rec2.f0;
+				value1 = null;
+			}
+			final String value2 = rec2 == null ? null : rec2.f1;
+
+			final Collection<TupleMatch> expected = this.toRemoveFrom.get(key);
+			if (expected == null) {
+				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
+			}
+
+			final boolean wasExpected = expected.remove(new TupleMatch(value1, value2));
+			Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
+				wasExpected);
+
+			// drop keys whose expected matches have all been produced
+			if (expected.isEmpty()) {
+				this.toRemoveFrom.remove(key);
+			}
+		}
+	}
+	
+	/**
+	 * Join function that checks produced {@code IntPair} / tuple matches against a map of
+	 * expected matches.
+	 *
+	 * <p>Every call asserts that both records carry the same key, removes the produced pair
+	 * from the expected matches of that key and fails the test if the pair is not found there.
+	 * A key whose expected matches are used up is removed from the map.
+	 */
+	static final class TupleIntPairMatchRemovingMatcher implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
+	{
+		private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
+
+		protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
+			this.toRemoveFrom = map;
+		}
+
+		@Override
+		public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
+		{
+			final int leftKey = rec1.getKey();
+			final int leftValue = rec1.getValue();
+
+			final Integer key = rec2.f0;
+			final String value = rec2.f1;
+
+			final boolean keysAgree = leftKey == key;
+			Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", keysAgree);
+
+			final Collection<TupleIntPairMatch> expected = this.toRemoveFrom.get(key);
+			if (expected == null) {
+				Assert.fail("Match " + key + " - " + leftValue + ":" + value + " is unexpected.");
+			}
+
+			final boolean wasExpected = expected.remove(new TupleIntPairMatch(leftValue, value));
+			Assert.assertTrue("Produced match was not contained: " + key + " - " + leftValue + ":" + value,
+				wasExpected);
+
+			// drop keys whose expected matches have all been produced
+			if (expected.isEmpty()) {
+				this.toRemoveFrom.remove(key);
+			}
+		}
+	}
+	
+	/**
+	 * Pair comparator that compares the key of an {@code IntPair} reference with the
+	 * {@code f0} field of {@code Tuple2<Integer, String>} candidates.
+	 */
+	static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
+	{
+		/** Key of the most recently set reference record. */
+		private int reference;
+
+		@Override
+		public void setReference(IntPair reference) {
+			this.reference = reference.getKey();
+		}
+
+		/**
+		 * @throws NullKeyFieldException if the candidate or its key field is {@code null}
+		 */
+		@Override
+		public boolean equalToReference(Tuple2<Integer, String> candidate) {
+			try {
+				return candidate.f0 == this.reference;
+			} catch (NullPointerException npex) {
+				throw new NullKeyFieldException();
+			}
+		}
+
+		/**
+		 * @return a negative value, zero, or a positive value if the candidate's key is less
+		 *         than, equal to, or greater than the reference key
+		 * @throws NullKeyFieldException if the candidate or its key field is {@code null}
+		 */
+		@Override
+		public int compareToReference(Tuple2<Integer, String> candidate) {
+			try {
+				// Integer.compare instead of subtraction: the difference of two ints can
+				// overflow and flip the sign for keys that are far apart
+				return Integer.compare(candidate.f0, this.reference);
+			} catch (NullPointerException npex) {
+				throw new NullKeyFieldException();
+			}
+		}
+	}
+	
+	/**
+	 * Pair comparator that compares the {@code f0} field of a {@code Tuple2<Integer, String>}
+	 * reference with the key of {@code IntPair} candidates.
+	 */
+	static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
+	{
+		/** Key of the most recently set reference record. */
+		private int reference;
+
+		@Override
+		public void setReference(Tuple2<Integer, String> reference) {
+			this.reference = reference.f0;
+		}
+
+		@Override
+		public boolean equalToReference(IntPair candidate) {
+			return this.reference == candidate.getKey();
+		}
+
+		/**
+		 * @return a negative value, zero, or a positive value if the candidate's key is less
+		 *         than, equal to, or greater than the reference key
+		 */
+		@Override
+		public int compareToReference(IntPair candidate) {
+			// Integer.compare instead of subtraction: the difference of two ints can
+			// overflow and flip the sign for keys that are far apart
+			return Integer.compare(candidate.getKey(), this.reference);
+		}
+	}
+}