You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:21 UTC

[2/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 2427edd..d12307a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -18,13 +18,24 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.util.Comparator;
+import java.io.IOException;
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
@@ -38,250 +49,6 @@ public final class TestData {
 	private TestData() {}
 
 	/**
-	 * Key comparator.
-	 */
-	public static class KeyComparator implements Comparator<Key> {
-		@Override
-		public int compare(Key k1, Key k2) {
-			return k1.compareTo(k2);
-		}
-	};
-
-	/**
-	 * Key implementation.
-	 */
-	public static class Key extends IntValue {
-		private static final long serialVersionUID = 1L;
-		
-		public Key() {
-			super();
-		}
-
-		public Key(int k) {
-			super(k);
-		}
-
-		public int getKey() {
-			return getValue();
-		}
-		
-		public void setKey(int key) {
-			setValue(key);
-		}
-	}
-
-	/**
-	 * Value implementation.
-	 */
-	public static class Value extends StringValue {
-		
-		private static final long serialVersionUID = 1L;
-
-		public Value() {
-			super();
-		}
-
-		public Value(String v) {
-			super(v);
-		}
-		
-		@Override
-		public boolean equals(final Object obj) {
-			if (this == obj) {
-				return true;
-			}
-			
-			if (obj.getClass() == TestData.Value.class) {
-				final StringValue other = (StringValue) obj;
-				int len = this.length();
-				
-				if (len == other.length()) {
-					final char[] tc = this.getCharArray();
-					final char[] oc = other.getCharArray();
-					int i = 0, j = 0;
-					
-					while (len-- != 0) {
-						if (tc[i++] != oc[j++]) {
-							return false;
-						}
-					}
-					return true;
-				}
-			}
-			return false;
-		}
-	}
-
-	/**
-	 * Pair generator.
-	 */
-	public static class Generator implements MutableObjectIterator<Record> {
-		
-		public enum KeyMode {
-			SORTED, RANDOM
-		};
-
-		public enum ValueMode {
-			FIX_LENGTH, RANDOM_LENGTH, CONSTANT
-		};
-
-		private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
-			'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
-
-		private final long seed;
-
-		private final int keyMax;
-
-		private final int valueLength;
-
-		private final KeyMode keyMode;
-
-		private final ValueMode valueMode;
-
-		private Random random;
-
-		private int counter;
-
-		private Key key;
-		private Value value;
-
-		public Generator(long seed, int keyMax, int valueLength) {
-			this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-		}
-
-		public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
-			this(seed, keyMax, valueLength, keyMode, valueMode, null);
-		}
-		
-		public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, Value constant) {
-			this.seed = seed;
-			this.keyMax = keyMax;
-			this.valueLength = valueLength;
-			this.keyMode = keyMode;
-			this.valueMode = valueMode;
-
-			this.random = new Random(seed);
-			this.counter = 0;
-			
-			this.key = new Key();
-			this.value = constant == null ? new Value() : constant;
-		}
-
-		public Record next(Record reuse) {
-			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
-			if (this.valueMode != ValueMode.CONSTANT) {
-				this.value.setValue(randomString());
-			}
-			reuse.setField(0, this.key);
-			reuse.setField(1, this.value);
-			return reuse;
-		}
-
-		public Record next() {
-			return next(new Record(2));
-		}
-
-		public boolean next(org.apache.flink.types.Value[] target) {
-			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
-			// TODO change this to something proper
-			((IntValue)target[0]).setValue(this.key.getValue());
-			((IntValue)target[1]).setValue(random.nextInt());
-			return true;
-		}
-
-		public int sizeOf(Record rec) {
-			// key
-			int valueLength = Integer.SIZE / 8;
-
-			// value
-			String text = rec.getField(1, Value.class).getValue();
-			int strlen = text.length();
-			int utflen = 0;
-			int c;
-			for (int i = 0; i < strlen; i++) {
-				c = text.charAt(i);
-				if ((c >= 0x0001) && (c <= 0x007F)) {
-					utflen++;
-				} else if (c > 0x07FF) {
-					utflen += 3;
-				} else {
-					utflen += 2;
-				}
-			}
-			valueLength += 2 + utflen;
-
-			return valueLength;
-		}
-
-		public void reset() {
-			this.random = new Random(seed);
-			this.counter = 0;
-		}
-
-		private String randomString() {
-			int length;
-
-			if (valueMode == ValueMode.FIX_LENGTH) {
-				length = valueLength;
-			} else {
-				length = valueLength - random.nextInt(valueLength / 3);
-			}
-
-			StringBuilder sb = new StringBuilder();
-			for (int i = 0; i < length; i++) {
-				sb.append(alpha[random.nextInt(alpha.length)]);
-			}
-			return sb.toString();
-		}
-
-	}
-	
-	/**
-	 * Record reader mock.
-	 */
-	public static class GeneratorIterator implements MutableObjectIterator<Record> {
-		
-		private final Generator generator;
-
-		private final int numberOfRecords;
-
-		private int counter;
-
-		public GeneratorIterator(Generator generator, int numberOfRecords) {
-			this.generator = generator;
-			this.generator.reset();
-			this.numberOfRecords = numberOfRecords;
-			this.counter = 0;
-		}
-
-		@Override
-		public Record next(Record target) {
-			if (counter < numberOfRecords) {
-				counter++;
-				return generator.next(target);
-			}
-			else {
-				return null;
-			}
-		}
-
-		@Override
-		public Record next() {
-			if (counter < numberOfRecords) {
-				counter++;
-				return generator.next();
-			}
-			else {
-				return null;
-			}
-		}
-		
-		public void reset() {
-			this.counter = 0;
-		}
-	}
-
-	/**
 	 * Tuple2<Integer, String> generator.
 	 */
 	public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
@@ -398,9 +165,8 @@ public final class TestData {
 
 	}
 
-
 	/**
-	 * Record reader mock.
+	 * Tuple reader mock.
 	 */
 	public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
 
@@ -443,35 +209,31 @@ public final class TestData {
 			this.counter = 0;
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class ConstantValueIterator implements MutableObjectIterator<Record> {
-		
-		private final Key key;
-		private final Value value;
-		
+
+	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private int key;
+		private String value;
+
 		private final String valueValue;
-		
-		
+
+
 		private final int numPairs;
-		
+
 		private int pos;
-		
-		
-		public ConstantValueIterator(int keyValue, String valueValue, int numPairs) {
-			this.key = new Key(keyValue);
-			this.value = new Value();
+
+
+		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
+			this.key = keyValue;
 			this.valueValue = valueValue;
 			this.numPairs = numPairs;
 		}
-		
+
 		@Override
-		public Record next(Record reuse) {
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
 			if (pos < this.numPairs) {
-				this.value.setValue(this.valueValue + ' ' + pos);
-				reuse.setField(0, this.key);
-				reuse.setField(1, this.value);
+				this.value = this.valueValue + ' ' + pos;
+				reuse.setFields(this.key, this.value);
 				pos++;
 				return reuse;
 			}
@@ -481,8 +243,8 @@ public final class TestData {
 		}
 
 		@Override
-		public Record next() {
-			return next(new Record(2));
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
 		}
 
 		public void reset() {
@@ -490,45 +252,307 @@ public final class TestData {
 		}
 	}
 
-	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+	/**
+	 * An iterator that returns the Key/Value pairs with identical value a given number of times.
+	 */
+	public static final class ConstantIntIntTuplesIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
 
-		private int key;
-		private String value;
+		private final int key;
+		private final int value;
 
-		private final String valueValue;
+		private int numLeft;
 
+		public ConstantIntIntTuplesIterator(int key, int value, int count) {
+			this.key = key;
+			this.value = value;
+			this.numLeft = count;
+		}
 
-		private final int numPairs;
+		@Override
+		public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) {
+			if (this.numLeft > 0) {
+				this.numLeft--;
+				reuse.setField(this.key, 0);
+				reuse.setField(this.value, 1);
+				return reuse;
+			} else {
+				return null;
+			}
+		}
 
-		private int pos;
+		@Override
+		public Tuple2<Integer, Integer> next() {
+			return next(new Tuple2<>(0, 0));
+		}
+	}
 
+	//----Tuple2<Integer, String>
+	private static final TupleTypeInfo<Tuple2<Integer, String>> typeInfoIntString = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class);
 
-		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
-			this.key = keyValue;
-			this.valueValue = valueValue;
-			this.numPairs = numPairs;
+	private static final TypeSerializerFactory<Tuple2<Integer, String>> serializerFactoryIntString = new MockTupleSerializerFactory(typeInfoIntString);
+
+	public static TupleTypeInfo<Tuple2<Integer, String>> getIntStringTupleTypeInfo() {
+		return typeInfoIntString;
+	}
+
+	public static TypeSerializerFactory<Tuple2<Integer, String>> getIntStringTupleSerializerFactory() {
+		return serializerFactoryIntString;
+	}
+
+	public static TypeSerializer<Tuple2<Integer, String>> getIntStringTupleSerializer() {
+		return serializerFactoryIntString.getSerializer();
+	}
+
+	public static TypeComparator<Tuple2<Integer, String>> getIntStringTupleComparator() {
+		return getIntStringTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null);
+	}
+
+	public static MockTuple2Reader<Tuple2<Integer, String>> getIntStringTupleReader() {
+		return new MockTuple2Reader<Tuple2<Integer, String>>();
+	}
+
+	//----Tuple2<Integer, Integer>
+	private static final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfoIntInt = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
+
+	private static final TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactoryIntInt = new MockTupleSerializerFactory(typeInfoIntInt);
+
+	public static TupleTypeInfo<Tuple2<Integer, Integer>> getIntIntTupleTypeInfo() {
+		return typeInfoIntInt;
+	}
+
+	public static TypeSerializerFactory<Tuple2<Integer, Integer>> getIntIntTupleSerializerFactory() {
+		return serializerFactoryIntInt;
+	}
+
+	public static TypeSerializer<Tuple2<Integer, Integer>> getIntIntTupleSerializer() {
+		return getIntIntTupleSerializerFactory().getSerializer();
+	}
+
+	public static TypeComparator<Tuple2<Integer, Integer>> getIntIntTupleComparator() {
+		return getIntIntTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null);
+	}
+
+	public static MockTuple2Reader<Tuple2<Integer, Integer>> getIntIntTupleReader() {
+		return new MockTuple2Reader<>();
+	}
+
+	//----Tuple2<?, ?>
+	private static class MockTupleSerializerFactory<T extends Tuple> implements TypeSerializerFactory<T> {
+		private final TupleTypeInfo<T> info;
+
+		public MockTupleSerializerFactory(TupleTypeInfo<T> info) {
+			this.info = info;
 		}
 
 		@Override
-		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
-			if (pos < this.numPairs) {
-				this.value = this.valueValue + ' ' + pos;
-				reuse.setFields(this.key, this.value);
-				pos++;
+		public void writeParametersToConfig(Configuration config) {
+			throw new UnsupportedOperationException("Not supported yet.");
+		}
+
+		@Override
+		public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+			throw new UnsupportedOperationException("Not supported yet.");
+		}
+
+		@Override
+		public TypeSerializer<T> getSerializer() {
+			return info.createSerializer(null);
+		}
+
+		@Override
+		public Class<T> getDataType() {
+			return info.getTypeClass();
+		}
+	}
+
+	public static class MockTuple2Reader<T extends Tuple2> implements MutableObjectIterator<T> {
+		private final Tuple2 SENTINEL = new Tuple2();
+
+		private final BlockingQueue<Tuple2> queue;
+
+		public MockTuple2Reader() {
+			this.queue = new ArrayBlockingQueue<Tuple2>(32, false);
+		}
+
+		public MockTuple2Reader(int size) {
+			this.queue = new ArrayBlockingQueue<Tuple2>(size, false);
+		}
+
+		@Override
+		public T next(T reuse) {
+			Tuple2 r = null;
+			while (r == null) {
+				try {
+					r = queue.take();
+				} catch (InterruptedException iex) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
+			}
+
+			if (r.equals(SENTINEL)) {
+				// put the sentinel back, to ensure that repeated calls do not block
+				try {
+					queue.put(r);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
+				return null;
+			} else {
+				reuse.setField(r.getField(0), 0);
+				reuse.setField(r.getField(1), 1);
 				return reuse;
 			}
-			else {
+		}
+
+		@Override
+		public T next() {
+			Tuple2 r = null;
+			while (r == null) {
+				try {
+					r = queue.take();
+				} catch (InterruptedException iex) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
+			}
+
+			if (r.equals(SENTINEL)) {
+				// put the sentinel back, to ensure that repeated calls do not block
+				try {
+					queue.put(r);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
 				return null;
+			} else {
+				Tuple2 result = new Tuple2(r.f0, r.f1);
+				return (T) result;
+			}
+		}
+
+		public void emit(Tuple2 element) throws InterruptedException {
+			queue.put(new Tuple2(element.f0, element.f1));
+		}
+
+		public void close() {
+			try {
+				queue.put(SENTINEL);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
 			}
 		}
+	}
+	
+	public static class IntPairComparator extends TypeComparator<IntPair> {
+
+		private static final long serialVersionUID = 1L;
+
+		private int reference;
+
+		private final TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
 
 		@Override
-		public Tuple2<Integer, String> next() {
-			return next(new Tuple2<Integer, String>());
+		public int hash(IntPair object) {
+			return comparators[0].hash(object.getKey());
 		}
 
-		public void reset() {
-			this.pos = 0;
+		@Override
+		public void setReference(IntPair toCompare) {
+			this.reference = toCompare.getKey();
+		}
+
+		@Override
+		public boolean equalToReference(IntPair candidate) {
+			return candidate.getKey() == this.reference;
+		}
+
+		@Override
+		public int compareToReference(TypeComparator<IntPair> referencedAccessors) {
+			final IntPairComparator comp = (IntPairComparator) referencedAccessors;
+			return comp.reference - this.reference;
+		}
+
+		@Override
+		public int compare(IntPair first, IntPair second) {
+			return first.getKey() - second.getKey();
+		}
+
+		@Override
+		public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
+			return source1.readInt() - source2.readInt();
+		}
+
+		@Override
+		public boolean supportsNormalizedKey() {
+			return true;
+		}
+
+		@Override
+		public int getNormalizeKeyLen() {
+			return 4;
+		}
+
+		@Override
+		public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+			return keyBytes < 4;
+		}
+
+		@Override
+		public void putNormalizedKey(IntPair record, MemorySegment target, int offset, int len) {
+			// see IntValue for a documentation of the logic
+			final int value = record.getKey() - Integer.MIN_VALUE;
+
+			if (len == 4) {
+				target.putIntBigEndian(offset, value);
+			} else if (len <= 0) {
+			} else if (len < 4) {
+				for (int i = 0; len > 0; len--, i++) {
+					target.put(offset + i, (byte) ((value >>> ((3 - i) << 3)) & 0xff));
+				}
+			} else {
+				target.putIntBigEndian(offset, value);
+				for (int i = 4; i < len; i++) {
+					target.put(offset + i, (byte) 0);
+				}
+			}
+		}
+
+		@Override
+		public boolean invertNormalizedKey() {
+			return false;
+		}
+
+		@Override
+		public IntPairComparator duplicate() {
+			return new IntPairComparator();
+		}
+
+		@Override
+		public int extractKeys(Object record, Object[] target, int index) {
+			target[index] = ((IntPair) record).getKey();
+			return 1;
+		}
+
+		@Override
+		public TypeComparator[] getFlatComparators() {
+			return comparators;
+		}
+
+		@Override
+		public boolean supportsSerializationWithKeyNormalization() {
+			return true;
+		}
+
+		@Override
+		public void writeWithKeyNormalization(IntPair record, DataOutputView target) throws IOException {
+			target.writeInt(record.getKey() - Integer.MIN_VALUE);
+			target.writeInt(record.getValue());
+		}
+
+		@Override
+		public IntPair readWithKeyDenormalization(IntPair reuse, DataInputView source) throws IOException {
+			reuse.setKey(source.readInt() + Integer.MIN_VALUE);
+			reuse.setValue(source.readInt());
+			return reuse;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index f112ff8..4c5a07e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.operators.util;
 
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -37,10 +36,8 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -78,21 +75,21 @@ public class HashVsSortMiniBenchmark {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializerFactory<Record> serializer1;
-	private TypeSerializerFactory<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator11;
+	private TypeSerializerFactory<Tuple2<Integer, String>> serializer1;
+	private TypeSerializerFactory<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator11;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.serializer1 = RecordSerializerFactory.get();
-		this.serializer2 = RecordSerializerFactory.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.serializer1 = TestData.getIntStringTupleSerializerFactory();
+		this.serializer2 = TestData.getIntStringTupleSerializerFactory();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+		this.comparator2 = TestData.getIntStringTupleComparator();
+		this.pairComparator11 = new GenericPairComparator(this.comparator1, this.comparator2);
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
@@ -120,31 +117,31 @@ public class HashVsSortMiniBenchmark {
 	public void testSortBothMerge() {
 		try {
 			
-			Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final JoinFunction matcher = new NoOpMatcher();
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new NoOpMatcher();
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 			
 			long start = System.nanoTime();
 			
-			final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>(
+			final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, 
 					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
 			
-			final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>(
+			final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, 
 					this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
 			
-			final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator();
-			final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();
+			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
+			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
 			
 			// compare with iterator values
-			ReusingMergeInnerJoinIterator<Record, Record, Record> iterator =
-				new ReusingMergeInnerJoinIterator<Record, Record, Record>(sortedInput1, sortedInput2,
+			ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingMergeInnerJoinIterator<>(sortedInput1, sortedInput2,
 						this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 						this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);
 			
@@ -170,21 +167,21 @@ public class HashVsSortMiniBenchmark {
 	@Test
 	public void testBuildFirst() {
 		try {
-			Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final JoinFunction matcher = new NoOpMatcher();
+			final FlatJoinFunction matcher = new NoOpMatcher();
 			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 			
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			final ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -209,21 +206,21 @@ public class HashVsSortMiniBenchmark {
 	@Test
 	public void testBuildSecond() {
 		try {
-			Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final JoinFunction matcher = new NoOpMatcher();
+			final FlatJoinFunction matcher = new NoOpMatcher();
 			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 			
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildSecondHashMatchIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -246,11 +243,11 @@ public class HashVsSortMiniBenchmark {
 	}
 	
 	
-	private static final class NoOpMatcher extends JoinFunction {
+	private static final class NoOpMatcher implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
 		}
 	}
 }