You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:22 UTC

[3/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index e1e2c0a..7b7b940 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -30,9 +29,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -40,12 +39,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -72,9 +68,11 @@ public class CombiningUnilateralSortMergerITCase {
 
 	private MemoryManager memoryManager;
 
-	private TypeSerializerFactory<Record> serializerFactory;
+	private TypeSerializerFactory<Tuple2<Integer, String>> serializerFactory1;
+	private TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactory2;
 	
-	private TypeComparator<Record> comparator;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, Integer>> comparator2;
 
 	@SuppressWarnings("unchecked")
 	@Before
@@ -82,8 +80,11 @@ public class CombiningUnilateralSortMergerITCase {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
 		
-		this.serializerFactory = RecordSerializerFactory.get();
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.serializerFactory1 = TestData.getIntStringTupleSerializerFactory();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+
+		this.serializerFactory2 = TestData.getIntIntTupleSerializerFactory();
+		this.comparator2 = TestData.getIntIntTupleComparator();
 	}
 
 	@After
@@ -107,32 +108,30 @@ public class CombiningUnilateralSortMergerITCase {
 		int noKeys = 100;
 		int noKeyCnt = 10000;
 
-		MockRecordReader reader = new MockRecordReader();
+		TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
 
 		LOG.debug("initializing sortmerger");
 		
 		TestCountCombiner comb = new TestCountCombiner();
 		
-		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
-				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+		Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
 				0.25, 64, 0.7f, false);
 
-		final Record rec = new Record();
-		rec.setField(1, new IntValue(1));
-		final TestData.Key key = new TestData.Key();
+		final Tuple2<Integer, Integer> rec = new Tuple2<>();
+		rec.setField(1, 1);
 		
 		for (int i = 0; i < noKeyCnt; i++) {
 			for (int j = 0; j < noKeys; j++) {
-				key.setKey(j);
-				rec.setField(0, key);
+				rec.setField(j, 0);
 				reader.emit(rec);
 			}
 		}
 		reader.close();
 		
-		MutableObjectIterator<Record> iterator = merger.getIterator();
+		MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
 
-		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory.getSerializer(), comparator.duplicate());
+		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());
 		while (result.hasNext()) {
 			Assert.assertEquals(noKeyCnt, result.next().intValue());
 		}
@@ -148,32 +147,30 @@ public class CombiningUnilateralSortMergerITCase {
 		int noKeys = 100;
 		int noKeyCnt = 10000;
 
-		MockRecordReader reader = new MockRecordReader();
+		TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
 
 		LOG.debug("initializing sortmerger");
 		
 		TestCountCombiner comb = new TestCountCombiner();
 		
-		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
-				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+		Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
 				0.01, 64, 0.005f, true);
 
-		final Record rec = new Record();
-		rec.setField(1, new IntValue(1));
-		final TestData.Key key = new TestData.Key();
+		final Tuple2<Integer, Integer> rec = new Tuple2<>();
+		rec.setField(1, 1);
 		
 		for (int i = 0; i < noKeyCnt; i++) {
 			for (int j = 0; j < noKeys; j++) {
-				key.setKey(j);
-				rec.setField(0, key);
+				rec.setField(j, 0);
 				reader.emit(rec);
 			}
 		}
 		reader.close();
 		
-		MutableObjectIterator<Record> iterator = merger.getIterator();
+		MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
 
-		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory.getSerializer(), comparator.duplicate());
+		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());
 		while (result.hasNext()) {
 			Assert.assertEquals(noKeyCnt, result.next().intValue());
 		}
@@ -187,65 +184,60 @@ public class CombiningUnilateralSortMergerITCase {
 	@Test
 	public void testSortAndValidate() throws Exception
 	{
-		final Hashtable<TestData.Key, Integer> countTable = new Hashtable<TestData.Key, Integer>(KEY_MAX);
+		final Hashtable<Integer, Integer> countTable = new Hashtable<>(KEY_MAX);
 		for (int i = 1; i <= KEY_MAX; i++) {
-			countTable.put(new TestData.Key(i), 0);
+			countTable.put(i, 0);
 		}
 
 		// comparator
-		final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+		final TypeComparator<Integer> keyComparator = new IntComparator(true);
 
 		// reader
-		MockRecordReader reader = new MockRecordReader();
+		TestData.MockTuple2Reader<Tuple2<Integer, String>> reader = TestData.getIntStringTupleReader();
 
 		// merge iterator
 		LOG.debug("initializing sortmerger");
 		
 		TestCountCombiner2 comb = new TestCountCombiner2();
 		
-		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
-				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+		Sorter<Tuple2<Integer, String>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory1, this.comparator1,
 				0.25, 2, 0.7f, false);
 
 		// emit data
 		LOG.debug("emitting data");
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-		Record rec = new Record();
-		final TestData.Value value = new TestData.Value("1");
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		Tuple2<Integer, String> rec = new Tuple2<>();
 		
 		for (int i = 0; i < NUM_PAIRS; i++) {
 			Assert.assertTrue((rec = generator.next(rec)) != null);
-			final TestData.Key key = rec.getField(0, TestData.Key.class);
-			rec.setField(1, value);
+			final Integer key = rec.f0;
+			rec.setField("1", 1);
 			reader.emit(rec);
 			
-			countTable.put(new TestData.Key(key.getKey()), countTable.get(key) + 1);
+			countTable.put(key, countTable.get(key) + 1);
 		}
 		reader.close();
-		rec = null;
 
 		// check order
-		MutableObjectIterator<Record> iterator = merger.getIterator();
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 		
 		LOG.debug("checking results");
 		
-		Record rec1 = new Record();
-		Record rec2 = new Record();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		countTable.put(new TestData.Key(rec1.getField(0, TestData.Key.class).getKey()), countTable.get(rec1.getField(0, TestData.Key.class)) - (Integer.parseInt(rec1.getField(1, TestData.Value.class).toString())));
+		countTable.put(rec1.f0, countTable.get(rec1.f0) - (Integer.parseInt(rec1.f1)));
 
 		while ((rec2 = iterator.next(rec2)) != null) {
-			final Key k1 = rec1.getField(0, TestData.Key.class);
-			final Key k2 = rec2.getField(0, TestData.Key.class);
-			
-			Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-			countTable.put(new TestData.Key(k2.getKey()), countTable.get(k2) - (Integer.parseInt(rec2.getField(1, TestData.Value.class).toString())));
+			Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+			countTable.put(rec2.f0, countTable.get(rec2.f0) - (Integer.parseInt(rec2.f1)));
 			
-			Record tmp = rec1;
+			// swap the two buffers so the next read is never handed the tuple that rec1 still refers to
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
-			k1.setKey(k2.getKey());
 			rec2 = tmp;
 		}
 
 		for (Integer cnt : countTable.values()) {
@@ -260,10 +252,10 @@ public class CombiningUnilateralSortMergerITCase {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> {
+	public static class TestCountCombiner extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 		
-		private final IntValue count = new IntValue();
+		private Integer count = 0;
 		
 		public volatile boolean opened = false;
 		
@@ -271,21 +263,21 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		
 		@Override
-		public void combine(Iterable<Record> values, Collector<Record> out) {
-			Record rec = null;
+		public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {
+			Tuple2<Integer, Integer> rec = new Tuple2<>();
 			int cnt = 0;
-			for (Record next : values) {
+			for (Tuple2<Integer, Integer> next : values) {
 				rec = next;
-				cnt += rec.getField(1, IntValue.class).getValue();
+				cnt += rec.f1;
 			}
 			
-			this.count.setValue(cnt);
-			rec.setField(1, this.count);
+			this.count = cnt;
+			rec.setField(this.count, 1);
 			out.collect(rec);
 		}
 
 		@Override
-		public void reduce(Iterable<Record> values, Collector<Record> out) {}
+		public void reduce(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {}
 		
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -298,7 +290,7 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 	}
 
-	public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> {
+	public static class TestCountCombiner2 extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 		
 		public volatile boolean opened = false;
@@ -306,19 +298,19 @@ public class CombiningUnilateralSortMergerITCase {
 		public volatile boolean closed = false;
 		
 		@Override
-		public void combine(Iterable<Record> values, Collector<Record> out) {
-			Record rec = null;
+		public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
+			Tuple2<Integer, String> rec = new Tuple2<>();
 			int cnt = 0;
-			for (Record next : values) {
+			for (Tuple2<Integer, String> next : values) {
 				rec = next;
-				cnt += Integer.parseInt(rec.getField(1, TestData.Value.class).toString());
+				cnt += Integer.parseInt(rec.f1);
 			}
 
-			out.collect(new Record(rec.getField(0, Key.class), new TestData.Value(cnt + "")));
+			out.collect(new Tuple2<>(rec.f0, cnt + "")); // key of the last tuple seen, summed count as a string
 		}
 
 		@Override
-		public void reduce(Iterable<Record> values, Collector<Record> out) {
+		public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
 			// yo, nothing, mon
 		}
 		
@@ -333,9 +325,9 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 	}
 	
-	private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Record> data, TypeSerializer<Record> serializer, TypeComparator<Record> comparator) {
+	private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer, Integer>>  comparator) {
 		
-		final ReusingKeyGroupedIterator<Record> groupIter = new ReusingKeyGroupedIterator<Record>(data, serializer, comparator);
+		final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>>  groupIter = new ReusingKeyGroupedIterator<> (data, serializer, comparator);
 		
 		return new Iterator<Integer>() {
 			
@@ -360,13 +352,13 @@ public class CombiningUnilateralSortMergerITCase {
 				if (hasNext()) {
 					hasNext = false;
 					
-					Iterator<Record> values = groupIter.getValues();
+					Iterator<Tuple2<Integer, Integer>> values = groupIter.getValues();
 					
-					Record rec = null;
+					Tuple2<Integer, Integer> rec;
 					int cnt = 0;
 					while (values.hasNext()) {
 						rec = values.next();
-						cnt += rec.getField(1, IntValue.class).getValue();
+						cnt += rec.f1;
 					}
 					
 					return cnt;

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 9f0b3d9..b19591b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.util.Comparator;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -31,14 +30,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Assert;
@@ -58,7 +53,7 @@ public class ExternalSortITCase {
 
 	private static final int VALUE_LENGTH = 114;
 	
-	private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+	private static final String VAL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
 
 	private static final int NUM_PAIRS = 200000;
 
@@ -70,9 +65,9 @@ public class ExternalSortITCase {
 
 	private MemoryManager memoryManager;
 	
-	private TypeSerializerFactory<Record> pactRecordSerializer;
+	private TypeSerializerFactory<Tuple2<Integer, String>> pactRecordSerializer;
 	
-	private TypeComparator<Record> pactRecordComparator;
+	private TypeComparator<Tuple2<Integer, String>> pactRecordComparator;
 	
 	private boolean testSuccess;
 
@@ -84,8 +79,8 @@ public class ExternalSortITCase {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
 		
-		this.pactRecordSerializer = RecordSerializerFactory.get();
-		this.pactRecordComparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.pactRecordSerializer = TestData.getIntStringTupleSerializerFactory();
+		this.pactRecordComparator = TestData.getIntStringTupleComparator();
 	}
 
 	@After
@@ -109,15 +104,15 @@ public class ExternalSortITCase {
 	public void testInMemorySort() {
 		try {
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 			
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
 	
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)64/78, 2, 0.9f, true);
 	
@@ -125,26 +120,22 @@ public class ExternalSortITCase {
 			LOG.debug("Reading and sorting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsEmitted = 1;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsEmitted++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
-				
 				rec2 = tmp;
 			}
 			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -162,15 +153,15 @@ public class ExternalSortITCase {
 	public void testInMemorySortUsing10Buffers() {
 		try {
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 			
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
 	
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)64/78, 10, 2, 0.9f, false);
 	
@@ -178,26 +169,22 @@ public class ExternalSortITCase {
 			LOG.debug("Reading and sorting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsEmitted = 1;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsEmitted++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
-				
 				rec2 = tmp;
 			}
 			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -215,15 +202,15 @@ public class ExternalSortITCase {
 	public void testSpillingSort() {
 		try {
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 			
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
 	
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)16/78, 64, 0.7f, true);
 	
@@ -231,26 +218,22 @@ public class ExternalSortITCase {
 			LOG.debug("Reading and sorting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsEmitted = 1;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsEmitted++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
-				
 				rec2 = tmp;
 			}
 			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -271,15 +254,15 @@ public class ExternalSortITCase {
 			final int PAIRS = 10000000;
 	
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 	
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, PAIRS);
 			
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)64/78, 16, 0.7f, false);
 			
@@ -287,26 +270,23 @@ public class ExternalSortITCase {
 			LOG.debug("Emitting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsRead = 1;
 			int nextStep = PAIRS / 20;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsRead++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
 				rec2 = tmp;
 				
 				// log
@@ -335,7 +315,7 @@ public class ExternalSortITCase {
 			final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS);
 			
 			final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
-			final TypeComparator<IntPair> comparator = new IntPairComparator();
+			final TypeComparator<IntPair> comparator = new TestData.IntPairComparator();
 			
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
index 07330ee..3329335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
@@ -19,16 +19,12 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.runtime.operators.sort.MergeIterator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,33 +32,33 @@ import org.junit.Test;
 
 public class MergeIteratorTest {
 	
-	private TypeComparator<Record> comparator;
+	private TypeComparator<Tuple2<Integer, String>> comparator;
 	
 	
 	@SuppressWarnings("unchecked")
 	@Before
 	public void setup() {
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] { TestData.Key.class});
+		this.comparator = TestData.getIntStringTupleComparator();
 	}
 	
 	
-	private MutableObjectIterator<Record> newIterator(final int[] keys, final String[] values) {
+	private MutableObjectIterator<Tuple2<Integer, String>> newIterator(final int[] keys, final String[] values) {
 		
-		return new MutableObjectIterator<Record>() {
+		return new MutableObjectIterator<Tuple2<Integer, String>>() {
 			
-			private Key key = new Key();
-			private Value value = new Value();
+			private int key = 0;
+			private String value = new String();
 			
 			private int current = 0;
 
 			@Override
-			public Record next(Record reuse) {
+			public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
 				if (current < keys.length) {
-					key.setKey(keys[current]);
-					value.setValue(values[current]);
+					key = keys[current];
+					value = values[current];
 					current++;
-					reuse.setField(0, key);
-					reuse.setField(1, value);
+					reuse.setField(key, 0);
+					reuse.setField(value, 1);
 					return reuse;
 				}
 				else {
@@ -71,9 +67,9 @@ public class MergeIteratorTest {
 			}
 
 			@Override
-			public Record next() {
+			public Tuple2<Integer, String> next() {
 				if (current < keys.length) {
-					Record result = new Record(new Key(keys[current]), new Value(values[current]));
+					Tuple2<Integer, String> result = new Tuple2<>(keys[current], values[current]);
 					current++;
 					return result;
 				}
@@ -88,37 +84,37 @@ public class MergeIteratorTest {
 	public void testMergeOfTwoStreams() throws Exception
 	{
 		// iterators
-		List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+		List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
 		iterators.add(newIterator(new int[] { 1, 2, 4, 5, 10 }, new String[] { "1", "2", "4", "5", "10" }));
 		iterators.add(newIterator(new int[] { 3, 6, 7, 10, 12 }, new String[] { "3", "6", "7", "10", "12" }));
 		
 		final int[] expected = new int[] {1, 2, 3, 4, 5, 6, 7, 10, 10, 12};
 
 		// comparator
-		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+		TypeComparator<Integer> comparator = new IntComparator(true);
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
 
 		// check expected order
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		final Key k1 = new Key();
-		final Key k2 = new Key();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
+		int k1 = 0;
+		int k2 = 0;
 		
 		int pos = 1;
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		Assert.assertEquals(expected[0], rec1.getField(0, TestData.Key.class).getKey());
+		Assert.assertEquals(expected[0], rec1.f0.intValue());
 		
 		while ((rec2 = iterator.next(rec2)) != null) {
-			k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
-			k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
+			k1 = rec1.f0;
+			k2 = rec2.f0;
 			
 			Assert.assertTrue(comparator.compare(k1, k2) <= 0);
-			Assert.assertEquals(expected[pos++], k2.getKey()); 
+			Assert.assertEquals(expected[pos++], k2); 
 			
-			Record tmp = rec1;
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
 			rec2 = tmp;
 		}
@@ -128,7 +124,7 @@ public class MergeIteratorTest {
 	public void testMergeOfTenStreams() throws Exception
 	{
 		// iterators
-		List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+		List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
 		iterators.add(newIterator(new int[] { 1, 2, 17, 23, 23 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 2, 6, 7, 8, 9 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 4, 10, 11, 11, 12 }, new String[] { "A", "B", "C", "D", "E" }));
@@ -141,26 +137,23 @@ public class MergeIteratorTest {
 		iterators.add(newIterator(new int[] { 8, 8, 14, 14, 15 }, new String[] { "A", "B", "C", "D", "E" }));
 
 		// comparator
-		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+		TypeComparator<Integer> comparator = new IntComparator(true);
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
 
 		int elementsFound = 1;
 		// check expected order
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		final Key k1 = new Key();
-		final Key k2 = new Key();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 		while ((rec2 = iterator.next(rec2)) != null) {
 			elementsFound++;
-			k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
-			k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
-			Assert.assertTrue(comparator.compare(k1, k2) <= 0);
+
+			Assert.assertTrue(comparator.compare(rec1.f0, rec2.f0) <= 0);
 			
-			Record tmp = rec1;
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
 			rec2 = tmp;
 		}
@@ -172,7 +165,7 @@ public class MergeIteratorTest {
 	public void testInvalidMerge() throws Exception
 	{
 		// iterators
-		List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+		List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
 		iterators.add(newIterator(new int[] { 1, 2, 17, 23, 23 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 2, 6, 7, 8, 9 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 4, 10, 11, 11, 12 }, new String[] { "A", "B", "C", "D", "E" }));
@@ -185,31 +178,26 @@ public class MergeIteratorTest {
 		iterators.add(newIterator(new int[] { 8, 8, 14, 14, 15 }, new String[] { "A", "B", "C", "D", "E" }));
 
 		// comparator
-		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+		TypeComparator<Integer> comparator = new IntComparator(true);
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
 
 		boolean violationFound = false;
 		
 		// check expected order
-		Record rec1 = new Record();
-		Record rec2 = new Record();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 		while ((rec2 = iterator.next(rec2)) != null)
-		{
-			final Key k1 = new Key();
-			final Key k2 = new Key();
-			k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
-			k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
-			
-			if (comparator.compare(k1, k2) > 0) {
+		{			
+			if (comparator.compare(rec1.f0, rec2.f0) > 0) {
 				violationFound = true;
 				break;
 			}
 			
-			Record tmp = rec1;
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
 			rec2 = tmp;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
index 1a6884e..dc517b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
@@ -22,14 +22,10 @@ package org.apache.flink.runtime.operators.sort;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,6 +39,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  */
@@ -59,77 +57,77 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 	private static final long SEED2 = 231434613412342L;
 
 	// left and right input data generators
-	private Generator generator1;
+	private TupleGenerator generator1;
 
-	private Generator generator2;
+	private TupleGenerator generator2;
 
-	// left and right input RecordReader mocks
-	private MutableObjectIterator<Record> reader1;
+	// left and right input TupleReader mocks
+	private MutableObjectIterator<Tuple2<Integer, String>> reader1;
 
-	private MutableObjectIterator<Record> reader2;
+	private MutableObjectIterator<Tuple2<Integer, String>> reader2;
 	
 	
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> serializer1;
+	private TypeSerializer<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+		this.serializer1 = TestData.getIntStringTupleSerializer();
+		this.serializer2 = TestData.getIntStringTupleSerializer();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+		this.comparator2 = TestData.getIntStringTupleComparator();
+		this.pairComparator = new GenericPairComparator(this.comparator1, this.comparator2);
 	}
 	
 	@Test
 	public void testMerge() {
 		try {
 			
-			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
 
-			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			reader1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			reader2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 
 			// collect expected data
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
-			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+			Map<Integer, Collection<String>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
+			Map<Integer, Collection<String>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
+			Map<Integer, List<Collection<String>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
 	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 	
 			// compare with iterator values
-			NonReusingSortMergeCoGroupIterator<Record, Record> iterator =	new NonReusingSortMergeCoGroupIterator<Record, Record>(
+			NonReusingSortMergeCoGroupIterator<Tuple2<Integer, String>,Tuple2<Integer, String>> iterator =	new NonReusingSortMergeCoGroupIterator<>(
 					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
 					this.pairComparator);
 	
 			iterator.open();
 			
-			final TestData.Key key = new TestData.Key();
+			int key = 0;
 			while (iterator.next())
 			{
-				Iterator<Record> iter1 = iterator.getValues1().iterator();
-				Iterator<Record> iter2 = iterator.getValues2().iterator();
+				Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
+				Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();
 				
-				TestData.Value v1 = null;
-				TestData.Value v2 = null;
+				String v1 = null;
+				String v2 = null;
 				
 				if (iter1.hasNext()) {
-					Record rec = iter1.next();
-					rec.getFieldInto(0, key);
-					v1 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter1.next();
+					key = rec.f0;
+					v1 = rec.f1;
 				}
 				else if (iter2.hasNext()) {
-					Record rec = iter2.next();
-					rec.getFieldInto(0, key);
-					v2 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter2.next();
+					key = rec.f0;
+					v2 = rec.f1;
 				}
 				else {
 					Assert.fail("No input on both sides.");
@@ -138,8 +136,8 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 				// assert that matches for this key exist
 				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
 				
-				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
-				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+				Collection<String> expValues1 = expectedCoGroupsMap.get(key).get(0);
+				Collection<String> expValues2 = expectedCoGroupsMap.get(key).get(1);
 				
 				if (v1 != null) {
 					expValues1.remove(v1);
@@ -149,14 +147,14 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 				}
 				
 				while(iter1.hasNext()) {
-					Record rec = iter1.next();
-					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter1.next();
+					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
 				
 				while(iter2.hasNext()) {
-					Record rec = iter2.next();
-					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter2.next();
+					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
 	
@@ -174,28 +172,28 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 
 	// --------------------------------------------------------------------------------------------
 	
-	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	private Map<Integer, List<Collection<String>>> coGroupValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+		Map<Integer, List<Collection<String>>> map = new HashMap<>(1000);
 
-		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+		Set<Integer> keySet = new HashSet<>(leftMap.keySet());
 		keySet.addAll(rightMap.keySet());
 		
-		for (TestData.Key key : keySet) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+		for (Integer key : keySet) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+			ArrayList<Collection<String>> list = new ArrayList<>(2);
 			
 			if (leftValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(leftValues);
 			}
 			
 			if (rightValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(rightValues);
 			}
@@ -205,22 +203,22 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 		return map;
 	}
 
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+	private Map<Integer, Collection<String>> collectData(TupleGenerator iter, int num)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		for (int i = 0; i < num; i++) {
 			iter.next(pair);
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			Integer key = pair.f0;
 			
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 		return map;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
index f8a8f11..45f20ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
@@ -21,19 +21,16 @@ package org.apache.flink.runtime.operators.sort;
 
 import java.util.List;
 import java.util.Random;
+import org.apache.flink.api.common.typeutils.TypeComparator;
 
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.MutableObjectIterator;
 
 import org.junit.After;
@@ -76,10 +73,9 @@ public class NormalizedKeySorterTest {
 		}
 	}
 
-	private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception {
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{ Key.class });
-		return new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
+	private NormalizedKeySorter<Tuple2<Integer, String>> newSortBuffer(List<MemorySegment> memory) throws Exception
+	{
+		return new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), TestData.getIntStringTupleComparator(), memory);
 	}
 
 	@Test
@@ -87,12 +83,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -102,18 +98,18 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		Record readTarget = new Record();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		int i = 0;
 		while (i < num) {
 			generator.next(record);
 			readTarget = sorter.getRecord(readTarget, i++);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -129,12 +125,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		do {
 			generator.next(record);
 		}
@@ -142,17 +138,17 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
 			generator.next(record);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -168,11 +164,11 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
 		
 		// write the buffer full with the first set of records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -183,7 +179,7 @@ public class NormalizedKeySorterTest {
 		sorter.reset();
 		
 		// write a second sequence of records. since the values are of fixed length, we must be able to write an equal number
-		generator = new TestData.Generator(SEED2, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		generator = new TestData.TupleGenerator(SEED2, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
 		
 		// write the buffer full with the first set of records
 		int num2 = -1;
@@ -197,18 +193,18 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		Record readTarget = new Record();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		int i = 0;
 		while (i < num) {
 			generator.next(record);
 			readTarget = sorter.getRecord(readTarget, i++);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -229,12 +225,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -250,18 +246,18 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		Record readTarget = new Record();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		int i = num - 1;
 		while (i >= 0) {
 			generator.next(record);
 			readTarget = sorter.getRecord(readTarget, i--);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -282,12 +278,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -323,12 +319,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = 0;
 		do {
 			generator.next(record);
@@ -339,26 +335,21 @@ public class NormalizedKeySorterTest {
 		QuickSort qs = new QuickSort();
 		qs.sort(sorter);
 		
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
-		
-		Key current = new Key();
-		Key last = new Key();
-		
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
+
 		iter.next(readTarget);
-		readTarget.getFieldInto(0, last);
+		int last = readTarget.f0;
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
-			readTarget.getFieldInto(0, current);
+			int current = readTarget.f0;
 			
-			final int cmp = last.compareTo(current);
+			final int cmp = last - current;
 			if (cmp > 0) {
 				Assert.fail("Next key is not larger or equal to previous key.");
 			}
 			
-			Key tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers
@@ -370,16 +361,16 @@ public class NormalizedKeySorterTest {
 	public void testSortShortStringKeys() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
-
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
-		NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
 		
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, 5, KeyMode.RANDOM,
+		@SuppressWarnings("unchecked")
+		TypeComparator<Tuple2<Integer, String>> accessors = TestData.getIntStringTupleTypeInfo().createComparator(new int[]{1}, new boolean[]{true}, 0, null);
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), accessors, memory);
+		
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, 5, KeyMode.RANDOM,
 			ValueMode.FIX_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		do {
 			generator.next(record);
 		}
@@ -388,26 +379,21 @@ public class NormalizedKeySorterTest {
 		QuickSort qs = new QuickSort();
 		qs.sort(sorter);
 		
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
-		
-		Value current = new Value();
-		Value last = new Value();
-		
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
+	
 		iter.next(readTarget);
-		readTarget.getFieldInto(1, last);
+		String last = readTarget.f1;
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
-			readTarget.getFieldInto(1, current);
+			String current = readTarget.f1;
 			
 			final int cmp = last.compareTo(current);
 			if (cmp > 0) {
 				Assert.fail("Next value is not larger or equal to previous value.");
 			}
 			
-			Value tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers
@@ -420,15 +406,15 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
-		NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
+		@SuppressWarnings("unchecked")
+		TypeComparator<Tuple2<Integer, String>> accessors = TestData.getIntStringTupleTypeInfo().createComparator(new int[]{1}, new boolean[]{true}, 0, null);
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), accessors, memory);
 		
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.FIX_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		do {
 			generator.next(record);
 		}
@@ -437,26 +423,21 @@ public class NormalizedKeySorterTest {
 		QuickSort qs = new QuickSort();
 		qs.sort(sorter);
 		
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
-		
-		Value current = new Value();
-		Value last = new Value();
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		iter.next(readTarget);
-		readTarget.getFieldInto(1, last);
+		String last = readTarget.f1;
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
-			readTarget.getFieldInto(1, current);
+			String current = readTarget.f1;
 			
 			final int cmp = last.compareTo(current);
 			if (cmp > 0) {
 				Assert.fail("Next value is not larger or equal to previous value.");
 			}
 			
-			Value tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
index a487a65..d1de5dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
@@ -27,18 +27,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,77 +57,76 @@ public class ReusingSortMergeCoGroupIteratorITCase
 	private static final long SEED2 = 231434613412342L;
 
 	// left and right input data generators
-	private Generator generator1;
+	private TupleGenerator generator1;
 
-	private Generator generator2;
+	private TupleGenerator generator2;
 
-	// left and right input RecordReader mocks
-	private MutableObjectIterator<Record> reader1;
+	// left and right input reader mocks (iterators over Tuple2<Integer, String>)
+	private MutableObjectIterator<Tuple2<Integer, String>> reader1;
 
-	private MutableObjectIterator<Record> reader2;
+	private MutableObjectIterator<Tuple2<Integer, String>> reader2;
 	
-	
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> serializer1;
+	private TypeSerializer<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+		this.serializer1 = TestData.getIntStringTupleSerializer();
+		this.serializer2 = TestData.getIntStringTupleSerializer();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+		this.comparator2 = TestData.getIntStringTupleComparator();
+		this.pairComparator = new GenericPairComparator(comparator1, comparator2);
 	}
 	
 	@Test
 	public void testMerge() {
 		try {
 			
-			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
 
-			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			reader1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			reader2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 
 			// collect expected data
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
-			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+			Map<Integer, Collection<String>> expectedStringsMap1 = collectData(generator1, INPUT_1_SIZE);
+			Map<Integer, Collection<String>> expectedStringsMap2 = collectData(generator2, INPUT_2_SIZE);
+			Map<Integer, List<Collection<String>>> expectedCoGroupsMap = coGroupValues(expectedStringsMap1, expectedStringsMap2);
 	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 	
 			// compare with iterator values
-			ReusingSortMergeCoGroupIterator<Record, Record> iterator =	new ReusingSortMergeCoGroupIterator<Record, Record>(
+			ReusingSortMergeCoGroupIterator<Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =	new ReusingSortMergeCoGroupIterator<>(
 					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
 					this.pairComparator);
 	
 			iterator.open();
 			
-			final TestData.Key key = new TestData.Key();
+			int key = 0;
 			while (iterator.next())
 			{
-				Iterator<Record> iter1 = iterator.getValues1().iterator();
-				Iterator<Record> iter2 = iterator.getValues2().iterator();
+				Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
+				Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();
 				
-				TestData.Value v1 = null;
-				TestData.Value v2 = null;
+				String v1 = null;
+				String v2 = null;
 				
 				if (iter1.hasNext()) {
-					Record rec = iter1.next();
-					rec.getFieldInto(0, key);
-					v1 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter1.next();
+					key = rec.f0;
+					v1 = rec.f1;
 				}
 				else if (iter2.hasNext()) {
-					Record rec = iter2.next();
-					rec.getFieldInto(0, key);
-					v2 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter2.next();
+					key = rec.f0;
+					v2 = rec.f1;
 				}
 				else {
 					Assert.fail("No input on both sides.");
@@ -138,8 +135,8 @@ public class ReusingSortMergeCoGroupIteratorITCase
 				// assert that matches for this key exist
 				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
 				
-				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
-				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+				Collection<String> expValues1 = expectedCoGroupsMap.get(key).get(0);
+				Collection<String> expValues2 = expectedCoGroupsMap.get(key).get(1);
 				
 				if (v1 != null) {
 					expValues1.remove(v1);
@@ -149,14 +146,14 @@ public class ReusingSortMergeCoGroupIteratorITCase
 				}
 				
 				while(iter1.hasNext()) {
-					Record rec = iter1.next();
-					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter1.next();
+					Assert.assertTrue("String not in expected set of first input", expValues1.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
 				
 				while(iter2.hasNext()) {
-					Record rec = iter2.next();
-					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter2.next();
+					Assert.assertTrue("String not in expected set of second input", expValues2.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
 	
@@ -174,28 +171,28 @@ public class ReusingSortMergeCoGroupIteratorITCase
 
 	// --------------------------------------------------------------------------------------------
 	
-	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	private Map<Integer, List<Collection<String>>> coGroupValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+		Map<Integer, List<Collection<String>>> map = new HashMap<>(1000);
 
-		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+		Set<Integer> keySet = new HashSet<>(leftMap.keySet());
 		keySet.addAll(rightMap.keySet());
 		
-		for (TestData.Key key : keySet) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+		for (Integer key : keySet) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+			ArrayList<Collection<String>> list = new ArrayList<>(2);
 			
 			if (leftValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(leftValues);
 			}
 			
 			if (rightValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(rightValues);
 			}
@@ -205,22 +202,22 @@ public class ReusingSortMergeCoGroupIteratorITCase
 		return map;
 	}
 
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+	private Map<Integer, Collection<String>> collectData(TupleGenerator iter, int num)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		for (int i = 0; i < num; i++) {
 			iter.next(pair);
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			int key = pair.f0;
 			
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 		return map;
 	}