You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:23 UTC

[4/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 86f879a..5a4fc6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -22,10 +22,6 @@ package org.apache.flink.runtime.operators.hash;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -33,22 +29,16 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase
-		.RecordMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -63,6 +53,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 
 import static org.junit.Assert.fail;
 
@@ -86,39 +80,35 @@ public class NonReusingReOpenableHashTableITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 
 
 
 
 	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Record> recordBuildSideAccesssor;
-	private TypeSerializer<Record> recordProbeSideAccesssor;
-	private TypeComparator<Record> recordBuildSideComparator;
-	private TypeComparator<Record> recordProbeSideComparator;
-	private TypePairComparator<Record, Record> pactRecordComparator;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
 
-
-		final int[] keyPos = new int[] {0};
-		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-
-		this.recordBuildSideAccesssor = RecordSerializer.get();
-		this.recordProbeSideAccesssor = RecordSerializer.get();
-		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
-		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
-		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
 
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
@@ -153,11 +143,11 @@ public class NonReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -175,11 +165,11 @@ public class NonReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -198,11 +188,11 @@ public class NonReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
@@ -212,21 +202,21 @@ public class NonReusingReOpenableHashTableITCase {
 		}
 	}
 
-	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
 		// collect expected data
-		final Map<Key, Collection<RecordMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchRecordValues(NonReusingHashMatchIteratorITCase.collectRecordData(buildInput), NonReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchSecondTupleFields(NonReusingHashMatchIteratorITCase.collectTupleData(buildInput), NonReusingHashMatchIteratorITCase.collectTupleData(probeInput));
 
-		final List<Map<Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
-		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+		final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
+		final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
 		for(int i = 0; i < NUM_PROBES; i++) {
-			Map<Key, Collection<RecordMatch>> tmp;
+			Map<Integer, Collection<TupleMatch>> tmp;
 			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
-			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+			nMatcher[i] = new TupleMatchRemovingJoin(tmp);
 		}
 
-		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+		final FlatJoinFunction firstMatcher = new TupleMatchRemovingJoin(expectedFirstMatchesMap);
 
-		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+		final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
 		// reset the generators
 		bgen.reset();
@@ -235,8 +225,8 @@ public class NonReusingReOpenableHashTableITCase {
 		probeInput.reset();
 
 		// compare with iterator values
-		NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
-				new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+		NonReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildFirstReOpenableHashMatchIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -246,7 +236,7 @@ public class NonReusingReOpenableHashTableITCase {
 		while (iterator.callWithNextKey(firstMatcher, collector));
 
 		// assert that each expected match was seen for the first input
-		for (Entry<Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+		for (Entry<Integer, Collection<TupleMatch>> entry : expectedFirstMatchesMap.entrySet()) {
 			if (!entry.getValue().isEmpty()) {
 				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 			}
@@ -261,7 +251,7 @@ public class NonReusingReOpenableHashTableITCase {
 			while (iterator.callWithNextKey(nMatcher[i], collector));
 
 			// assert that each expected match was seen for the second input
-			for (Entry<Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -277,16 +267,16 @@ public class NonReusingReOpenableHashTableITCase {
 	//
 	//
 
-	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
 		probes.add(probe1);
 		probes.add(probe2);
 		probes.add(probe3);
-		return new UnionIterator<Record>(probes);
+		return new UnionIterator<>(probes);
 	}
 
 	@Test
@@ -304,14 +294,14 @@ public class NonReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 
 
 
@@ -331,40 +321,40 @@ public class NonReusingReOpenableHashTableITCase {
 
 		// ----------------------------------------------------------------------------------------
 
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 
 		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
 
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord()) {
 				long numBuildValues = 0;
 
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(record)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 
 				Long contained = map.get(key);
@@ -421,14 +411,14 @@ public class NonReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 
 
 		// allocate the memory for the HashTable
@@ -446,39 +436,39 @@ public class NonReusingReOpenableHashTableITCase {
 
 		// ----------------------------------------------------------------------------------------
 
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 		
 		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if (probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord()) {
 				long numBuildValues = 0;
 
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 
 				Long contained = map.get(key);
@@ -515,11 +505,11 @@ public class NonReusingReOpenableHashTableITCase {
 	}
 
 
-	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
-		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
-		for(Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
-			for(RecordMatch m : entry.getValue()) {
+	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+		for(Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+			for(TupleMatch m : entry.getValue()) {
 				matches.add(m);
 			}
 			copy.put(entry.getKey(), matches);

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
index 4fdff76..12f4a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -28,13 +28,11 @@ import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -44,16 +42,11 @@ import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -77,31 +70,31 @@ public class ReusingHashMatchIteratorITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 	
 	private TypeSerializer<IntPair> pairSerializer;
 	private TypeComparator<IntPair> pairComparator;
-	private TypePairComparator<IntPair, Record> pairRecordPairComparator;
-	private TypePairComparator<Record, IntPair> recordPairPairComparator;
+	private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+	private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
 		
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
 		
 		this.pairSerializer = new IntPairSerializer();
-		this.pairComparator = new IntPairComparator();
-		this.pairRecordPairComparator = new IntPairRecordPairComparator();
-		this.recordPairPairComparator = new RecordIntPairPairComparator();
+		this.pairComparator = new TestData.IntPairComparator();
+		this.pairRecordPairComparator = new IntPairTuplePairComparator();
+		this.recordPairPairComparator = new TupleIntPairPairComparator();
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
@@ -129,19 +122,19 @@ public class ReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildFirst() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			generator1.reset();
@@ -150,8 +143,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -163,7 +156,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -187,31 +180,31 @@ public class ReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -231,14 +224,14 @@ public class ReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<Tuple2<Integer, String>>(inList1);
+			input2 = new UnionIterator<Tuple2<Integer, String>>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
-			ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -250,7 +243,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -265,19 +258,19 @@ public class ReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildSecond() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			generator1.reset();
@@ -286,8 +279,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values			
-			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>,Tuple2<Integer, String> ,Tuple2<Integer, String> > iterator =
+				new ReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -299,7 +292,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -323,31 +316,31 @@ public class ReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -367,14 +360,14 @@ public class ReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
-			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -386,7 +379,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -403,16 +396,16 @@ public class ReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -420,8 +413,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			ReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
-					new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildSecondHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -433,7 +426,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -450,16 +443,16 @@ public class ReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -467,8 +460,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			ReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+			ReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -480,7 +473,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -498,29 +491,29 @@ public class ReusingHashMatchIteratorITCase {
 
 	
 	
-	static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
 
-		for (TestData.Key key : leftMap.keySet()) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
 
 			if (rightValues == null) {
 				continue;
 			}
 
 			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordMatch>());
+				map.put(key, new ArrayList<TupleMatch>());
 			}
 
-			Collection<RecordMatch> matchedValues = map.get(key);
+			Collection<TupleMatch> matchedValues = map.get(key);
 
-			for (TestData.Value leftValue : leftValues) {
-				for (TestData.Value rightValue : rightValues) {
-					matchedValues.add(new RecordMatch(leftValue, rightValue));
+			for (String leftValue : leftValues) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new TupleMatch(leftValue, rightValue));
 				}
 			}
 		}
@@ -528,32 +521,32 @@ public class ReusingHashMatchIteratorITCase {
 		return map;
 	}
 	
-	static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+	static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
 		Map<Integer, Collection<Integer>> leftMap,
-		Map<TestData.Key, Collection<TestData.Value>> rightMap)
+		Map<Integer, Collection<String>> rightMap)
 	{
-		final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+		final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
 	
 		for (Integer i : leftMap.keySet()) {
 			
-			final TestData.Key key = new TestData.Key(i.intValue());
+			final Integer key = new Integer(i.intValue());
 			
 			final Collection<Integer> leftValues = leftMap.get(i);
-			final Collection<TestData.Value> rightValues = rightMap.get(key);
+			final Collection<String> rightValues = rightMap.get(key);
 	
 			if (rightValues == null) {
 				continue;
 			}
 	
 			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordIntPairMatch>());
+				map.put(key, new ArrayList<TupleIntPairMatch>());
 			}
 	
-			final Collection<RecordIntPairMatch> matchedValues = map.get(key);
+			final Collection<TupleIntPairMatch> matchedValues = map.get(key);
 	
 			for (Integer v : leftValues) {
-				for (TestData.Value val : rightValues) {
-					matchedValues.add(new RecordIntPairMatch(v, val));
+				for (String val : rightValues) {
+					matchedValues.add(new TupleIntPairMatch(v, val));
 				}
 			}
 		}
@@ -562,21 +555,21 @@ public class ReusingHashMatchIteratorITCase {
 	}
 
 	
-	static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+	static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		while ((pair = iter.next(pair)) != null) {
 
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			Integer key = pair.f0;
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 
 		return map;
@@ -585,7 +578,7 @@ public class ReusingHashMatchIteratorITCase {
 	static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
 	throws Exception
 	{
-		Map<Integer, Collection<Integer>> map = new HashMap<Integer, Collection<Integer>>();
+		Map<Integer, Collection<Integer>> map = new HashMap<>();
 		IntPair pair = new IntPair();
 		
 		while ((pair = iter.next(pair)) != null) {
@@ -606,19 +599,19 @@ public class ReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordMatch {
+	static class TupleMatch {
 		
-		private final Value left;
-		private final Value right;
+		private final String left;
+		private final String right;
 
-		public RecordMatch(Value left, Value right) {
+		public TupleMatch(String left, String right) {
 			this.left = left;
 			this.right = right;
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordMatch o = (RecordMatch) obj;
+			TupleMatch o = (TupleMatch) obj;
 			return this.left.equals(o.left) && this.right.equals(o.right);
 		}
 		
@@ -636,19 +629,19 @@ public class ReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordIntPairMatch
+	static class TupleIntPairMatch
 	{
 		private final int left;
-		private final Value right;
+		private final String right;
 
-		public RecordIntPairMatch(int left, Value right) {
+		public TupleIntPairMatch(int left, String right) {
 			this.left = left;
 			this.right = right;
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordIntPairMatch o = (RecordIntPairMatch) obj;
+			TupleIntPairMatch o = (TupleIntPairMatch) obj;
 			return this.left == o.left && this.right.equals(o.right);
 		}
 		
@@ -663,28 +656,28 @@ public class ReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordMatchRemovingJoin extends JoinFunction
+	static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
 		
-		protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+		protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
-			TestData.Key key = rec1.getField(0, TestData.Key.class);
-			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
-			TestData.Value value2 = rec2.getField(1, TestData.Value.class);
+			Integer key = rec1.f0;
+			String value1 = rec1.f1;
+			String value2 = rec2.f1;
 			//System.err.println("rec1 key = "+key+"  rec2 key= "+rec2.getField(0, TestData.Key.class));
-			Collection<RecordMatch> matches = this.toRemoveFrom.get(key);
+			Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
-				matches.remove(new RecordMatch(value1, value2)));
+				matches.remove(new TupleMatch(value1, value2)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -692,32 +685,32 @@ public class ReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+	static final class TupleIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
 		
-		protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+		protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
 			final int k = rec1.getKey();
 			final int v = rec1.getValue(); 
 			
-			final TestData.Key key = rec2.getField(0, TestData.Key.class);
-			final TestData.Value value = rec2.getField(1, TestData.Value.class);
+			final Integer key = rec2.f0;
+			final String value = rec2.f1;
 			
-			Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey()); 
+			Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key); 
 			
-			Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key);
+			Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
-				matches.remove(new RecordIntPairMatch(v, value)));
+				matches.remove(new TupleIntPairMatch(v, value)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -725,7 +718,7 @@ public class ReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+	static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
 	{
 		private int reference;
 		
@@ -735,33 +728,31 @@ public class ReusingHashMatchIteratorITCase {
 		}
 
 		@Override
-		public boolean equalToReference(Record candidate) {
+		public boolean equalToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() == this.reference;
+				return candidate.f0 == this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 
 		@Override
-		public int compareToReference(Record candidate) {
+		public int compareToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() - this.reference;
+				return candidate.f0 - this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 	}
 	
-	static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+	static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
 	{
 		private int reference;
 		
 		@Override
-		public void setReference(Record reference) {
-			this.reference = reference.getField(0, IntValue.class).getValue();
+		public void setReference(Tuple2<Integer, String> reference) {
+			this.reference = reference.f0;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index ba5a325..2a306ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -28,14 +28,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -43,21 +42,16 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatchRemovingJoin;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -85,39 +79,35 @@ public class ReusingReOpenableHashTableITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 	
 	
 	
 	
 	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Record> recordBuildSideAccesssor;
-	private TypeSerializer<Record> recordProbeSideAccesssor;
-	private TypeComparator<Record> recordBuildSideComparator;
-	private TypeComparator<Record> recordProbeSideComparator;
-	private TypePairComparator<Record, Record> pactRecordComparator;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
 		
-		
-		final int[] keyPos = new int[] {0};
-		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-		
-		this.recordBuildSideAccesssor = RecordSerializer.get();
-		this.recordProbeSideAccesssor = RecordSerializer.get();
-		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
-		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
-		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
@@ -152,11 +142,11 @@ public class ReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -174,11 +164,11 @@ public class ReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -197,11 +187,11 @@ public class ReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
@@ -211,21 +201,21 @@ public class ReusingReOpenableHashTableITCase {
 		}
 	}
 	
-	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
 		// collect expected data
-		final Map<TestData.Key, Collection<RecordMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchRecordValues(ReusingHashMatchIteratorITCase.collectRecordData(buildInput), ReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchSecondTupleFields(ReusingHashMatchIteratorITCase.collectTupleData(buildInput), ReusingHashMatchIteratorITCase.collectTupleData(probeInput));
 		
-		final List<Map<TestData.Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
-		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+		final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
+		final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
 		for(int i = 0; i < NUM_PROBES; i++) {
-			Map<TestData.Key, Collection<RecordMatch>> tmp;
+			Map<Integer, Collection<TupleMatch>> tmp;
 			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
-			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+			nMatcher[i] = new TupleMatchRemovingJoin(tmp);
 		}
 		
-		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+		final FlatJoinFunction firstMatcher = new TupleMatchRemovingJoin(expectedFirstMatchesMap);
 		
-		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+		final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
 		// reset the generators
 		bgen.reset();
@@ -234,8 +224,8 @@ public class ReusingReOpenableHashTableITCase {
 		probeInput.reset();
 
 		// compare with iterator values
-		ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
-				new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+		ReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstReOpenableHashMatchIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -245,7 +235,7 @@ public class ReusingReOpenableHashTableITCase {
 		while (iterator.callWithNextKey(firstMatcher, collector));
 
 		// assert that each expected match was seen for the first input
-		for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+		for (Entry<Integer, Collection<TupleMatch>> entry : expectedFirstMatchesMap.entrySet()) {
 			if (!entry.getValue().isEmpty()) {
 				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 			}
@@ -260,7 +250,7 @@ public class ReusingReOpenableHashTableITCase {
 			while (iterator.callWithNextKey(nMatcher[i], collector));
 			
 			// assert that each expected match was seen for the second input
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -276,16 +266,16 @@ public class ReusingReOpenableHashTableITCase {
 	//
 	//
 	
-	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
 		probes.add(probe1);
 		probes.add(probe2);
 		probes.add(probe3);
-		return new UnionIterator<Record>(probes);
+		return new UnionIterator<>(probes);
 	}
 	
 	@Test
@@ -302,14 +292,14 @@ public class ReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 		
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 
 		// allocate the memory for the HashTable
 		List<MemorySegment> memSegments;
@@ -326,40 +316,40 @@ public class ReusingReOpenableHashTableITCase {
 		
 		// ----------------------------------------------------------------------------------------
 		
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 		
 		for(int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
 		
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord()) {
 				long numBuildValues = 0;
 		
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 				
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(record)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				
 				Long contained = map.get(key);
@@ -416,14 +406,14 @@ public class ReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 		
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 	
 
 		// allocate the memory for the HashTable
@@ -441,40 +431,40 @@ public class ReusingReOpenableHashTableITCase {
 		
 		// ----------------------------------------------------------------------------------------
 		
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 		
 		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord())
 			{	
 				long numBuildValues = 0;
 				
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 				
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				
 				Long contained = map.get(key);
@@ -511,11 +501,11 @@ public class ReusingReOpenableHashTableITCase {
 	}
 	
 	
-	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
-		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
-		for(Map.Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
-			for(RecordMatch m : entry.getValue()) {
+	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+		for(Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+			for(TupleMatch m : entry.getValue()) {
 				matches.add(m);
 			}
 			copy.put(entry.getKey(), matches);