You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:24 UTC

[5/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

[FLINK-2479] Refactor runtime.operators.* tests

This closes #1160


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbc18b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbc18b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbc18b96

Branch: refs/heads/master
Commit: fbc18b96a86bc54da189f713ed01370524558249
Parents: d9e32da
Author: zentol <s....@web.de>
Authored: Sun Sep 20 19:10:26 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 15 11:30:42 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/disk/ChannelViewsTest.java | 136 +++--
 .../runtime/io/disk/SpillingBufferTest.java     | 141 ++---
 .../hash/NonReusingHashMatchIteratorITCase.java | 343 +++++------
 .../NonReusingReOpenableHashTableITCase.java    | 190 +++---
 .../hash/ReusingHashMatchIteratorITCase.java    | 335 +++++-----
 .../hash/ReusingReOpenableHashTableITCase.java  | 188 +++---
 .../CombiningUnilateralSortMergerITCase.java    | 144 +++--
 .../operators/sort/ExternalSortITCase.java      | 120 ++--
 .../operators/sort/MergeIteratorTest.java       |  98 ++-
 ...onReusingSortMergeCoGroupIteratorITCase.java | 128 ++--
 .../operators/sort/NormalizedKeySorterTest.java | 171 +++---
 .../ReusingSortMergeCoGroupIteratorITCase.java  | 129 ++--
 .../runtime/operators/testutils/TestData.java   | 608 ++++++++++---------
 .../operators/util/HashVsSortMiniBenchmark.java |  93 ++-
 14 files changed, 1374 insertions(+), 1450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index a44916a..ba86131 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -36,11 +36,10 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -102,19 +101,19 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteReadSmallRecords() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
 		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
-		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -125,18 +124,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -146,8 +145,9 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteAndReadLongRecords() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -155,10 +155,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_LONG; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -169,15 +169,15 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_LONG; i++) {
 			generator.next(rec);
-			readRec.read(inView);
-			final Key k1 = rec.getField(0, Key.class);
-			final Value v1 = rec.getField(1, Value.class);
-			final Key k2 = readRec.getField(0, Key.class);
-			final Value v2 = readRec.getField(1, Value.class);
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			serializer.deserialize(readRec, inView);
+			final int k1 = rec.f0;
+			final String v1 = rec.f1;
+			final int k2 = readRec.f0;
+			final String v2 = readRec.f1;
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -187,8 +187,9 @@ public class ChannelViewsTest
 	@Test
 	public void testReadTooMany() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -196,10 +197,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 
@@ -211,15 +212,15 @@ public class ChannelViewsTest
 
 		// read and re-generate all records and compare them
 		try {
-			final Record readRec = new Record();
+			final Tuple2<Integer, String> readRec = new Tuple2<>();
 			for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) {
 				generator.next(rec);
-				readRec.read(inView);
-				final Key k1 = rec.getField(0, Key.class);
-				final Value v1 = rec.getField(1, Value.class);
-				final Key k2 = readRec.getField(0, Key.class);
-				final Value v2 = readRec.getField(1, Value.class);
-				Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+				serializer.deserialize(readRec, inView);
+				final int k1 = rec.f0;
+				final String v1 = rec.f1;
+				final int k2 = readRec.f0;
+				final String v2 = readRec.f1;
+				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Expected an EOFException which did not occur.");
 		}
@@ -238,8 +239,9 @@ public class ChannelViewsTest
 	@Test
 	public void testReadWithoutKnownBlockCount() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -247,10 +249,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -261,18 +263,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and cmpare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -282,8 +284,9 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteReadOneBufferOnly() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
@@ -291,10 +294,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -305,18 +308,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -326,8 +329,9 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteReadNotAll() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -335,10 +339,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -349,18 +353,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 0b1e0c3..538c416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -27,11 +27,10 @@ import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -90,7 +89,8 @@ public class SpillingBufferTest {
 	
 	@Test
 	public void testWriteReadInMemory() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -99,10 +99,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -110,18 +110,18 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		// re-read the data
@@ -131,15 +131,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -148,8 +148,9 @@ public class SpillingBufferTest {
 	
 	@Test
 	public void testWriteReadTooMuchInMemory() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(
 				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -158,10 +159,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -169,19 +170,19 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		try {
 			for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) {
 				generator.next(rec);
-				readRec.read(inView);
+				serializer.deserialize(readRec, inView);
 				
-				Key k1 = rec.getField(0, Key.class);
-				Value v1 = rec.getField(1, Value.class);
+				int k1 = rec.f0;
+				String v1 = rec.f1;
 				
-				Key k2 = readRec.getField(0, Key.class);
-				Value v2 = readRec.getField(1, Value.class);
+				int k2 = readRec.f0;
+				String v2 = readRec.f1;
 				
-				Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Read too much, expected EOFException.");
 		}
@@ -196,15 +197,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -215,8 +216,9 @@ public class SpillingBufferTest {
 	
 	@Test
 	public void testWriteReadExternal() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(
 				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -225,10 +227,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -236,18 +238,18 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		// re-read the data
@@ -257,15 +259,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -274,8 +276,9 @@ public class SpillingBufferTest {
 
 	@Test
 	public void testWriteReadTooMuchExternal() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(
 				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -284,10 +287,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -295,19 +298,19 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		try {
 			for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) {
 				generator.next(rec);
-				readRec.read(inView);
+				serializer.deserialize(readRec, inView);
 				
-				Key k1 = rec.getField(0, Key.class);
-				Value v1 = rec.getField(1, Value.class);
+				int k1 = rec.f0;
+				String v1 = rec.f1;
 				
-				Key k2 = readRec.getField(0, Key.class);
-				Value v2 = readRec.getField(1, Value.class);
+				int k2 = readRec.f0;
+				String v2 = readRec.f1;
 				
-				Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Read too much, expected EOFException.");
 		}
@@ -322,15 +325,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
index 2da97e9..1795062 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
@@ -19,15 +19,10 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -35,18 +30,14 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -60,6 +51,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 @SuppressWarnings({"serial", "deprecation"})
 public class NonReusingHashMatchIteratorITCase {
@@ -77,31 +70,31 @@ public class NonReusingHashMatchIteratorITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 	
 	private TypeSerializer<IntPair> pairSerializer;
 	private TypeComparator<IntPair> pairComparator;
-	private TypePairComparator<IntPair, Record> pairRecordPairComparator;
-	private TypePairComparator<Record, IntPair> recordPairPairComparator;
+	private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+	private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
 		
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.recordPairComparator = new GenericPairComparator(record1Comparator, record2Comparator);
 		
 		this.pairSerializer = new IntPairSerializer();
-		this.pairComparator = new IntPairComparator();
-		this.pairRecordPairComparator = new IntPairRecordPairComparator();
-		this.recordPairPairComparator = new RecordIntPairPairComparator();
+		this.pairComparator = new TestData.IntPairComparator();
+		this.pairRecordPairComparator = new IntPairTuplePairComparator();
+		this.recordPairPairComparator = new TupleIntPairPairComparator();
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
@@ -129,19 +122,19 @@ public class NonReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildFirst() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	
 			// reset the generators
 			generator1.reset();
@@ -150,8 +143,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			NonReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -163,7 +156,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -187,31 +180,31 @@ public class NonReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -231,14 +224,14 @@ public class NonReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
-			NonReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -250,7 +243,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -265,19 +258,19 @@ public class NonReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildSecond() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			generator1.reset();
@@ -286,8 +279,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values			
-			NonReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -299,7 +292,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -323,31 +316,31 @@ public class NonReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -367,14 +360,14 @@ public class NonReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
-			NonReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -386,7 +379,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -403,16 +396,16 @@ public class NonReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -420,8 +413,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
-					new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+			NonReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildSecondHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -433,7 +426,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -450,16 +443,16 @@ public class NonReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -467,8 +460,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
-					new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+			NonReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -480,7 +473,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -498,29 +491,29 @@ public class NonReusingHashMatchIteratorITCase {
 
 	
 	
-	static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
 
-		for (TestData.Key key : leftMap.keySet()) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
 
 			if (rightValues == null) {
 				continue;
 			}
 
 			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordMatch>());
+				map.put(key, new ArrayList<TupleMatch>());
 			}
 
-			Collection<RecordMatch> matchedValues = map.get(key);
+			Collection<TupleMatch> matchedValues = map.get(key);
 
-			for (TestData.Value leftValue : leftValues) {
-				for (TestData.Value rightValue : rightValues) {
-					matchedValues.add(new RecordMatch(leftValue, rightValue));
+			for (String leftValue : leftValues) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new TupleMatch(leftValue, rightValue));
 				}
 			}
 		}
@@ -528,32 +521,30 @@ public class NonReusingHashMatchIteratorITCase {
 		return map;
 	}
 	
-	static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+	static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
 		Map<Integer, Collection<Integer>> leftMap,
-		Map<TestData.Key, Collection<TestData.Value>> rightMap)
+		Map<Integer, Collection<String>> rightMap)
 	{
-		final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+		final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
 	
 		for (Integer i : leftMap.keySet()) {
 			
-			final TestData.Key key = new TestData.Key(i.intValue());
-			
 			final Collection<Integer> leftValues = leftMap.get(i);
-			final Collection<TestData.Value> rightValues = rightMap.get(key);
+			final Collection<String> rightValues = rightMap.get(i);
 	
 			if (rightValues == null) {
 				continue;
 			}
 	
-			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordIntPairMatch>());
+			if (!map.containsKey(i)) {
+				map.put(i, new ArrayList<TupleIntPairMatch>());
 			}
 	
-			final Collection<RecordIntPairMatch> matchedValues = map.get(key);
+			final Collection<TupleIntPairMatch> matchedValues = map.get(i);
 	
 			for (Integer v : leftValues) {
-				for (TestData.Value val : rightValues) {
-					matchedValues.add(new RecordIntPairMatch(v, val));
+				for (String val : rightValues) {
+					matchedValues.add(new TupleIntPairMatch(v, val));
 				}
 			}
 		}
@@ -562,21 +553,21 @@ public class NonReusingHashMatchIteratorITCase {
 	}
 
 	
-	static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+	static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		while ((pair = iter.next(pair)) != null) {
 
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			Integer key = pair.f0;
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 
 		return map;
@@ -606,19 +597,19 @@ public class NonReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordMatch {
+	static class TupleMatch {
 		
-		private final Value left;
-		private final Value right;
+		private final String left;
+		private final String right;
 
-		public RecordMatch(Value left, Value right) {
+		public TupleMatch(String left, String right) {
 			this.left = left;
 			this.right = right;
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordMatch o = (RecordMatch) obj;
+			TupleMatch o = (TupleMatch) obj;
 			return this.left.equals(o.left) && this.right.equals(o.right);
 		}
 		
@@ -636,19 +627,19 @@ public class NonReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordIntPairMatch
+	static class TupleIntPairMatch
 	{
 		private final int left;
-		private final Value right;
+		private final String right;
 
-		public RecordIntPairMatch(int left, Value right) {
+		public TupleIntPairMatch(int left, String right) {
 			this.left = left;
-			this.right = right;
+			this.right = new String(right);
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordIntPairMatch o = (RecordIntPairMatch) obj;
+			TupleIntPairMatch o = (TupleIntPairMatch) obj;
 			return this.left == o.left && this.right.equals(o.right);
 		}
 		
@@ -663,28 +654,28 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordMatchRemovingJoin extends JoinFunction
+	static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
 		
-		protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+		protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
-			TestData.Key key = rec1.getField(0, TestData.Key.class);
-			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
-			TestData.Value value2 = rec2.getField(1, TestData.Value.class);
-			//System.err.println("rec1 key = "+key+"  rec2 key= "+rec2.getField(0, TestData.Key.class));
-			Collection<RecordMatch> matches = this.toRemoveFrom.get(key);
+			int key = rec1.f0;
+			String value1 = rec1.f1;
+			String value2 = rec2.f1;
+			//System.err.println("rec1 key = "+key+"  rec2 key= "+rec2.f0);
+			Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
-				matches.remove(new RecordMatch(value1, value2)));
+				matches.remove(new TupleMatch(value1, value2)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -692,32 +683,32 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+	static final class TupleIntPairMatchRemovingMatcher implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
 		
-		protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+		protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
 			final int k = rec1.getKey();
 			final int v = rec1.getValue(); 
 			
-			final TestData.Key key = rec2.getField(0, TestData.Key.class);
-			final TestData.Value value = rec2.getField(1, TestData.Value.class);
-			
-			Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey()); 
+			final Integer key = rec2.f0;
+			final String value = rec2.f1;
+
+			Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key);
 			
-			Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key);
+			Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
-				matches.remove(new RecordIntPairMatch(v, value)));
+				matches.remove(new TupleIntPairMatch(v, value)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -725,7 +716,7 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+	static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
 	{
 		private int reference;
 		
@@ -735,33 +726,31 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 
 		@Override
-		public boolean equalToReference(Record candidate) {
+		public boolean equalToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() == this.reference;
+				return candidate.f0 == this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 
 		@Override
-		public int compareToReference(Record candidate) {
+		public int compareToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() - this.reference;
+				return candidate.f0 - this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 	}
 	
-	static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+	static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
 	{
 		private int reference;
 		
 		@Override
-		public void setReference(Record reference) {
-			this.reference = reference.getField(0, IntValue.class).getValue();
+		public void setReference(Tuple2<Integer, String> reference) {
+			this.reference = reference.f0;
 		}
 
 		@Override