You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 15:34:52 UTC

[2/5] flink git commit: [FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

[FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

Rename Match*Test to Join*Test and MapTaskTest to FlatMapTaskTest

This closes #1294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c8a6588
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c8a6588
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c8a6588

Branch: refs/heads/master
Commit: 3c8a6588ac9bd35984d6e3b1eef916461a262fe4
Parents: 7ff071f
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 21:10:41 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:01:37 2015 +0200

----------------------------------------------------------------------
 .../operators/CombineTaskExternalITCase.java    |   16 +-
 .../flink/runtime/operators/CrossTaskTest.java  |   41 +-
 .../runtime/operators/DataSinkTaskTest.java     |   52 +-
 .../runtime/operators/DataSourceTaskTest.java   |   16 +-
 .../runtime/operators/FlatMapTaskTest.java      |  149 +++
 .../operators/JoinTaskExternalITCase.java       |  165 +++
 .../flink/runtime/operators/JoinTaskTest.java   | 1017 +++++++++++++++++
 .../flink/runtime/operators/MapTaskTest.java    |  151 ---
 .../operators/MatchTaskExternalITCase.java      |  167 ---
 .../flink/runtime/operators/MatchTaskTest.java  | 1019 ------------------
 .../operators/ReduceTaskExternalITCase.java     |   16 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   19 +-
 .../operators/chaining/ChainTaskTest.java       |   25 +-
 .../operators/testutils/TaskTestBase.java       |   18 +-
 14 files changed, 1428 insertions(+), 1443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 4905e57..800bca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 
 	private final double combine_frac;
 	
-	private final ArrayList<Record> outList = new ArrayList<Record>();
+	private final ArrayList<Record> outList = new ArrayList<>();
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
@@ -69,7 +69,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();
 		
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);
@@ -85,7 +85,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		
 		// wee need to do the final aggregation manually in the test, because the
 		// combiner is not guaranteed to do that
-		final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
+		final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
 		for (Record record : this.outList) {
 			IntValue key = new IntValue();
 			IntValue value = new IntValue();
@@ -123,7 +123,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();
 
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);
@@ -139,7 +139,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		
 		// wee need to do the final aggregation manually in the test, because the
 		// combiner is not guaranteed to do that
-		final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
+		final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
 		for (Record record : this.outList) {
 			IntValue key = new IntValue();
 			IntValue value = new IntValue();
@@ -166,7 +166,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 	// ------------------------------------------------------------------------
 	// ------------------------------------------------------------------------
 
-	@ReduceOperator.Combinable
+	@Combinable
 	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 
@@ -194,7 +194,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		}
 	}
 
-	@ReduceOperator.Combinable
+	@Combinable
 	public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 4c27a68..2d8838a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {
 	
 	private static final long CROSS_MEM = 1024 * 1024;
@@ -65,7 +64,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -95,7 +94,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -123,7 +122,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -153,7 +152,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -184,7 +183,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -215,7 +214,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -243,7 +242,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -272,7 +271,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -303,7 +302,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -333,7 +332,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -363,7 +362,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -393,7 +392,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -420,7 +419,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -463,7 +462,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -485,7 +484,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		
 		try {
 			tct.join();
-			taskRunner.join();		
+			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
@@ -506,7 +505,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -549,7 +548,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -571,7 +570,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		
 		try {
 			tct.join();
-			taskRunner.join();		
+			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
@@ -579,7 +578,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
 	}
 	
-	public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
+	public static final class MockCrossStub implements CrossFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -588,7 +587,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		}
 	}
 	
-	public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
+	public static final class MockFailingCrossStub implements CrossFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index e91d338..b741b64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,7 +80,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 			super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-			DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+			DataSinkTask<Record> testTask = new DataSinkTask<>();
 
 			super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
@@ -94,7 +93,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+			HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
 
 			while (br.ready()) {
 				String line = br.readLine();
@@ -144,7 +143,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
 		readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 
 		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
@@ -171,7 +170,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+			HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
 
 			while(br.ready()) {
 				String line = br.readLine();
@@ -219,12 +218,12 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0);
+				new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -248,7 +247,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			Set<Integer> keys = new HashSet<Integer>();
+			Set<Integer> keys = new HashSet<>();
 
 			int curVal = -1;
 			while(br.ready()) {
@@ -297,7 +296,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
@@ -323,21 +322,20 @@ public class DataSinkTaskTest extends TaskTestBase
 	public void testFailingSortingDataSinkTask() {
 
 		int keyCnt = 100;
-		int valCnt = 20;;
+		int valCnt = 20;
 		double memoryFraction = 1.0;
 
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1}, ((Class<? extends Key<?>>[]) new Class[]{IntValue.class})),
-				0);
+				new RecordComparatorFactory(new int[]{1}, ( new Class[]{IntValue.class})), 0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -365,7 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
-		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		final DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
@@ -407,15 +405,14 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
-		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		final DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
-				0);
+				new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -447,7 +444,7 @@ public class DataSinkTaskTest extends TaskTestBase
 
 	}
 
-	public static class MockOutputFormat extends DelimitedOutputFormat {
+	public static class MockOutputFormat extends FileOutputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 
 		final StringBuilder bld = new StringBuilder();
@@ -458,8 +455,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 
 		@Override
-		public int serializeRecord(Record rec, byte[] target) throws Exception
-		{
+		public void writeRecord(Record rec) throws IOException {
 			IntValue key = rec.getField(0, IntValue.class);
 			IntValue value = rec.getField(1, IntValue.class);
 
@@ -467,14 +463,11 @@ public class DataSinkTaskTest extends TaskTestBase
 			this.bld.append(key.getValue());
 			this.bld.append('_');
 			this.bld.append(value.getValue());
+			this.bld.append('\n');
 
 			byte[] bytes = this.bld.toString().getBytes();
-			if (bytes.length <= target.length) {
-				System.arraycopy(bytes, 0, target, 0, bytes.length);
-				return bytes.length;
-			}
-			// else
-			return -bytes.length;
+
+			this.stream.write(bytes);
 		}
 
 	}
@@ -490,12 +483,11 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 
 		@Override
-		public int serializeRecord(Record rec, byte[] target) throws Exception
-		{
+		public void writeRecord(Record rec) throws IOException {
 			if (++this.cnt >= 10) {
 				throw new RuntimeException("Expected Test Exception");
 			}
-			return super.serializeRecord(rec, target);
+			super.writeRecord(rec);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 4548410..96ae700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,11 +28,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.junit.Assert;
 
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -83,7 +83,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
-		DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+		DataSourceTask<Record> testTask = new DataSourceTask<>();
 		
 		super.registerFileInputTask(testTask, MockInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -97,7 +97,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
 			this.outList.size() == keyCnt * valCnt);
 		
-		HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+		HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
 		
 		for (Record kvp : this.outList) {
 			
@@ -138,7 +138,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
-		DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+		DataSourceTask<Record> testTask = new DataSourceTask<>();
 
 		super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -172,7 +172,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 			Assert.fail("Unable to set-up test input file");
 		}
 		
-		final DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+		final DataSourceTask<Record> testTask = new DataSourceTask<>();
 
 		super.registerFileInputTask(testTask, MockDelayingInputFormat.class,  new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -232,7 +232,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		}
 	}
 	
-	public static class MockInputFormat extends DelimitedInputFormat {
+	public static class MockInputFormat extends DelimitedInputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
@@ -257,7 +257,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		}
 	}
 	
-	public static class MockDelayingInputFormat extends DelimitedInputFormat {
+	public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
@@ -289,7 +289,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		
 	}
 	
-	public static class MockFailingInputFormat extends DelimitedInputFormat {
+	public static class MockFailingInputFormat extends DelimitedInputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
new file mode 100644
index 0000000..3d7f99e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlatMapTaskTest extends DriverTestBase<FlatMapFunction<Record, Record>> {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlatMapTaskTest.class);
+	
+	private final CountingOutputCollector output = new CountingOutputCollector();
+	
+	
+	public FlatMapTaskTest(ExecutionConfig config) {
+		super(config, 0, 0);
+	}
+	
+	@Test
+	public void testMapTask() {
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		setOutput(this.output);
+		
+		final FlatMapDriver<Record, Record> testDriver = new FlatMapDriver<>();
+		
+		try {
+			testDriver(testDriver, MockMapStub.class);
+		} catch (Exception e) {
+			LOG.debug("Exception while running the test driver.", e);
+			Assert.fail("Invoke method caused exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
+	}
+	
+	@Test
+	public void testFailingMapTask() {
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		setOutput(new DiscardingOutputCollector<Record>());
+		
+		final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+		try {
+			testDriver(testTask, MockFailingMapStub.class);
+			Assert.fail("Function exception was not forwarded.");
+		} catch (ExpectedTestException e) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Exception in test.");
+		}
+	}
+	
+	@Test
+	public void testCancelMapTask() {
+		addInput(new InfiniteInputIterator());
+		setOutput(new DiscardingOutputCollector<Record>());
+		
+		final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+		
+		final AtomicBoolean success = new AtomicBoolean(false);
+		
+		final Thread taskRunner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockMapStub.class);
+					success.set(true);
+				} catch (Exception ie) {
+					ie.printStackTrace();
+				}
+			}
+		};
+		taskRunner.start();
+		
+		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+		tct.start();
+		
+		try {
+			tct.join();
+			taskRunner.join();		
+		} catch(InterruptedException ie) {
+			Assert.fail("Joining threads failed");
+		}
+		
+		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+	}
+	
+	public static class MockMapStub extends RichFlatMapFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void flatMap(Record record, Collector<Record> out) throws Exception {
+			out.collect(record);
+		}
+		
+	}
+	
+	public static class MockFailingMapStub extends RichFlatMapFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		private int cnt = 0;
+		
+		@Override
+		public void flatMap(Record record, Collector<Record> out) throws Exception {
+			if (++this.cnt >= 10) {
+				throw new ExpectedTestException();
+			}
+			out.collect(record);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
new file mode 100644
index 0000000..5b2e6eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.junit.Assert;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+	
+	private static final long HASH_MEM = 4*1024*1024;
+	
+	private static final long SORT_MEM = 3*1024*1024;
+	
+	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+	private final double bnljn_frac;
+
+	private final double hash_frac;
+
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator1 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+	
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator2 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+	
+	private final CountingOutputCollector output = new CountingOutputCollector();
+	
+	public JoinTaskExternalITCase(ExecutionConfig config) {
+		super(config, HASH_MEM, 2, SORT_MEM);
+		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+	}
+	
+	@Test
+	public void testExternalSort1MatchTask() {
+		final int keyCnt1 = 16384*4;
+		final int valCnt1 = 2;
+		
+		final int keyCnt2 = 8192;
+		final int valCnt2 = 4*2;
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		setOutput(this.output);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+	}
+	
+	@Test
+	public void testExternalHash1MatchTask() {
+		final int keyCnt1 = 32768;
+		final int valCnt1 = 8;
+		
+		final int keyCnt2 = 65536;
+		final int valCnt2 = 8;
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.output);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+	}
+	
+	@Test
+	public void testExternalHash2MatchTask() {
+		final int keyCnt1 = 32768;
+		final int valCnt1 = 8;
+		
+		final int keyCnt2 = 65536;
+		final int valCnt2 = 8;
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.output);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+	}
+	
+	public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
+			out.collect(value1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
new file mode 100644
index 0000000..ecde59e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -0,0 +1,1017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+	
+	private static final long HASH_MEM = 6*1024*1024;
+	
+	private static final long SORT_MEM = 3*1024*1024;
+
+	private static final int NUM_SORTER = 2;
+	
+	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+	private final double bnljn_frac;
+
+	private final double hash_frac;
+	
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator1 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+	
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator2 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+	
+	private final List<Record> outList = new ArrayList<>();
+	
+	
+	public JoinTaskTest(ExecutionConfig config) {
+		super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
+		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+	}
+	
+	
+	@Test
+	public void testSortBoth1MatchTask() {
+		final int keyCnt1 = 20;
+		final int valCnt1 = 1;
+		
+		final int keyCnt2 = 10;
+		final int valCnt2 = 2;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testSortBoth2MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortBoth3MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortBoth4MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortBoth5MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortFirstMatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortSecondMatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testMergeMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testFailingMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(new NirvanaOutputList());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+		
+		try {
+			testDriver(testTask, MockFailingMatchStub.class);
+			Assert.fail("Driver did not forward Exception.");
+		} catch (ExpectedTestException e) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+	}
+	
+	@Test
+	public void testCancelMatchTaskWhileSort1() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+		
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+			
+			try {
+				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("The test caused an exception.");
+			}
+	
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+
+			cancel();
+			taskRunner.interrupt();
+
+			taskRunner.join(60000);
+
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCancelMatchTaskWhileSort2() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+		
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+			
+			try {
+				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("The test caused an exception.");
+			}
+
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+
+			cancel();
+			taskRunner.interrupt();
+
+			taskRunner.join(60000);
+
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCancelMatchTaskWhileMatching() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+		
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+			
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+			
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockDelayingMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			taskRunner.start();
+			
+			Thread.sleep(1000);
+			
+			cancel();
+			taskRunner.interrupt();
+			
+			taskRunner.join(60000);
+			
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+			
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHash1MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 10;
+		int valCnt2 = 2;
+				
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash2MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash3MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash4MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash5MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testFailingHashFirstMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockFailingMatchStub.class);
+			Assert.fail("Function exception was not forwarded.");
+		} catch (ExpectedTestException etex) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+	}
+	
+	@Test
+	public void testFailingHashSecondMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockFailingMatchStub.class);
+			Assert.fail("Function exception was not forwarded.");
+		} catch (ExpectedTestException etex) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+	}
+	
+	@Test
+	public void testCancelHashMatchTaskWhileBuildFirst() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+
+		try {
+			addInput(new DelayingInfinitiveInputIterator(100));
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+			setOutput(new NirvanaOutputList());
+
+			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+			getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+			final AtomicBoolean success = new AtomicBoolean(false);
+
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+						success.set(true);
+					} catch (Exception ie) {
+						ie.printStackTrace();
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+			cancel();
+
+			try {
+				taskRunner.join();
+			}
+			catch (InterruptedException ie) {
+				Assert.fail("Joining threads failed");
+			}
+
+			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHashCancelMatchTaskWhileBuildSecond() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+
+		try {
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+			addInput(new DelayingInfinitiveInputIterator(100));
+
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+			setOutput(new NirvanaOutputList());
+
+			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+			getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+			final AtomicBoolean success = new AtomicBoolean(false);
+
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+						success.set(true);
+					} catch (Exception ie) {
+						ie.printStackTrace();
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+			cancel();
+
+			try {
+				taskRunner.join();
+			}
+			catch (InterruptedException ie) {
+				Assert.fail("Joining threads failed");
+			}
+
+			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHashFirstCancelMatchTaskWhileMatching() {
+		int keyCnt = 20;
+		int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		final AtomicBoolean success = new AtomicBoolean(false);
+		
+		Thread taskRunner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockMatchStub.class);
+					success.set(true);
+				} catch (Exception ie) {
+					ie.printStackTrace();
+				}
+			}
+		};
+		taskRunner.start();
+		
+		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+		tct.start();
+		
+		try {
+			tct.join();
+			taskRunner.join();
+		} catch(InterruptedException ie) {
+			Assert.fail("Joining threads failed");
+		}
+		
+		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+	}
+	
+	@Test
+	public void testHashSecondCancelMatchTaskWhileMatching() {
+		int keyCnt = 20;
+		int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		final AtomicBoolean success = new AtomicBoolean(false);
+		
+		Thread taskRunner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockMatchStub.class);
+					success.set(true);
+				} catch (Exception ie) {
+					ie.printStackTrace();
+				}
+			}
+		};
+		taskRunner.start();
+		
+		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+		tct.start();
+		
+		try {
+			tct.join();
+			taskRunner.join();
+		} catch(InterruptedException ie) {
+			Assert.fail("Joining threads failed");
+		}
+		
+		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+	}
+	
+	// =================================================================================================
+	
+	public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+			out.collect(record1);
+		}
+	}
+	
+	public static final class MockFailingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		private int cnt = 0;
+		
+		@Override
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+			if (++this.cnt >= 10) {
+				throw new ExpectedTestException();
+			}
+			out.collect(record1);
+		}
+	}
+	
+	public static final class MockDelayingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
deleted file mode 100644
index bfc6c44..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, Record>> {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(MapTaskTest.class);
-	
-	private final CountingOutputCollector output = new CountingOutputCollector();
-	
-	
-	public MapTaskTest(ExecutionConfig config) {
-		super(config, 0, 0);
-	}
-	
-	@Test
-	public void testMapTask() {
-		final int keyCnt = 100;
-		final int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		setOutput(this.output);
-		
-		final CollectorMapDriver<Record, Record> testDriver = new CollectorMapDriver<Record, Record>();
-		
-		try {
-			testDriver(testDriver, MockMapStub.class);
-		} catch (Exception e) {
-			LOG.debug("Exception while running the test driver.", e);
-			Assert.fail("Invoke method caused exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
-	}
-	
-	@Test
-	public void testFailingMapTask() {
-		final int keyCnt = 100;
-		final int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		setOutput(new DiscardingOutputCollector<Record>());
-		
-		final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>();
-		try {
-			testDriver(testTask, MockFailingMapStub.class);
-			Assert.fail("Function exception was not forwarded.");
-		} catch (ExpectedTestException e) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Exception in test.");
-		}
-	}
-	
-	@Test
-	public void testCancelMapTask() {
-		addInput(new InfiniteInputIterator());
-		setOutput(new DiscardingOutputCollector<Record>());
-		
-		final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		final Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMapStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();		
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
-		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-	}
-	
-	public static class MockMapStub extends MapFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			out.collect(record);
-		}
-		
-	}
-	
-	public static class MockFailingMapStub extends MapFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private int cnt = 0;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			if (++this.cnt >= 10) {
-				throw new ExpectedTestException();
-			}
-			out.collect(record);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
deleted file mode 100644
index 6f7fb21..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.junit.Assert;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-	
-	private static final long HASH_MEM = 4*1024*1024;
-	
-	private static final long SORT_MEM = 3*1024*1024;
-	
-	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
-	private final double bnljn_frac;
-
-	private final double hash_frac;
-
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
-	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
-	
-	private final CountingOutputCollector output = new CountingOutputCollector();
-	
-	public MatchTaskExternalITCase(ExecutionConfig config) {
-		super(config, HASH_MEM, 2, SORT_MEM);
-		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
-		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
-	}
-	
-	@Test
-	public void testExternalSort1MatchTask() {
-		final int keyCnt1 = 16384*4;
-		final int valCnt1 = 2;
-		
-		final int keyCnt2 = 8192;
-		final int valCnt2 = 4*2;
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		setOutput(this.output);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
-	}
-	
-	@Test
-	public void testExternalHash1MatchTask() {
-		final int keyCnt1 = 32768;
-		final int valCnt1 = 8;
-		
-		final int keyCnt2 = 65536;
-		final int valCnt2 = 8;
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.output);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
-	}
-	
-	@Test
-	public void testExternalHash2MatchTask() {
-		final int keyCnt1 = 32768;
-		final int valCnt1 = 8;
-		
-		final int keyCnt2 = 65536;
-		final int valCnt2 = 8;
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.output);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
-	}
-	
-	public static final class MockMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-			out.collect(value1);
-		}
-	}
-}