Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 17:11:57 UTC

[2/3] flink git commit: [FLINK-1085] [tests] Make the combiner tests generic. Add more coverage for oversized records.

[FLINK-1085] [tests] Make the combiner tests generic. Add more coverage for oversized records.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01c74338
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01c74338
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01c74338

Branch: refs/heads/master
Commit: 01c74338ff44ea7f3735a7eb94b2ce01ababc505
Parents: 7271881
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 13 15:12:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 16:29:38 2015 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     |  60 +--
 .../operators/CombineTaskExternalITCase.java    |  87 +++-
 .../runtime/operators/CombineTaskTest.java      | 354 ++++++++--------
 .../operators/CombinerOversizedRecordsTest.java | 236 +++++++++++
 .../operators/testutils/DelayingIterator.java   |  59 +++
 .../testutils/InfiniteIntTupleIterator.java     |  38 ++
 .../runtime/operators/testutils/TestData.java   |   2 +-
 .../testutils/UnaryOperatorTestBase.java        | 410 +++++++++++++++++++
 .../UniformIntStringTupleGenerator.java         |  77 ++++
 .../testutils/UniformIntTupleGenerator.java     |  75 ++++
 .../operators/testutils/UnionIterator.java      |  16 +-
 11 files changed, 1199 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
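
For readers skimming the patch: the central behavioral change in GroupReduceCombineDriver is that a record
too large to fit even into a freshly cleared sort buffer is no longer cast to the output type and forwarded
directly. It is now wrapped in a singleton Iterable and passed through the user's combine function, so it is
still converted from IN to OUT. Below is a minimal standalone sketch of that path, assuming a simple summing
combiner in the style of the test stubs in this patch; the combiner and the inline collector are written only
for illustration and are not part of the commit.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Collections;

public class OversizedRecordPathExample {

    public static void main(String[] args) throws Exception {
        // a combiner in the style of the test stubs: sums the second field per key
        GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> combiner =
                new GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public void combine(Iterable<Tuple2<Integer, Integer>> values,
                                        Collector<Tuple2<Integer, Integer>> out) {
                        int key = 0;
                        int sum = 0;
                        for (Tuple2<Integer, Integer> t : values) {
                            key = t.f0;
                            sum += t.f1;
                        }
                        out.collect(new Tuple2<Integer, Integer>(key, sum));
                    }
                };

        // a record that did not fit into a fresh sort buffer is handed to the combine
        // function as a single-element group instead of being cast and forwarded as-is
        Tuple2<Integer, Integer> oversized = new Tuple2<Integer, Integer>(42, 7);

        combiner.combine(Collections.singleton(oversized),
                new Collector<Tuple2<Integer, Integer>>() {
                    @Override
                    public void collect(Tuple2<Integer, Integer> record) {
                        System.out.println(record); // prints (42,7)
                    }

                    @Override
                    public void close() {}
                });
    }
}

Routing a single-element group through combine produces the same value as forwarding the record, but it also
works when the combine function changes the element type, which a plain cast cannot do.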


http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index c426295..2bf778e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -16,13 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,10 +29,15 @@ import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -44,11 +45,12 @@ import java.util.List;
  * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a
  * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution.
  * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result.
- * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type
- * of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and
- * output type to be able to reduce the elements after the combine from <IN> to <OUT>.
+ * The CombineGroup uses the GroupCombineFunction interface, which allows combining values of type {@code IN}
+ * into any type {@code OUT}. In contrast, the RichGroupReduceFunction requires the combine method
+ * to have the same input and output type, so that the elements can still be reduced from
+ * {@code IN} to {@code OUT} after the combine.
  *
- * The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.
+ * <p>The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.</p>
  * 
  * @param <IN> The data type consumed by the combiner.
  * @param <OUT> The data type produced by the combiner.
@@ -67,8 +69,6 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	private GroupCombineFunction<IN, OUT> combiner;
 
 	private TypeSerializer<IN> serializer;
-
-	private TypeComparator<IN> sortingComparator;
 	
 	private TypeComparator<IN> groupingComparator;
 
@@ -78,7 +78,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 	private Collector<OUT> output;
 
-	private long oversizedRecordCount = 0L;
+	private long oversizedRecordCount;
 
 	private volatile boolean running = true;
 
@@ -112,9 +112,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	@Override
 	public void prepare() throws Exception {
 		final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
-		if(driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
-			throw new Exception("Invalid strategy " + driverStrategy + " for " +
-					"group reduce combinder.");
+		if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
 		}
 
 		this.memManager = this.taskContext.getMemoryManager();
@@ -122,7 +121,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.serializer = serializerFactory.getSerializer();
-		this.sortingComparator = this.taskContext.getDriverComparator(0);
+
+		final TypeComparator<IN> sortingComparator = this.taskContext.getDriverComparator(0);
+		
 		this.groupingComparator = this.taskContext.getDriverComparator(1);
 		this.combiner = this.taskContext.getStub();
 		this.output = this.taskContext.getOutputCollector();
@@ -131,12 +132,12 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 				numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+		if (sortingComparator.supportsSerializationWithKeyNormalization() &&
 				this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
 		}
 
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
@@ -171,10 +172,14 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 			// write the value again
 			if (!this.sorter.write(value)) {
+				
 				++oversizedRecordCount;
-				LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
-				// simply forward the record
-				this.output.collect((OUT)value);
+				LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
+								"Oversized record count: {}", oversizedRecordCount);
+				
+				// simply forward the record. We need to pass it through the combine function to convert it
+				Iterable<IN> input = Collections.singleton(value);
+				this.combiner.combine(input, this.output);
 			}
 		}
 
@@ -210,7 +215,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 	@Override
 	public void cleanup() throws Exception {
-		if(this.sorter != null) {
+		if (this.sorter != null) {
 			this.memManager.release(this.sorter.dispose());
 		}
 	}
@@ -218,8 +223,17 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	@Override
 	public void cancel() {
 		this.running = false;
-		if(this.sorter != null) {
+		if (this.sorter != null) {
 			this.memManager.release(this.sorter.dispose());
 		}
 	}
+
+	/**
+	 * Gets the number of oversized records handled by this combiner.
+	 * 
+	 * @return The number of oversized records handled by this combiner.
+	 */
+	public long getOversizedRecordCount() {
+		return oversizedRecordCount;
+	}
 }
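
To make the point of the class javadoc above concrete: a GroupCombineFunction may emit a different type than
it consumes, which is exactly why an oversized record must be routed through combine() instead of being cast
from IN to OUT. A hypothetical type-changing combiner, sketched here only for illustration and not taken from
the commit:

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// pre-aggregates (key, value) pairs into (key, average) pairs
public class AveragingCombiner
        implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Double>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void combine(Iterable<Tuple2<Integer, Integer>> values,
                        Collector<Tuple2<Integer, Double>> out) {
        int key = 0;
        long sum = 0;
        long count = 0;

        for (Tuple2<Integer, Integer> t : values) {
            key = t.f0;
            sum += t.f1;
            count++;
        }

        // the output type differs from the input type, so "cast and forward"
        // is not an option for records that bypass the sort buffer
        out.collect(new Tuple2<Integer, Double>(key, (double) sum / count));
    }
}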

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index d957fa1..4905e57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -16,17 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -45,7 +46,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
 
 	public CombineTaskExternalITCase(ExecutionConfig config) {
 		super(config, COMBINE_MEM, 0);
@@ -161,4 +162,84 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		
 		this.outList.clear();
 	}
+	
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+
+	@ReduceOperator.Combinable
+	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+
+		private final IntValue theInteger = new IntValue();
+
+		@Override
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
+			Record element = null;
+			int sum = 0;
+
+			for (Record next : records) {
+				element = next;
+				element.getField(1, this.theInteger);
+
+				sum += this.theInteger.getValue();
+			}
+			this.theInteger.setValue(sum);
+			element.setField(1, this.theInteger);
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+			reduce(records, out);
+		}
+	}
+
+	@ReduceOperator.Combinable
+	public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+
+		private int cnt = 0;
+
+		private final IntValue key = new IntValue();
+		private final IntValue value = new IntValue();
+		private final IntValue combineValue = new IntValue();
+
+		@Override
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
+			Record element = null;
+			int sum = 0;
+
+			for (Record next : records) {
+				element = next;
+				element.getField(1, this.value);
+
+				sum += this.value.getValue();
+			}
+			element.getField(0, this.key);
+			this.value.setValue(sum - this.key.getValue());
+			element.setField(1, this.value);
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterable<Record> records, Collector<Record> out) {
+			Record element = null;
+			int sum = 0;
+
+			for (Record next : records) {
+				element = next;
+				element.getField(1, this.combineValue);
+
+				sum += this.combineValue.getValue();
+			}
+
+			if (++this.cnt >= 10) {
+				throw new ExpectedTestException();
+			}
+
+			this.combineValue.setValue(sum);
+			element.setField(1, this.combineValue);
+			out.collect(element);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 7772151..932e746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -18,254 +18,244 @@
 
 package org.apache.flink.runtime.operators;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.testutils.*;
-import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.operators.testutils.DelayingIterator;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
+import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+
 import org.junit.Test;
 
-public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
-{
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class CombineTaskTest
+		extends UnaryOperatorTestBase<RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, 
+		Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+	
 	private static final long COMBINE_MEM = 3 * 1024 * 1024;
 
 	private final double combine_frac;
 	
-	private final ArrayList<Record> outList = new ArrayList<Record>();
+	private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>();
+
+	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>(
+			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
+			new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
 	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+	private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>(
+			new int[]{0},
+			new TypeComparator<?>[] { new IntComparator(true) },
+			new TypeSerializer<?>[] { IntSerializer.INSTANCE });
 
+	
 	public CombineTaskTest(ExecutionConfig config) {
 		super(config, COMBINE_MEM, 0);
 
-		combine_frac = (double)COMBINE_MEM/this.getMemoryManager().getMemorySize();
+		combine_frac = (double)COMBINE_MEM / this.getMemoryManager().getMemorySize();
 	}
 	
+	
 	@Test
 	public void testCombineTask() {
-		int keyCnt = 100;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addDriverComparator(this.comparator);
-		addDriverComparator(this.comparator);
-		setOutput(this.outList);
-
-		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setRelativeMemoryDriver(combine_frac);
-		getTaskConfig().setFilehandlesDriver(2);
-		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-		
 		try {
+			int keyCnt = 100;
+			int valCnt = 20;
+			
+			setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), serializer);
+			addDriverComparator(this.comparator);
+			addDriverComparator(this.comparator);
+			setOutput(this.outList, serializer);
+	
+			getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+			getTaskConfig().setRelativeMemoryDriver(combine_frac);
+			getTaskConfig().setFilehandlesDriver(2);
+			
+			final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
+					new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+			
 			testDriver(testTask, MockCombiningReduceStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Invoke method caused exception.");
-		}
-		
-		int expSum = 0;
-		for (int i = 1;i < valCnt; i++) {
-			expSum += i;
-		}
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+keyCnt, this.outList.size() == keyCnt);
-		
-		for(Record record : this.outList) {
-			Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == expSum);
+			
+			int expSum = 0;
+			for (int i = 1;i < valCnt; i++) {
+				expSum += i;
+			}
+			
+			assertTrue(this.outList.size() == keyCnt);
+			
+			for (Tuple2<Integer, Integer> record : this.outList) {
+				assertTrue(record.f1 == expSum);
+			}
+			
+			this.outList.clear();
 		}
-		
-		this.outList.clear();
-	}
-
-	@Test
-	public void testOversizedRecordCombineTask() {
-		int tenMil = 10000000;
-		Generator g = new Generator(561349061987311L, 1, tenMil);
-		//generate 10 records each of size 10MB
-		final TestData.GeneratorIterator gi = new TestData.GeneratorIterator(g, 10);
-		List<MutableObjectIterator<Record>> inputs = new ArrayList<MutableObjectIterator<Record>>();
-		inputs.add(gi);
-
-		addInput(new UnionIterator<Record>(inputs));
-		addDriverComparator(this.comparator);
-		addDriverComparator(this.comparator);
-		setOutput(this.outList);
-
-		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setRelativeMemoryDriver(combine_frac);
-		getTaskConfig().setFilehandlesDriver(2);
-
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-
-		try {
-			testDriver(testTask, MockCombiningReduceStub.class);
-		} catch (Exception e) {
+		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail("Invoke method caused exception.");
+			fail(e.getMessage());
 		}
-
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+10, this.outList.size() == 10);
-
-		this.outList.clear();
 	}
 
 	@Test
 	public void testFailingCombineTask() {
-		int keyCnt = 100;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addDriverComparator(this.comparator);
-		addDriverComparator(this.comparator);
-		setOutput(new DiscardingOutputCollector<Record>());
-		
-		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setRelativeMemoryDriver(combine_frac);
-		getTaskConfig().setFilehandlesDriver(2);
-		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-		
 		try {
-			testDriver(testTask, MockFailingCombiningReduceStub.class);
-			Assert.fail("Exception not forwarded.");
-		} catch (ExpectedTestException etex) {
-			// good!
-		} catch (Exception e) {
+			int keyCnt = 100;
+			int valCnt = 20;
+			
+			setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), serializer);
+			addDriverComparator(this.comparator);
+			addDriverComparator(this.comparator);
+			setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+			
+			getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+			getTaskConfig().setRelativeMemoryDriver(combine_frac);
+			getTaskConfig().setFilehandlesDriver(2);
+			
+			final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = 
+					new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+			
+			try {
+				testDriver(testTask, MockFailingCombiningReduceStub.class);
+				fail("Exception not forwarded.");
+			}
+			catch (ExpectedTestException etex) {
+				// good!
+			}
+		}
+		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail("Test failed due to an exception.");
+			fail(e.getMessage());
 		}
 	}
 
 	@Test
-	public void testCancelCombineTaskSorting()
-	{
-		addInput(new DelayingInfinitiveInputIterator(100));
-		addDriverComparator(this.comparator);
-		addDriverComparator(this.comparator);
-		setOutput(new DiscardingOutputCollector<Record>());
-		
-		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		getTaskConfig().setRelativeMemoryDriver(combine_frac);
-		getTaskConfig().setFilehandlesDriver(2);
-		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockFailingCombiningReduceStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
+	public void testCancelCombineTaskSorting()  {
 		try {
-			tct.join();
-			taskRunner.join();		
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
+			MutableObjectIterator<Tuple2<Integer, Integer>> slowInfiniteInput =
+					new DelayingIterator<Tuple2<Integer, Integer>>(new InfiniteIntTupleIterator(), 1);
+			
+			setInput(slowInfiniteInput, serializer);
+			addDriverComparator(this.comparator);
+			addDriverComparator(this.comparator);
+			setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+			
+			getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+			getTaskConfig().setRelativeMemoryDriver(combine_frac);
+			getTaskConfig().setFilehandlesDriver(2);
+			
+			final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = 
+					new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+			
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockFailingCombiningReduceStub.class);
+					}
+					catch (Exception e) {
+						// exceptions may happen during canceling
+					}
+				}
+			};
+			taskRunner.start();
+			
+			// give the task some time
+			Thread.sleep(500);
+			
+			// cancel
+			testTask.cancel();
+			
+			// make sure it reacts to the canceling in some time
+			taskRunner.join(5000);
+			
+			assertFalse("Task did not cancel properly within 5 seconds.", taskRunner.isAlive());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
-		
-		Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
 	}
 	
-	@Combinable
-	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+	// ------------------------------------------------------------------------
+	//  Test Combiners
+	// ------------------------------------------------------------------------
+	
+	@RichGroupReduceFunction.Combinable
+	public static class MockCombiningReduceStub extends 
+			RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
+	{
 		private static final long serialVersionUID = 1L;
-		
-		private final IntValue theInteger = new IntValue();
 
 		@Override
-		public void reduce(Iterable<Record> records, Collector<Record> out) {
-			Record element = null;
+		public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
+			int key = 0;
 			int sum = 0;
-			
-			for (Record next : records) {
-				element = next;
-				element.getField(1, this.theInteger);
-				
-				sum += this.theInteger.getValue();
+
+			for (Tuple2<Integer, Integer> next : records) {
+				key = next.f0;
+				sum += next.f1;
 			}
-			this.theInteger.setValue(sum);
-			element.setField(1, this.theInteger);
-			out.collect(element);
+			
+			out.collect(new Tuple2<Integer, Integer>(key, sum));
 		}
 		
 		@Override
-		public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+		public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
 			reduce(records, out);
 		}
 	}
 	
-	@Combinable
-	public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+	@RichGroupReduceFunction.Combinable
+	public static final class MockFailingCombiningReduceStub extends 
+			RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
+	{
 		private static final long serialVersionUID = 1L;
 		
-		private int cnt = 0;
-		
-		private final IntValue key = new IntValue();
-		private final IntValue value = new IntValue();
-		private final IntValue combineValue = new IntValue();
+		private int cnt;
 
 		@Override
-		public void reduce(Iterable<Record> records, Collector<Record> out) {
-			Record element = null;
+		public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
+			int key = 0;
 			int sum = 0;
 			
-			for (Record next : records) {
-				element = next;
-				element.getField(1, this.value);
-				
-				sum += this.value.getValue();
+			for (Tuple2<Integer, Integer> next : records) {
+				key = next.f0;
+				sum += next.f1;
 			}
-			element.getField(0, this.key);
-			this.value.setValue(sum - this.key.getValue());
-			element.setField(1, this.value);
-			out.collect(element);
+			
+			int resultValue = sum - key;
+			out.collect(new Tuple2<Integer, Integer>(key, resultValue));
 		}
 		
 		@Override
-		public void combine(Iterable<Record> records, Collector<Record> out) {
-			Record element = null;
+		public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
+			int key = 0;
 			int sum = 0;
-			
-			for (Record next : records) {
-				element = next;
-				element.getField(1, this.combineValue);
-				
-				sum += this.combineValue.getValue();
+
+			for (Tuple2<Integer, Integer> next : records) {
+				key = next.f0;
+				sum += next.f1;
 			}
 			
 			if (++this.cnt >= 10) {
 				throw new ExpectedTestException();
 			}
-			
-			this.combineValue.setValue(sum);
-			element.setField(1, this.combineValue);
-			out.collect(element);
+
+			int resultValue = sum - key;
+			out.collect(new Tuple2<Integer, Integer>(key, resultValue));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
new file mode 100644
index 0000000..58d1676
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test that checks how the combiner handles very large records that are too large to be written into
+ * a fresh sort buffer.
+ */
+public class CombinerOversizedRecordsTest
+		extends UnaryOperatorTestBase<GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>,
+		Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> {
+
+	private static final long COMBINE_MEM = 3 * 1024 * 1024;
+
+	private final double combine_frac;
+
+	private final ArrayList<Tuple3<Integer, Double, String>> outList = new ArrayList<Tuple3<Integer, Double, String>>();
+
+	private final TypeSerializer<Tuple3<Integer, Integer, String>> serializer = 
+			new TupleSerializer<Tuple3<Integer, Integer, String>>(
+				(Class<Tuple3<Integer, Integer, String>>) (Class<?>) Tuple3.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+
+	private final TypeSerializer<Tuple3<Integer, Double, String>> outSerializer = 
+			new TupleSerializer<Tuple3<Integer, Double, String>>(
+					(Class<Tuple3<Integer, Double, String>>) (Class<?>) Tuple3.class,
+					new TypeSerializer<?>[] { IntSerializer.INSTANCE, DoubleSerializer.INSTANCE, StringSerializer.INSTANCE });
+
+	private final TypeComparator<Tuple3<Integer, Integer, String>> comparator = 
+			new TupleComparator<Tuple3<Integer, Integer, String>>(
+				new int[] { 0 },
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+	
+	// ------------------------------------------------------------------------
+	
+	public CombinerOversizedRecordsTest(ExecutionConfig config) {
+		super(config, COMBINE_MEM, 0);
+		combine_frac = (double)COMBINE_MEM / getMemoryManager().getMemorySize();
+	}
+
+	@Test
+	public void testOversizedRecordCombineTask() {
+		try {
+			final int keyCnt = 100;
+			final int valCnt = 20;
+			
+			// create a long heavy string payload
+			StringBuilder bld = new StringBuilder(10 * 1024 * 1024);
+			Random rnd = new Random();
+			
+			for (int i = 0; i < 10000000; i++) {
+				bld.append((char) (rnd.nextInt(26) + 'a'));
+			}
+			
+			String longString = bld.toString();
+			bld = null;
+
+			// construct the input as a union of
+			// 1) long string
+			// 2) some random values
+			// 3) long string
+			// 4) random values
+			// 5) long string
+			
+			// random values 1
+			MutableObjectIterator<Tuple2<Integer, Integer>> gen1 = 
+				new UniformIntTupleGenerator(keyCnt, valCnt, false);
+
+			// random values 2
+			MutableObjectIterator<Tuple2<Integer, Integer>> gen2 =
+					new UniformIntTupleGenerator(keyCnt, valCnt, false);
+
+			@SuppressWarnings("unchecked")
+			MutableObjectIterator<Tuple3<Integer, Integer, String>> input = 
+					new UnionIterator<Tuple3<Integer, Integer, String>>(
+							new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)),
+							new StringIteratorDecorator(gen1),
+							new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)),
+							new StringIteratorDecorator(gen2),
+							new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)));
+			
+			setInput(input, serializer);
+			addDriverComparator(this.comparator);
+			addDriverComparator(this.comparator);
+			setOutput(this.outList, this.outSerializer);
+	
+			getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+			getTaskConfig().setRelativeMemoryDriver(combine_frac);
+			getTaskConfig().setFilehandlesDriver(2);
+	
+			GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> testTask = 
+					new GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>();
+			
+			testDriver(testTask, TestCombiner.class);
+
+			assertEquals(3, testTask.getOversizedRecordCount());
+			assertTrue(keyCnt + 3 == outList.size() || 2*keyCnt + 3 == outList.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public static final class TestCombiner 
+			implements GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+
+		@Override
+		public void combine(Iterable<Tuple3<Integer, Integer, String>> values,
+							Collector<Tuple3<Integer, Double, String>> out)
+		{
+			int key = 0;
+			int sum = 0;
+			String someString = null;
+
+			for (Tuple3<Integer, Integer, String> next : values) {
+				key = next.f0;
+				sum += next.f1;
+				someString = next.f2;
+			}
+
+			out.collect(new Tuple3<Integer, Double, String>(key, (double) sum, someString));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	private static class StringIteratorDecorator implements MutableObjectIterator<Tuple3<Integer, Integer, String>> {
+
+		private final MutableObjectIterator<Tuple2<Integer, Integer>> input;
+
+		private StringIteratorDecorator(MutableObjectIterator<Tuple2<Integer, Integer>> input) {
+			this.input = input;
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> next(Tuple3<Integer, Integer, String> reuse) throws IOException {
+			Tuple2<Integer, Integer> next = input.next();
+			if (next == null) {
+				return null;
+			}
+			else {
+				reuse.f0 = next.f0;
+				reuse.f1 = next.f1;
+				reuse.f2 = "test string";
+				return reuse;
+			}
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> next() throws IOException {
+			Tuple2<Integer, Integer> next = input.next();
+			if (next == null) {
+				return null;
+			}
+			else {
+				return new Tuple3<Integer, Integer, String>(next.f0, next.f1, "test string");
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class SingleValueIterator<T> implements MutableObjectIterator<T> {
+		
+		private final T value;
+		
+		private boolean pending = true;
+
+		private SingleValueIterator(T value) {
+			this.value = value;
+		}
+
+		@Override
+		public T next(T reuse) {
+			return next();
+		}
+
+		@Override
+		public T next() {
+			if (pending) {
+				pending = false;
+				return value;
+			} else {
+				return null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
new file mode 100644
index 0000000..b3d53c7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+
+public class DelayingIterator<T> implements MutableObjectIterator<T> {
+
+	private final MutableObjectIterator<T> iterator;
+	private final int delay;
+	
+	
+	public DelayingIterator(MutableObjectIterator<T> iterator, int delay) {
+		this.iterator = iterator;
+		this.delay = delay;
+	}
+	
+	@Override
+	public T next(T reuse) throws IOException {
+		try {
+			Thread.sleep(delay);
+		}
+		catch (InterruptedException e) {
+			// ignore, but restore interrupted state
+			Thread.currentThread().interrupt();
+		}
+		return iterator.next(reuse);
+	}
+
+	@Override
+	public T next() throws IOException {
+		try {
+			Thread.sleep(delay);
+		}
+		catch (InterruptedException e) {
+			// ignore, but restore interrupted state
+			Thread.currentThread().interrupt();
+		}
+		return iterator.next();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
new file mode 100644
index 0000000..ba2181b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * A simple iterator that returns an infinite number of (0, 0) tuples.
+ */
+public class InfiniteIntTupleIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
+	
+	@Override
+	public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) {
+		return next();
+	}
+
+	@Override
+	public Tuple2<Integer, Integer> next() {
+		return new Tuple2<Integer, Integer>(0, 0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 400e798..fd34a3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -226,7 +226,7 @@ public final class TestData {
 				length = valueLength - random.nextInt(valueLength / 3);
 			}
 
-			StringBuffer sb = new StringBuffer();
+			StringBuilder sb = new StringBuilder();
 			for (int i = 0; i < length; i++) {
 				sb.append(alpha[random.nextInt(alpha.length)]);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
new file mode 100644
index 0000000..1e25bab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactTaskContext<S, OUT> {
+	
+	protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
+	
+	protected static final int PAGE_SIZE = 32 * 1024; 
+	
+	private final IOManager ioManager;
+	
+	private final MemoryManager memManager;
+	
+	private MutableObjectIterator<IN> input;
+	
+	private TypeSerializer<IN> inputSerializer;
+	
+	private List<TypeComparator<IN>> comparators;
+	
+	private UnilateralSortMerger<IN> sorter;
+	
+	private final AbstractInvokable owner;
+
+	private final TaskConfig taskConfig;
+	
+	protected final long perSortMem;
+
+	protected final double perSortFractionMem;
+	
+	private Collector<OUT> output;
+	
+	protected int numFileHandles;
+	
+	private S stub;
+	
+	private PactDriver<S, OUT> driver;
+	
+	private volatile boolean running;
+
+	private ExecutionConfig executionConfig;
+	
+	protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters) {
+		this(executionConfig, memory, maxNumSorters, DEFAULT_PER_SORT_MEM);
+	}
+	
+	protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
+		if (memory < 0 || maxNumSorters < 0 || perSortMemory < 0) {
+			throw new IllegalArgumentException();
+		}
+		
+		final long totalMem = Math.max(memory, 0) + (Math.max(maxNumSorters, 0) * perSortMemory);
+		
+		this.perSortMem = perSortMemory;
+		this.perSortFractionMem = (double)perSortMemory/totalMem;
+		this.ioManager = new IOManagerAsync();
+		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
+		this.owner = new DummyInvokable();
+
+		Configuration config = new Configuration();
+		this.taskConfig = new TaskConfig(config);
+
+		this.executionConfig = executionConfig;
+		this.comparators = new ArrayList<TypeComparator<IN>>(2);
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> getConfigurations() {
+		ExecutionConfig withReuse = new ExecutionConfig();
+		withReuse.enableObjectReuse();
+
+		ExecutionConfig withoutReuse = new ExecutionConfig();
+		withoutReuse.disableObjectReuse();
+
+		Object[] a = { withoutReuse };
+		Object[] b = { withReuse };
+		return Arrays.asList(a, b);
+	}
+
+	public void setInput(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer) {
+		this.input = input;
+		this.inputSerializer = serializer;
+		this.sorter = null;
+	}
+	
+	public void addInputSorted(MutableObjectIterator<IN> input,
+								TypeSerializer<IN> serializer,
+								TypeComparator<IN> comp) throws Exception
+	{
+		this.input = null;
+		this.inputSerializer = serializer;
+		this.sorter = new UnilateralSortMerger<IN>(
+				this.memManager, this.ioManager, input, this.owner,
+				this.<IN>getInputSerializer(0),
+				comp,
+				this.perSortFractionMem, 32, 0.8f);
+	}
+	
+	public void addDriverComparator(TypeComparator<IN> comparator) {
+		this.comparators.add(comparator);
+	}
+
+	public void setOutput(Collector<OUT> output) {
+		this.output = output;
+	}
+	public void setOutput(List<OUT> output, TypeSerializer<OUT> outSerializer) {
+		this.output = new ListOutputCollector<OUT>(output, outSerializer);
+	}
+	
+	public int getNumFileHandlesForSort() {
+		return numFileHandles;
+	}
+
+	
+	public void setNumFileHandlesForSort(int numFileHandles) {
+		this.numFileHandles = numFileHandles;
+	}
+
+	@SuppressWarnings("rawtypes")
+	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
+		testDriverInternal(driver, stubClass);
+	}
+
+	@SuppressWarnings({"unchecked","rawtypes"})
+	public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+
+		this.driver = driver;
+		driver.setup(this);
+
+		this.stub = (S)stubClass.newInstance();
+
+		// regular running logic
+		this.running = true;
+		boolean stubOpen = false;
+
+		try {
+			// run the data preparation
+			try {
+				driver.prepare();
+			}
+			catch (Throwable t) {
+				throw new Exception("The data preparation caused an error: " + t.getMessage(), t);
+			}
+
+			// open stub implementation
+			try {
+				FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters());
+				stubOpen = true;
+			}
+			catch (Throwable t) {
+				throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+			}
+
+			// run the user code
+			driver.run();
+
+			// close. We close here such that a regular close throwing an exception marks a task as failed.
+			if (this.running) {
+				FunctionUtils.closeFunction (this.stub);
+				stubOpen = false;
+			}
+
+			this.output.close();
+		}
+		catch (Exception ex) {
+			// close the input, but do not report any exceptions, since we already have another root cause
+			if (stubOpen) {
+				try {
+					FunctionUtils.closeFunction(this.stub);
+				}
+				catch (Throwable t) {
+					// ignore
+				}
+			}
+
+			// if resettable driver invoke tear-down
+			if (this.driver instanceof ResettablePactDriver) {
+				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
+				try {
+					resDriver.teardown();
+				} catch (Throwable t) {
+					throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
+				}
+			}
+
+			// drop exception, if the task was canceled
+			if (this.running) {
+				throw ex;
+			}
+
+		}
+		finally {
+			driver.cleanup();
+		}
+	}
+
+	@SuppressWarnings({"unchecked","rawtypes"})
+	public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+		driver.setup(this);
+		
+		for (int i = 0; i < iterations; i++) {
+			if (i == 0) {
+				driver.initialize();
+			}
+			else {
+				driver.reset();
+			}
+			testDriver(driver, stubClass);
+		}
+		
+		driver.teardown();
+	}
+	
+	public void cancel() throws Exception {
+		this.running = false;
+		this.driver.cancel();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TaskConfig getTaskConfig() {
+		return this.taskConfig;
+	}
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+	
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return getClass().getClassLoader();
+	}
+
+	@Override
+	public IOManager getIOManager() {
+		return this.ioManager;
+	}
+	
+	@Override
+	public MemoryManager getMemoryManager() {
+		return this.memManager;
+	}
+
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		MutableObjectIterator<IN> in = this.input;
+		if (in == null) {
+			// no input iterator was set directly; obtain the sorted input from the sorter
+			try {
+				in = this.sorter.getIterator();
+			}
+			catch (InterruptedException e) {
+				throw new RuntimeException("Interrupted");
+			}
+			this.input = in;
+		}
+		
+		@SuppressWarnings("unchecked")
+		MutableObjectIterator<X> input = (MutableObjectIterator<X>) this.input;
+		return input;
+	}
+
+	@Override
+	public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+		if (index != 0) {
+			throw new IllegalArgumentException();
+		}
+		
+		@SuppressWarnings("unchecked")
+		TypeSerializer<X> ser = (TypeSerializer<X>) inputSerializer;
+		return new RuntimeSerializerFactory<X>(ser, (Class<X>) ser.createInstance().getClass());
+	}
+
+	@Override
+	public <X> TypeComparator<X> getDriverComparator(int index) {
+		@SuppressWarnings("unchecked")
+		TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index);
+		return comparator;
+	}
+
+	@Override
+	public S getStub() {
+		return this.stub;
+	}
+
+	@Override
+	public Collector<OUT> getOutputCollector() {
+		return this.output;
+	}
+
+	@Override
+	public AbstractInvokable getOwningNepheleTask() {
+		return this.owner;
+	}
+
+	@Override
+	public String formatLogString(String message) {
+		return "Driver Tester: " + message;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@After
+	public void shutdownAll() throws Exception {
+		// 1st, shutdown sorters
+		if (this.sorter != null) {
+			sorter.close();
+		}
+		
+		// 2nd, shutdown I/O
+		this.ioManager.shutdown();
+		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
+
+		// last, verify all memory is returned and shutdown mem manager
+		MemoryManager memMan = getMemoryManager();
+		if (memMan != null) {
+			Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
+			memMan.shutdown();
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class ListOutputCollector<OUT> implements Collector<OUT> {
+		
+		private final List<OUT> output;
+		private final TypeSerializer<OUT> serializer;
+		
+		public ListOutputCollector(List<OUT> outputList, TypeSerializer<OUT> serializer) {
+			this.output = outputList;
+			this.serializer = serializer;
+		}
+		
+
+		@Override
+		public void collect(OUT record) {
+			this.output.add(serializer.copy(record));
+		}
+
+		@Override
+		public void close() {}
+	}
+	
+	public static final class CountingOutputCollector<OUT> implements Collector<OUT> {
+		
+		private int num;
+
+		@Override
+		public void collect(OUT record) {
+			this.num++;
+		}
+
+		@Override
+		public void close() {}
+		
+		public int getNumberOfRecords() {
+			return this.num;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
new file mode 100644
index 0000000..451cf9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class UniformIntStringTupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+	private final int numKeys;
+	private final int numVals;
+	
+	private int keyCnt;
+	private int valCnt;
+	
+	private boolean repeatKey;
+	
+	
+	public UniformIntStringTupleGenerator(int numKeys, int numVals, boolean repeatKey) {
+		this.numKeys = numKeys;
+		this.numVals = numVals;
+		this.repeatKey = repeatKey;
+	}
+	
+	@Override
+	public Tuple2<Integer, String> next(Tuple2<Integer, String> target) {
+		if (!repeatKey) {
+			if(valCnt >= numVals) {
+				return null;
+			}
+			
+			target.f0 = keyCnt++;
+			target.f1 = Integer.toBinaryString(valCnt);
+			
+			if(keyCnt == numKeys) {
+				keyCnt = 0;
+				valCnt++;
+			}
+		}
+		else {
+			if (keyCnt >= numKeys) {
+				return null;
+			}
+			
+			target.f0 = keyCnt;
+			target.f1 = Integer.toBinaryString(valCnt++);
+			
+			if (valCnt == numVals) {
+				valCnt = 0;
+				keyCnt++;
+			}
+		}
+		
+		return target;
+	}
+
+	@Override
+	public Tuple2<Integer, String> next() {
+		return next(new Tuple2<Integer, String>());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
new file mode 100644
index 0000000..457b4ad
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class UniformIntTupleGenerator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
+
+	private final int numKeys;
+	private final int numVals;
+	
+	private int keyCnt = 0;
+	private int valCnt = 0;
+	private boolean repeatKey;
+	
+	public UniformIntTupleGenerator(int numKeys, int numVals, boolean repeatKey) {
+		this.numKeys = numKeys;
+		this.numVals = numVals;
+		this.repeatKey = repeatKey;
+	}
+
+	@Override
+	public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> target) {
+		if (!repeatKey) {
+			if(valCnt >= numVals) {
+				return null;
+			}
+			
+			target.f0 = keyCnt++;
+			target.f1 = valCnt;
+			
+			if (keyCnt == numKeys) {
+				keyCnt = 0;
+				valCnt++;
+			}
+		}
+		else {
+			if (keyCnt >= numKeys) {
+				return null;
+			}
+			
+			target.f0 = keyCnt;
+			target.f1 = valCnt++;
+			
+			if (valCnt == numVals) {
+				valCnt = 0;
+				keyCnt++;
+			}
+		}
+		
+		return target;
+	}
+
+	@Override
+	public Tuple2<Integer, Integer> next() {
+		return next(new Tuple2<Integer, Integer>());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
index 3a76ebd..1127fca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
@@ -16,26 +16,30 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.testutils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * An iterator that returns the union of a given set of iterators.
  */
-public class UnionIterator<E> implements MutableObjectIterator<E>
-{
+public class UnionIterator<E> implements MutableObjectIterator<E> {
+	
 	private MutableObjectIterator<E> currentSource;
 	
 	private List<MutableObjectIterator<E>> nextSources;
+
+
+	public UnionIterator(MutableObjectIterator<E>... iterators) {
+		this(new ArrayList<MutableObjectIterator<E>>(Arrays.asList(iterators)));
+	}
 	
-	public UnionIterator(List<MutableObjectIterator<E>> sources)
-	{
+	public UnionIterator(List<MutableObjectIterator<E>> sources) {
 		this.currentSource = sources.remove(0);
 		this.nextSources = sources;
 	}