Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:20 UTC

[1/6] flink git commit: [FLINK-2843] Add documentation for DataSet outer joins.

Repository: flink
Updated Branches:
  refs/heads/master f7ab4f373 -> c82ebbfce


[FLINK-2843] Add documentation for DataSet outer joins.

This closes #1248


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9e32da2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9e32da2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9e32da2

Branch: refs/heads/master
Commit: d9e32da2631d519d434238b6153332ed03047461
Parents: f7ab4f3
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Oct 9 18:46:31 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 15 11:28:57 2015 +0200

----------------------------------------------------------------------
 docs/apis/dataset_transformations.md | 171 +++++++++++++++++++++++++++++-
 docs/apis/programming_guide.md       |  50 +++++++--
 2 files changed, 209 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9e32da2/docs/apis/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/dataset_transformations.md b/docs/apis/dataset_transformations.md
index 0847190..cc7e742 100644
--- a/docs/apis/dataset_transformations.md
+++ b/docs/apis/dataset_transformations.md
@@ -1105,7 +1105,7 @@ Not supported.
 
 The Join transformation joins two DataSets into one DataSet. The elements of both DataSets are joined on one or more keys which can be specified using
 
-- a kex expression
+- a key expression
 - a key-selector function
 - one or more field position keys (Tuple DataSet only).
 - Case Class Fields
@@ -1152,7 +1152,7 @@ val result = input1.join(input2).where(0).equalTo(1)
 </div>
 </div>
 
-#### Join with Join-Function
+#### Join with Join Function
 
 A Join transformation can also call a user-defined join function to process joining tuples.
 A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
@@ -1380,7 +1380,7 @@ DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
 DataSet<Tuple2<SomeType, AnotherType> result =
-      input1.join(input2, BROADCAST_HASH_FIRST)
+      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
             .where("id").equalTo("key");
 ~~~
 
@@ -1392,7 +1392,7 @@ val input1: DataSet[SomeType] = // [...]
 val input2: DataSet[AnotherType] = // [...]
 
 // hint that the second DataSet is very small
-val result1 = input1.join(input2, BROADCAST_HASH_FIRST).where("id").equalTo("key")
+val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
 
 ~~~
 
@@ -1432,6 +1432,169 @@ The following hints are available:
   already sorted.
 
 
+### OuterJoin
+
+The OuterJoin transformation performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found on the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a `JoinFunction` to turn the pair of elements into a single element, or to a `FlatJoinFunction` to turn the pair of elements into arbitrarily many (including none) elements.
+
+The elements of both DataSets are joined on one or more keys which can be specified using
+
+- a key expression
+- a key-selector function
+- one or more field position keys (Tuple DataSet only).
+- Case Class Fields
+
+**OuterJoins are only supported for the Java and Scala DataSet API.**
+
+
+#### OuterJoin with Join Function
+
+An OuterJoin transformation calls a user-defined join function to process joining tuples.
+A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element. Depending on the type of the outer join (left, right, full), one of the two input elements of the join function can be `null`.
+
+The following code performs a left outer join of a Tuple DataSet with a DataSet of custom Java objects and shows how to use a user-defined join function:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// some POJO
+public class Rating {
+  public String name;
+  public String category;
+  public int points;
+}
+
+// Join function that joins a custom POJO with a Tuple
+public class PointAssigner
+         implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
+
+  @Override
+  public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
+    // Assigns the rating points to the movie.
+    // NOTE: rating might be null
+    return new Tuple2<String, Integer>(movie.f0, rating == null ? -1 : rating.points);
+  }
+}
+
+DataSet<Tuple2<String, String>> movies = // [...]
+DataSet<Rating> ratings = // [...]
+DataSet<Tuple2<String, Integer>>
+            moviesWithPoints =
+            movies.leftOuterJoin(ratings)
+
+                   // key of the first input
+                   .where("f0")
+
+                   // key of the second input
+                   .equalTo("name")
+
+                   // applying the JoinFunction on joining pairs
+                   .with(new PointAssigner());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+case class Rating(name: String, category: String, points: Int)
+
+val movies: DataSet[(String, String)] = // [...]
+val ratings: DataSet[Rating] = // [...]
+
+val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
+  (movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+#### OuterJoin with Flat-Join Function
+
+Analogous to Map and FlatMap, an OuterJoin with flat-join function behaves in the same
+way as an OuterJoin with join function, but instead of returning one element, it can
+return (collect) zero, one, or more elements.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+public class PointAssigner
+         implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
+  @Override
+  public void join(Tuple2<String, String> movie, Rating rating,
+                   Collector<Tuple2<String, Integer>> out) {
+    if (rating == null) {
+      out.collect(new Tuple2<String, Integer>(movie.f0, -1));
+    } else if (rating.points < 10) {
+      out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
+    }
+    // ratings with 10 or more points are not emitted
+  }
+}
+
+DataSet<Tuple2<String, Integer>>
+            moviesWithPoints =
+            movies.leftOuterJoin(ratings) // [...]
+~~~
+
+#### Join Algorithm Hints
+
+The Flink runtime can execute outer joins in various ways. Each possible way outperforms the others under
+different circumstances. The system tries to pick a reasonable way automatically, but allows you
+to manually pick a strategy, in case you want to enforce a specific way of executing the outer join.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<SomeType> input1 = // [...]
+DataSet<AnotherType> input2 = // [...]
+
+DataSet<Tuple2<SomeType, AnotherType>> result =
+      input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
+            .where("id").equalTo("key");
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input1: DataSet[SomeType] = // [...]
+val input2: DataSet[AnotherType] = // [...]
+
+// hint that the outer join should use the repartition sort-merge strategy
+val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+**NOTE:** Currently, outer joins can only be executed with the `REPARTITION_SORT_MERGE` strategy. Further execution strategies will be added in the future. The following hints are available:
+
+* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+
+* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
+  a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
+  already sorted.
+
+
 ### Cross
 
 The Cross transformation combines two DataSets into one DataSet. It builds all pairwise combinations of the elements of both input DataSets, i.e., it builds a Cartesian product.

http://git-wip-us.apache.org/repos/asf/flink/blob/d9e32da2/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index ad29ac3..7f5e41d 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -630,8 +630,8 @@ DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
       <td>
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
-        FlatJoinFunction to turn the pair of elements into arbitararily many (including none)
-        elements. See <a href="#specifying-keys">keys</a> on how to define join keys.
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight java %}
 result = input1.join(input2)
                .where(0)       // key of the first input (tuple field 0)
@@ -650,7 +650,27 @@ result = input1.join(input2)
 result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                .where(0).equalTo(1);
 {% endhighlight %}
-        Note that the join transformation works only for equi-joins. Other join types, for example outer-joins need to be expressed using CoGroup.
+        Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>OuterJoin</strong></td>
+      <td>
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found on the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight java %}
+input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
+      .where(0)              // key of the first input (tuple field 0)
+      .equalTo(1)            // key of the second input (tuple field 1)
+      .with(new JoinFunction<String, String, String>() {
+          public String join(String v1, String v2) {
+             // NOTE: 
+             // - v2 might be null for leftOuterJoin
+             // - v1 might be null for rightOuterJoin
+             // - v1 OR v2 might be null for fullOuterJoin
+          }
+      });
+{% endhighlight %}
       </td>
     </tr>
 
@@ -659,7 +679,7 @@ result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight java %}
 data1.coGroup(data2)
      .where(0)
@@ -906,8 +926,8 @@ val output: DataSet[(Int, String, Doublr)] = input.sum(0).min(2)
       <td>
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
-        FlatJoinFunction to turn the pair of elements into arbitararily many (including none)
-        elements. See <a href="#specifying-keys">keys</a> on how to define join keys.
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight scala %}
 // In this case tuple fields are used as keys. "0" is the join field on the first tuple
 // "1" is the join field on the second tuple.
@@ -926,7 +946,21 @@ val result = input1.join(input2).where(0).equalTo(1)
 val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                    .where(0).equalTo(1)
 {% endhighlight %}
-          Note that the join transformation works only for equi-joins. Other join types, for example outer-joins need to be expressed using CoGroup.
+          Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>OuterJoin</strong></td>
+      <td>
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found on the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight scala %}
+val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
+   (left, right) =>
+     val b = if (right == null) "none" else right._1
+     (left, b)
+  }
+{% endhighlight %}
       </td>
     </tr>
 
@@ -935,7 +969,7 @@ val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight scala %}
 data1.coGroup(data2).where(0).equalTo(1)
 {% endhighlight %}
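
Editorial note on the outer-join semantics documented in this commit: the sketch below is a plain-Java illustration with no Flink dependency. It is not part of the commit and not how Flink executes the join (the documentation above names `REPARTITION_SORT_MERGE` as the only strategy; this sketch uses a simple hash lookup instead). The class `LeftOuterJoinSketch`, its simplified `Rating` type, and the `leftOuterJoin` helper are hypothetical names chosen for illustration. It only shows that an unmatched left element reaches the join function paired with `null`, which is the case the `PointAssigner` example maps to -1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Plain-Java illustration of left outer join semantics; hypothetical, not Flink code. */
public class LeftOuterJoinSketch {

  /** Simplified stand-in for the Rating POJO used in the documentation. */
  public static class Rating {
    public final String name;
    public final int points;

    public Rating(String name, int points) {
      this.name = name;
      this.points = points;
    }
  }

  /**
   * Pairs every movie title with all ratings of the same name. A movie without
   * any rating is passed to the join function exactly once, with null as the rating.
   */
  public static <R> List<R> leftOuterJoin(
      List<String> movies, List<Rating> ratings, BiFunction<String, Rating, R> joinFunction) {
    // Index the right input by its join key (assumption: it fits in memory).
    Map<String, List<Rating>> ratingsByName = new HashMap<>();
    for (Rating rating : ratings) {
      ratingsByName.computeIfAbsent(rating.name, k -> new ArrayList<>()).add(rating);
    }

    List<R> result = new ArrayList<>();
    for (String movie : movies) {
      List<Rating> matches = ratingsByName.get(movie);
      if (matches == null) {
        // No matching key: the left element is preserved and paired with null.
        result.add(joinFunction.apply(movie, null));
      } else {
        for (Rating rating : matches) {
          result.add(joinFunction.apply(movie, rating));
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> movies = Arrays.asList("Alien", "Brazil");
    List<Rating> ratings = Arrays.asList(new Rating("Alien", 9));

    // Same null handling as the PointAssigner example: -1 marks a missing rating.
    List<String> joined = leftOuterJoin(
        movies, ratings, (movie, rating) -> movie + ":" + (rating == null ? -1 : rating.points));

    System.out.println(joined);
  }
}
```

A right outer join would swap the roles of the two inputs, and a full outer join would additionally emit right elements that found no partner, paired with a `null` left element.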


[6/6] flink git commit: [FLINK-2774] [scala shell] Extended default imports for ScalaShell

Posted by fh...@apache.org.
[FLINK-2774] [scala shell] Extended default imports for ScalaShell

This closes #1247

-- PRs closed due to inactivity
This closes #1077


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c82ebbfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c82ebbfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c82ebbfc

Branch: refs/heads/master
Commit: c82ebbfce0b11a4b4de3126fb02ccfdad80e0837
Parents: fbc18b9
Author: Chiwan Park <ch...@apache.org>
Authored: Fri Oct 9 17:30:50 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 15 11:34:01 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/FlinkILoop.scala     | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c82ebbfc/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index f13288b..9fb45a8 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -96,8 +96,22 @@ class FlinkILoop(
   }
 
   private val packageImports = Seq[String](
+    "org.apache.flink.core.fs._",
+    "org.apache.flink.core.fs.local._",
+    "org.apache.flink.api.common.io._",
+    "org.apache.flink.api.common.aggregators._",
+    "org.apache.flink.api.common.accumulators._",
+    "org.apache.flink.api.common.distributions._",
+    "org.apache.flink.api.common.operators._",
+    "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
+    "org.apache.flink.api.common.functions._",
+    "org.apache.flink.api.java.io._",
+    "org.apache.flink.api.java.aggregation._",
+    "org.apache.flink.api.java.functions._",
+    "org.apache.flink.api.java.operators._",
+    "org.apache.flink.api.java.sampling._",
     "org.apache.flink.api.scala._",
-    "org.apache.flink.api.common.functions._"
+    "org.apache.flink.api.scala.utils._"
   )
 
   override def createInterpreter(): Unit = {


[2/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 2427edd..d12307a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -18,13 +18,24 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.util.Comparator;
+import java.io.IOException;
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
@@ -38,250 +49,6 @@ public final class TestData {
 	private TestData() {}
 
 	/**
-	 * Key comparator.
-	 */
-	public static class KeyComparator implements Comparator<Key> {
-		@Override
-		public int compare(Key k1, Key k2) {
-			return k1.compareTo(k2);
-		}
-	};
-
-	/**
-	 * Key implementation.
-	 */
-	public static class Key extends IntValue {
-		private static final long serialVersionUID = 1L;
-		
-		public Key() {
-			super();
-		}
-
-		public Key(int k) {
-			super(k);
-		}
-
-		public int getKey() {
-			return getValue();
-		}
-		
-		public void setKey(int key) {
-			setValue(key);
-		}
-	}
-
-	/**
-	 * Value implementation.
-	 */
-	public static class Value extends StringValue {
-		
-		private static final long serialVersionUID = 1L;
-
-		public Value() {
-			super();
-		}
-
-		public Value(String v) {
-			super(v);
-		}
-		
-		@Override
-		public boolean equals(final Object obj) {
-			if (this == obj) {
-				return true;
-			}
-			
-			if (obj.getClass() == TestData.Value.class) {
-				final StringValue other = (StringValue) obj;
-				int len = this.length();
-				
-				if (len == other.length()) {
-					final char[] tc = this.getCharArray();
-					final char[] oc = other.getCharArray();
-					int i = 0, j = 0;
-					
-					while (len-- != 0) {
-						if (tc[i++] != oc[j++]) {
-							return false;
-						}
-					}
-					return true;
-				}
-			}
-			return false;
-		}
-	}
-
-	/**
-	 * Pair generator.
-	 */
-	public static class Generator implements MutableObjectIterator<Record> {
-		
-		public enum KeyMode {
-			SORTED, RANDOM
-		};
-
-		public enum ValueMode {
-			FIX_LENGTH, RANDOM_LENGTH, CONSTANT
-		};
-
-		private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
-			'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
-
-		private final long seed;
-
-		private final int keyMax;
-
-		private final int valueLength;
-
-		private final KeyMode keyMode;
-
-		private final ValueMode valueMode;
-
-		private Random random;
-
-		private int counter;
-
-		private Key key;
-		private Value value;
-
-		public Generator(long seed, int keyMax, int valueLength) {
-			this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-		}
-
-		public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
-			this(seed, keyMax, valueLength, keyMode, valueMode, null);
-		}
-		
-		public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, Value constant) {
-			this.seed = seed;
-			this.keyMax = keyMax;
-			this.valueLength = valueLength;
-			this.keyMode = keyMode;
-			this.valueMode = valueMode;
-
-			this.random = new Random(seed);
-			this.counter = 0;
-			
-			this.key = new Key();
-			this.value = constant == null ? new Value() : constant;
-		}
-
-		public Record next(Record reuse) {
-			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
-			if (this.valueMode != ValueMode.CONSTANT) {
-				this.value.setValue(randomString());
-			}
-			reuse.setField(0, this.key);
-			reuse.setField(1, this.value);
-			return reuse;
-		}
-
-		public Record next() {
-			return next(new Record(2));
-		}
-
-		public boolean next(org.apache.flink.types.Value[] target) {
-			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
-			// TODO change this to something proper
-			((IntValue)target[0]).setValue(this.key.getValue());
-			((IntValue)target[1]).setValue(random.nextInt());
-			return true;
-		}
-
-		public int sizeOf(Record rec) {
-			// key
-			int valueLength = Integer.SIZE / 8;
-
-			// value
-			String text = rec.getField(1, Value.class).getValue();
-			int strlen = text.length();
-			int utflen = 0;
-			int c;
-			for (int i = 0; i < strlen; i++) {
-				c = text.charAt(i);
-				if ((c >= 0x0001) && (c <= 0x007F)) {
-					utflen++;
-				} else if (c > 0x07FF) {
-					utflen += 3;
-				} else {
-					utflen += 2;
-				}
-			}
-			valueLength += 2 + utflen;
-
-			return valueLength;
-		}
-
-		public void reset() {
-			this.random = new Random(seed);
-			this.counter = 0;
-		}
-
-		private String randomString() {
-			int length;
-
-			if (valueMode == ValueMode.FIX_LENGTH) {
-				length = valueLength;
-			} else {
-				length = valueLength - random.nextInt(valueLength / 3);
-			}
-
-			StringBuilder sb = new StringBuilder();
-			for (int i = 0; i < length; i++) {
-				sb.append(alpha[random.nextInt(alpha.length)]);
-			}
-			return sb.toString();
-		}
-
-	}
-	
-	/**
-	 * Record reader mock.
-	 */
-	public static class GeneratorIterator implements MutableObjectIterator<Record> {
-		
-		private final Generator generator;
-
-		private final int numberOfRecords;
-
-		private int counter;
-
-		public GeneratorIterator(Generator generator, int numberOfRecords) {
-			this.generator = generator;
-			this.generator.reset();
-			this.numberOfRecords = numberOfRecords;
-			this.counter = 0;
-		}
-
-		@Override
-		public Record next(Record target) {
-			if (counter < numberOfRecords) {
-				counter++;
-				return generator.next(target);
-			}
-			else {
-				return null;
-			}
-		}
-
-		@Override
-		public Record next() {
-			if (counter < numberOfRecords) {
-				counter++;
-				return generator.next();
-			}
-			else {
-				return null;
-			}
-		}
-		
-		public void reset() {
-			this.counter = 0;
-		}
-	}
-
-	/**
 	 * Tuple2<Integer, String> generator.
 	 */
 	public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
@@ -398,9 +165,8 @@ public final class TestData {
 
 	}
 
-
 	/**
-	 * Record reader mock.
+	 * Tuple reader mock.
 	 */
 	public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
 
@@ -443,35 +209,31 @@ public final class TestData {
 			this.counter = 0;
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class ConstantValueIterator implements MutableObjectIterator<Record> {
-		
-		private final Key key;
-		private final Value value;
-		
+
+	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private int key;
+		private String value;
+
 		private final String valueValue;
-		
-		
+
+
 		private final int numPairs;
-		
+
 		private int pos;
-		
-		
-		public ConstantValueIterator(int keyValue, String valueValue, int numPairs) {
-			this.key = new Key(keyValue);
-			this.value = new Value();
+
+
+		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
+			this.key = keyValue;
 			this.valueValue = valueValue;
 			this.numPairs = numPairs;
 		}
-		
+
 		@Override
-		public Record next(Record reuse) {
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
 			if (pos < this.numPairs) {
-				this.value.setValue(this.valueValue + ' ' + pos);
-				reuse.setField(0, this.key);
-				reuse.setField(1, this.value);
+				this.value = this.valueValue + ' ' + pos;
+				reuse.setFields(this.key, this.value);
 				pos++;
 				return reuse;
 			}
@@ -481,8 +243,8 @@ public final class TestData {
 		}
 
 		@Override
-		public Record next() {
-			return next(new Record(2));
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
 		}
 
 		public void reset() {
@@ -490,45 +252,307 @@ public final class TestData {
 		}
 	}
 
-	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+	/**
+	 * An iterator that returns the Key/Value pairs with identical value a given number of times.
+	 */
+	public static final class ConstantIntIntTuplesIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
 
-		private int key;
-		private String value;
+		private final int key;
+		private final int value;
 
-		private final String valueValue;
+		private int numLeft;
 
+		public ConstantIntIntTuplesIterator(int key, int value, int count) {
+			this.key = key;
+			this.value = value;
+			this.numLeft = count;
+		}
 
-		private final int numPairs;
+		@Override
+		public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) {
+			if (this.numLeft > 0) {
+				this.numLeft--;
+				reuse.setField(this.key, 0);
+				reuse.setField(this.value, 1);
+				return reuse;
+			} else {
+				return null;
+			}
+		}
 
-		private int pos;
+		@Override
+		public Tuple2<Integer, Integer> next() {
+			return next(new Tuple2<>(0, 0));
+		}
+	}
 
+	//----Tuple2<Integer, String>
+	private static final TupleTypeInfo<Tuple2<Integer, String>> typeInfoIntString = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class);
 
-		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
-			this.key = keyValue;
-			this.valueValue = valueValue;
-			this.numPairs = numPairs;
+	private static final TypeSerializerFactory<Tuple2<Integer, String>> serializerFactoryIntString = new MockTupleSerializerFactory(typeInfoIntString);
+
+	public static TupleTypeInfo<Tuple2<Integer, String>> getIntStringTupleTypeInfo() {
+		return typeInfoIntString;
+	}
+
+	public static TypeSerializerFactory<Tuple2<Integer, String>> getIntStringTupleSerializerFactory() {
+		return serializerFactoryIntString;
+	}
+
+	public static TypeSerializer<Tuple2<Integer, String>> getIntStringTupleSerializer() {
+		return serializerFactoryIntString.getSerializer();
+	}
+
+	public static TypeComparator<Tuple2<Integer, String>> getIntStringTupleComparator() {
+		return getIntStringTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null);
+	}
+
+	public static MockTuple2Reader<Tuple2<Integer, String>> getIntStringTupleReader() {
+		return new MockTuple2Reader<Tuple2<Integer, String>>();
+	}
+
+	//----Tuple2<Integer, Integer>
+	private static final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfoIntInt = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
+
+	private static final TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactoryIntInt = new MockTupleSerializerFactory(typeInfoIntInt);
+
+	public static TupleTypeInfo<Tuple2<Integer, Integer>> getIntIntTupleTypeInfo() {
+		return typeInfoIntInt;
+	}
+
+	public static TypeSerializerFactory<Tuple2<Integer, Integer>> getIntIntTupleSerializerFactory() {
+		return serializerFactoryIntInt;
+	}
+
+	public static TypeSerializer<Tuple2<Integer, Integer>> getIntIntTupleSerializer() {
+		return getIntIntTupleSerializerFactory().getSerializer();
+	}
+
+	public static TypeComparator<Tuple2<Integer, Integer>> getIntIntTupleComparator() {
+		return getIntIntTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null);
+	}
+
+	public static MockTuple2Reader<Tuple2<Integer, Integer>> getIntIntTupleReader() {
+		return new MockTuple2Reader<>();
+	}
+
+	//----Tuple2<?, ?>
+	private static class MockTupleSerializerFactory<T extends Tuple> implements TypeSerializerFactory<T> {
+		private final TupleTypeInfo<T> info;
+
+		public MockTupleSerializerFactory(TupleTypeInfo<T> info) {
+			this.info = info;
 		}
 
 		@Override
-		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
-			if (pos < this.numPairs) {
-				this.value = this.valueValue + ' ' + pos;
-				reuse.setFields(this.key, this.value);
-				pos++;
+		public void writeParametersToConfig(Configuration config) {
+			throw new UnsupportedOperationException("Not supported yet.");
+		}
+
+		@Override
+		public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+			throw new UnsupportedOperationException("Not supported yet.");
+		}
+
+		@Override
+		public TypeSerializer<T> getSerializer() {
+			return info.createSerializer(null);
+		}
+
+		@Override
+		public Class<T> getDataType() {
+			return info.getTypeClass();
+		}
+	}
+
+	public static class MockTuple2Reader<T extends Tuple2> implements MutableObjectIterator<T> {
+		private final Tuple2 SENTINEL = new Tuple2();
+
+		private final BlockingQueue<Tuple2> queue;
+
+		public MockTuple2Reader() {
+			this.queue = new ArrayBlockingQueue<Tuple2>(32, false);
+		}
+
+		public MockTuple2Reader(int size) {
+			this.queue = new ArrayBlockingQueue<Tuple2>(size, false);
+		}
+
+		@Override
+		public T next(T reuse) {
+			Tuple2 r = null;
+			while (r == null) {
+				try {
+					r = queue.take();
+				} catch (InterruptedException iex) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
+			}
+
+			if (r.equals(SENTINEL)) {
+				// put the sentinel back, to ensure that repeated calls do not block
+				try {
+					queue.put(r);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
+				return null;
+			} else {
+				reuse.setField(r.getField(0), 0);
+				reuse.setField(r.getField(1), 1);
 				return reuse;
 			}
-			else {
+		}
+
+		@Override
+		public T next() {
+			Tuple2 r = null;
+			while (r == null) {
+				try {
+					r = queue.take();
+				} catch (InterruptedException iex) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
+			}
+
+			if (r.equals(SENTINEL)) {
+				// put the sentinel back, to ensure that repeated calls do not block
+				try {
+					queue.put(r);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Reader was interrupted.");
+				}
 				return null;
+			} else {
+				Tuple2 result = new Tuple2(r.f0, r.f1);
+				return (T) result;
+			}
+		}
+
+		public void emit(Tuple2 element) throws InterruptedException {
+			queue.put(new Tuple2(element.f0, element.f1));
+		}
+
+		public void close() {
+			try {
+				queue.put(SENTINEL);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e);
 			}
 		}
+	}
+	
+	public static class IntPairComparator extends TypeComparator<IntPair> {
+
+		private static final long serialVersionUID = 1L;
+
+		private int reference;
+
+		private final TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
 
 		@Override
-		public Tuple2<Integer, String> next() {
-			return next(new Tuple2<Integer, String>());
+		public int hash(IntPair object) {
+			return comparators[0].hash(object.getKey());
 		}
 
-		public void reset() {
-			this.pos = 0;
+		@Override
+		public void setReference(IntPair toCompare) {
+			this.reference = toCompare.getKey();
+		}
+
+		@Override
+		public boolean equalToReference(IntPair candidate) {
+			return candidate.getKey() == this.reference;
+		}
+
+		@Override
+		public int compareToReference(TypeComparator<IntPair> referencedAccessors) {
+			final IntPairComparator comp = (IntPairComparator) referencedAccessors;
+			return comp.reference - this.reference;
+		}
+
+		@Override
+		public int compare(IntPair first, IntPair second) {
+			return first.getKey() - second.getKey();
+		}
+
+		@Override
+		public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
+			return source1.readInt() - source2.readInt();
+		}
+
+		@Override
+		public boolean supportsNormalizedKey() {
+			return true;
+		}
+
+		@Override
+		public int getNormalizeKeyLen() {
+			return 4;
+		}
+
+		@Override
+		public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+			return keyBytes < 4;
+		}
+
+		@Override
+		public void putNormalizedKey(IntPair record, MemorySegment target, int offset, int len) {
+			// see IntValue for documentation of the logic
+			final int value = record.getKey() - Integer.MIN_VALUE;
+
+			if (len == 4) {
+				target.putIntBigEndian(offset, value);
+			} else if (len <= 0) {
+			} else if (len < 4) {
+				for (int i = 0; len > 0; len--, i++) {
+					target.put(offset + i, (byte) ((value >>> ((3 - i) << 3)) & 0xff));
+				}
+			} else {
+				target.putIntBigEndian(offset, value);
+				for (int i = 4; i < len; i++) {
+					target.put(offset + i, (byte) 0);
+				}
+			}
+		}
+
+		@Override
+		public boolean invertNormalizedKey() {
+			return false;
+		}
+
+		@Override
+		public IntPairComparator duplicate() {
+			return new IntPairComparator();
+		}
+
+		@Override
+		public int extractKeys(Object record, Object[] target, int index) {
+			target[index] = ((IntPair) record).getKey();
+			return 1;
+		}
+
+		@Override
+		public TypeComparator[] getFlatComparators() {
+			return comparators;
+		}
+
+		@Override
+		public boolean supportsSerializationWithKeyNormalization() {
+			return true;
+		}
+
+		@Override
+		public void writeWithKeyNormalization(IntPair record, DataOutputView target) throws IOException {
+			target.writeInt(record.getKey() - Integer.MIN_VALUE);
+			target.writeInt(record.getValue());
+		}
+
+		@Override
+		public IntPair readWithKeyDenormalization(IntPair reuse, DataInputView source) throws IOException {
+			reuse.setKey(source.readInt() + Integer.MIN_VALUE);
+			reuse.setValue(source.readInt());
+			return reuse;
 		}
 	}
 }
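
Note on the IntPairComparator hunk above: putNormalizedKey and writeWithKeyNormalization subtract Integer.MIN_VALUE from the key before writing it big-endian. The sketch below is a minimal, JDK-only illustration of why that works. The names NormalizedKeySketch, normalize and compareUnsigned are hypothetical and not part of Flink. Subtracting Integer.MIN_VALUE flips the sign bit, so comparing the resulting bytes as unsigned values, most significant byte first, yields the same order as comparing the original signed ints.

```java
import java.nio.ByteBuffer;

public class NormalizedKeySketch {

    // Flip the sign bit, then write the int big-endian (ByteBuffer's default byte order).
    static byte[] normalize(int key) {
        return ByteBuffer.allocate(4).putInt(key - Integer.MIN_VALUE).array();
    }

    // Lexicographic comparison of two equal-length arrays, treating each byte as unsigned.
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        int[] keys = {Integer.MIN_VALUE, -5, -1, 0, 1, 42, Integer.MAX_VALUE};
        for (int x : keys) {
            for (int y : keys) {
                int expected = Integer.signum(Integer.compare(x, y));
                int actual = Integer.signum(compareUnsigned(normalize(x), normalize(y)));
                if (expected != actual) {
                    throw new AssertionError(x + " vs " + y);
                }
            }
        }
        System.out.println("byte order matches signed int order");
    }
}
```

The sketch compares with Integer.compare rather than by subtraction, because a difference such as Integer.MAX_VALUE - (-1) overflows and changes sign.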

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index f112ff8..4c5a07e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.operators.util;
 
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -37,10 +36,8 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -78,21 +75,21 @@ public class HashVsSortMiniBenchmark {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializerFactory<Record> serializer1;
-	private TypeSerializerFactory<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator11;
+	private TypeSerializerFactory<Tuple2<Integer, String>> serializer1;
+	private TypeSerializerFactory<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator11;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.serializer1 = RecordSerializerFactory.get();
-		this.serializer2 = RecordSerializerFactory.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.serializer1 = TestData.getIntStringTupleSerializerFactory();
+		this.serializer2 = TestData.getIntStringTupleSerializerFactory();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+		this.comparator2 = TestData.getIntStringTupleComparator();
+		this.pairComparator11 = new GenericPairComparator(this.comparator1, this.comparator2);
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
@@ -120,31 +117,31 @@ public class HashVsSortMiniBenchmark {
 	public void testSortBothMerge() {
 		try {
 			
-			Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final JoinFunction matcher = new NoOpMatcher();
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new NoOpMatcher();
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 			
 			long start = System.nanoTime();
 			
-			final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>(
+			final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, 
 					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
 			
-			final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>(
+			final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, 
 					this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
 			
-			final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator();
-			final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();
+			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
+			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
 			
 			// compare with iterator values
-			ReusingMergeInnerJoinIterator<Record, Record, Record> iterator =
-				new ReusingMergeInnerJoinIterator<Record, Record, Record>(sortedInput1, sortedInput2,
+			ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingMergeInnerJoinIterator<>(sortedInput1, sortedInput2,
 						this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 						this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);
 			
@@ -170,21 +167,21 @@ public class HashVsSortMiniBenchmark {
 	@Test
 	public void testBuildFirst() {
 		try {
-			Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final JoinFunction matcher = new NoOpMatcher();
+			final FlatJoinFunction matcher = new NoOpMatcher();
 			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 			
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			final ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -209,21 +206,21 @@ public class HashVsSortMiniBenchmark {
 	@Test
 	public void testBuildSecond() {
 		try {
-			Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final JoinFunction matcher = new NoOpMatcher();
+			final FlatJoinFunction matcher = new NoOpMatcher();
 			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 			
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildSecondHashMatchIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -246,11 +243,11 @@ public class HashVsSortMiniBenchmark {
 	}
 	
 	
-	private static final class NoOpMatcher extends JoinFunction {
+	private static final class NoOpMatcher implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
 		}
 	}
 }


[3/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index e1e2c0a..7b7b940 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -30,9 +29,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -40,12 +39,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -72,9 +68,11 @@ public class CombiningUnilateralSortMergerITCase {
 
 	private MemoryManager memoryManager;
 
-	private TypeSerializerFactory<Record> serializerFactory;
+	private TypeSerializerFactory<Tuple2<Integer, String>> serializerFactory1;
+	private TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactory2;
 	
-	private TypeComparator<Record> comparator;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, Integer>> comparator2;
 
 	@SuppressWarnings("unchecked")
 	@Before
@@ -82,8 +80,11 @@ public class CombiningUnilateralSortMergerITCase {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
 		
-		this.serializerFactory = RecordSerializerFactory.get();
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.serializerFactory1 = TestData.getIntStringTupleSerializerFactory();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+
+		this.serializerFactory2 = TestData.getIntIntTupleSerializerFactory();
+		this.comparator2 = TestData.getIntIntTupleComparator();
 	}
 
 	@After
@@ -107,32 +108,30 @@ public class CombiningUnilateralSortMergerITCase {
 		int noKeys = 100;
 		int noKeyCnt = 10000;
 
-		MockRecordReader reader = new MockRecordReader();
+		TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
 
 		LOG.debug("initializing sortmerger");
 		
 		TestCountCombiner comb = new TestCountCombiner();
 		
-		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
-				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+		Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
 				0.25, 64, 0.7f, false);
 
-		final Record rec = new Record();
-		rec.setField(1, new IntValue(1));
-		final TestData.Key key = new TestData.Key();
+		final Tuple2<Integer, Integer> rec = new Tuple2<>();
+		rec.setField(1, 1);
 		
 		for (int i = 0; i < noKeyCnt; i++) {
 			for (int j = 0; j < noKeys; j++) {
-				key.setKey(j);
-				rec.setField(0, key);
+				rec.setField(j, 0);
 				reader.emit(rec);
 			}
 		}
 		reader.close();
 		
-		MutableObjectIterator<Record> iterator = merger.getIterator();
+		MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
 
-		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory.getSerializer(), comparator.duplicate());
+		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());
 		while (result.hasNext()) {
 			Assert.assertEquals(noKeyCnt, result.next().intValue());
 		}
@@ -148,32 +147,30 @@ public class CombiningUnilateralSortMergerITCase {
 		int noKeys = 100;
 		int noKeyCnt = 10000;
 
-		MockRecordReader reader = new MockRecordReader();
+		TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
 
 		LOG.debug("initializing sortmerger");
 		
 		TestCountCombiner comb = new TestCountCombiner();
 		
-		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
-				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+		Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
 				0.01, 64, 0.005f, true);
 
-		final Record rec = new Record();
-		rec.setField(1, new IntValue(1));
-		final TestData.Key key = new TestData.Key();
+		final Tuple2<Integer, Integer> rec = new Tuple2<>();
+		rec.setField(1, 1);
 		
 		for (int i = 0; i < noKeyCnt; i++) {
 			for (int j = 0; j < noKeys; j++) {
-				key.setKey(j);
-				rec.setField(0, key);
+				rec.setField(j, 0);
 				reader.emit(rec);
 			}
 		}
 		reader.close();
 		
-		MutableObjectIterator<Record> iterator = merger.getIterator();
+		MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
 
-		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory.getSerializer(), comparator.duplicate());
+		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());
 		while (result.hasNext()) {
 			Assert.assertEquals(noKeyCnt, result.next().intValue());
 		}
@@ -187,65 +184,60 @@ public class CombiningUnilateralSortMergerITCase {
 	@Test
 	public void testSortAndValidate() throws Exception
 	{
-		final Hashtable<TestData.Key, Integer> countTable = new Hashtable<TestData.Key, Integer>(KEY_MAX);
+		final Hashtable<Integer, Integer> countTable = new Hashtable<>(KEY_MAX);
 		for (int i = 1; i <= KEY_MAX; i++) {
-			countTable.put(new TestData.Key(i), 0);
+			countTable.put(i, 0);
 		}
 
 		// comparator
-		final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+		final TypeComparator<Integer> keyComparator = new IntComparator(true);
 
 		// reader
-		MockRecordReader reader = new MockRecordReader();
+		TestData.MockTuple2Reader<Tuple2<Integer, String>> reader = TestData.getIntStringTupleReader();
 
 		// merge iterator
 		LOG.debug("initializing sortmerger");
 		
 		TestCountCombiner2 comb = new TestCountCombiner2();
 		
-		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
-				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+		Sorter<Tuple2<Integer, String>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory1, this.comparator1,
 				0.25, 2, 0.7f, false);
 
 		// emit data
 		LOG.debug("emitting data");
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-		Record rec = new Record();
-		final TestData.Value value = new TestData.Value("1");
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		Tuple2<Integer, String> rec = new Tuple2<>();
 		
 		for (int i = 0; i < NUM_PAIRS; i++) {
 			Assert.assertTrue((rec = generator.next(rec)) != null);
-			final TestData.Key key = rec.getField(0, TestData.Key.class);
-			rec.setField(1, value);
+			final Integer key = rec.f0;
+			rec.setField("1", 1);
 			reader.emit(rec);
 			
-			countTable.put(new TestData.Key(key.getKey()), countTable.get(key) + 1);
+			countTable.put(key, countTable.get(key) + 1);
 		}
 		reader.close();
-		rec = null;
 
 		// check order
-		MutableObjectIterator<Record> iterator = merger.getIterator();
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 		
 		LOG.debug("checking results");
 		
-		Record rec1 = new Record();
-		Record rec2 = new Record();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		countTable.put(new TestData.Key(rec1.getField(0, TestData.Key.class).getKey()), countTable.get(rec1.getField(0, TestData.Key.class)) - (Integer.parseInt(rec1.getField(1, TestData.Value.class).toString())));
+		countTable.put(rec1.f0, countTable.get(rec1.f0) - (Integer.parseInt(rec1.f1)));
 
 		while ((rec2 = iterator.next(rec2)) != null) {
-			final Key k1 = rec1.getField(0, TestData.Key.class);
-			final Key k2 = rec2.getField(0, TestData.Key.class);
+			int k1 = rec1.f0;
+			int k2 = rec2.f0;
 			
 			Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-			countTable.put(new TestData.Key(k2.getKey()), countTable.get(k2) - (Integer.parseInt(rec2.getField(1, TestData.Value.class).toString())));
+			countTable.put(k2, countTable.get(k2) - (Integer.parseInt(rec2.f1)));
 			
-			Record tmp = rec1;
 			rec1 = rec2;
-			k1.setKey(k2.getKey());
-			rec2 = tmp;
 		}
 
 		for (Integer cnt : countTable.values()) {
@@ -260,10 +252,10 @@ public class CombiningUnilateralSortMergerITCase {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> {
+	public static class TestCountCombiner extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 		
-		private final IntValue count = new IntValue();
+		private Integer count = 0;
 		
 		public volatile boolean opened = false;
 		
@@ -271,21 +263,21 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		
 		@Override
-		public void combine(Iterable<Record> values, Collector<Record> out) {
-			Record rec = null;
+		public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {
+			Tuple2<Integer, Integer> rec = new Tuple2<>();
 			int cnt = 0;
-			for (Record next : values) {
+			for (Tuple2<Integer, Integer> next : values) {
 				rec = next;
-				cnt += rec.getField(1, IntValue.class).getValue();
+				cnt += rec.f1;
 			}
 			
-			this.count.setValue(cnt);
-			rec.setField(1, this.count);
+			this.count = cnt;
+			rec.setField(this.count, 1);
 			out.collect(rec);
 		}
 
 		@Override
-		public void reduce(Iterable<Record> values, Collector<Record> out) {}
+		public void reduce(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {}
 		
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -298,7 +290,7 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 	}
 
-	public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> {
+	public static class TestCountCombiner2 extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 		
 		public volatile boolean opened = false;
@@ -306,19 +298,19 @@ public class CombiningUnilateralSortMergerITCase {
 		public volatile boolean closed = false;
 		
 		@Override
-		public void combine(Iterable<Record> values, Collector<Record> out) {
-			Record rec = null;
+		public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
+			Tuple2<Integer, String> rec = new Tuple2<>();
 			int cnt = 0;
-			for (Record next : values) {
+			for (Tuple2<Integer, String> next : values) {
 				rec = next;
-				cnt += Integer.parseInt(rec.getField(1, TestData.Value.class).toString());
+				cnt += Integer.parseInt(rec.f1);
 			}
 
-			out.collect(new Record(rec.getField(0, Key.class), new TestData.Value(cnt + "")));
+			out.collect(new Tuple2(rec.f0, cnt + ""));
 		}
 
 		@Override
-		public void reduce(Iterable<Record> values, Collector<Record> out) {
+		public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
 			// yo, nothing, mon
 		}
 		
@@ -333,9 +325,9 @@ public class CombiningUnilateralSortMergerITCase {
 		}
 	}
 	
-	private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Record> data, TypeSerializer<Record> serializer, TypeComparator<Record> comparator) {
+	private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer, Integer>>  comparator) {
 		
-		final ReusingKeyGroupedIterator<Record> groupIter = new ReusingKeyGroupedIterator<Record>(data, serializer, comparator);
+		final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>>  groupIter = new ReusingKeyGroupedIterator<> (data, serializer, comparator);
 		
 		return new Iterator<Integer>() {
 			
@@ -360,13 +352,13 @@ public class CombiningUnilateralSortMergerITCase {
 				if (hasNext()) {
 					hasNext = false;
 					
-					Iterator<Record> values = groupIter.getValues();
+					Iterator<Tuple2<Integer, Integer>> values = groupIter.getValues();
 					
-					Record rec = null;
+					Tuple2<Integer, Integer> rec;
 					int cnt = 0;
 					while (values.hasNext()) {
 						rec = values.next();
-						cnt += rec.getField(1, IntValue.class).getValue();
+						cnt += rec.f1;
 					}
 					
 					return cnt;
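
Note on the testSortAndValidate hunk above: the new loop keeps `rec1 = rec2` but no longer swaps in a second instance, so from the second iteration on rec1 and rec2 refer to the same Tuple2 when iterator.next(rec2) is called. If the iterator fills and returns the reuse object (an assumption about the sorter's iterator, which this diff does not show), k1 and k2 are then read from one object and the order assertion cannot fail. The sketch below reproduces the effect with the JDK only. ReusingIterator and the int[] holders are hypothetical stand-ins for MutableObjectIterator and Tuple2.

```java
public class ReuseAliasingSketch {

    // Hypothetical stand-in for an object-reusing iterator: it fills the holder passed in.
    static final class ReusingIterator {
        private final int[] data;
        private int pos = 0;

        ReusingIterator(int... data) {
            this.data = data;
        }

        // Writes the next value into reuse[0] and returns reuse, or null when exhausted.
        int[] next(int[] reuse) {
            if (pos >= data.length) {
                return null;
            }
            reuse[0] = data[pos++];
            return reuse;
        }
    }

    // Aliased variant: after the first iteration prev and cur are the same holder.
    static boolean looksSortedAliased(int... data) {
        ReusingIterator it = new ReusingIterator(data);
        int[] prev = it.next(new int[1]);
        int[] cur = new int[1];
        while ((cur = it.next(cur)) != null) {
            if (prev[0] > cur[0]) {
                return false;
            }
            prev = cur; // both names now point at one holder
        }
        return true;
    }

    // Swapped variant: the two holders alternate, so prev keeps the previous value.
    static boolean looksSortedSwapped(int... data) {
        ReusingIterator it = new ReusingIterator(data);
        int[] prev = it.next(new int[1]);
        int[] cur = new int[1];
        while ((cur = it.next(cur)) != null) {
            if (prev[0] > cur[0]) {
                return false;
            }
            int[] tmp = prev;
            prev = cur;
            cur = tmp;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("aliased check on 1,3,2: " + looksSortedAliased(1, 3, 2));
        System.out.println("swapped check on 1,3,2: " + looksSortedSwapped(1, 3, 2));
    }
}
```

The swapped variant mirrors the `tmp` swap that the removed lines performed. Copying the key into a local variable before the next call to next would be another way to keep the previous value.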

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 9f0b3d9..b19591b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.util.Comparator;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -31,14 +30,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Assert;
@@ -58,7 +53,7 @@ public class ExternalSortITCase {
 
 	private static final int VALUE_LENGTH = 114;
 	
-	private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+	private static final String VAL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
 
 	private static final int NUM_PAIRS = 200000;
 
@@ -70,9 +65,9 @@ public class ExternalSortITCase {
 
 	private MemoryManager memoryManager;
 	
-	private TypeSerializerFactory<Record> pactRecordSerializer;
+	private TypeSerializerFactory<Tuple2<Integer, String>> pactRecordSerializer;
 	
-	private TypeComparator<Record> pactRecordComparator;
+	private TypeComparator<Tuple2<Integer, String>> pactRecordComparator;
 	
 	private boolean testSuccess;
 
@@ -84,8 +79,8 @@ public class ExternalSortITCase {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
 		
-		this.pactRecordSerializer = RecordSerializerFactory.get();
-		this.pactRecordComparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.pactRecordSerializer = TestData.getIntStringTupleSerializerFactory();
+		this.pactRecordComparator = TestData.getIntStringTupleComparator();
 	}
 
 	@After
@@ -109,15 +104,15 @@ public class ExternalSortITCase {
 	public void testInMemorySort() {
 		try {
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 			
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
 	
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)64/78, 2, 0.9f, true);
 	
@@ -125,26 +120,22 @@ public class ExternalSortITCase {
 			LOG.debug("Reading and sorting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsEmitted = 1;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsEmitted++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
-				
 				rec2 = tmp;
 			}
 			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -162,15 +153,15 @@ public class ExternalSortITCase {
 	public void testInMemorySortUsing10Buffers() {
 		try {
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 			
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
 	
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)64/78, 10, 2, 0.9f, false);
 	
@@ -178,26 +169,22 @@ public class ExternalSortITCase {
 			LOG.debug("Reading and sorting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsEmitted = 1;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsEmitted++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
-				
 				rec2 = tmp;
 			}
 			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -215,15 +202,15 @@ public class ExternalSortITCase {
 	public void testSpillingSort() {
 		try {
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 			
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
 	
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)16/78, 64, 0.7f, true);
 	
@@ -231,26 +218,22 @@ public class ExternalSortITCase {
 			LOG.debug("Reading and sorting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsEmitted = 1;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsEmitted++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
-				
 				rec2 = tmp;
 			}
 			Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -271,15 +254,15 @@ public class ExternalSortITCase {
 			final int PAIRS = 10000000;
 	
 			// comparator
-			final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+			final TypeComparator<Integer> keyComparator = new IntComparator(true);
 	
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, PAIRS);
+			final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, PAIRS);
 			
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");
 			
-			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
+			Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
 					(double)64/78, 16, 0.7f, false);
 			
@@ -287,26 +270,23 @@ public class ExternalSortITCase {
 			LOG.debug("Emitting data...");
 	
 			// check order
-			MutableObjectIterator<Record> iterator = merger.getIterator();
+			MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
 			
 			LOG.debug("Checking results...");
 			int pairsRead = 1;
 			int nextStep = PAIRS / 20;
 	
-			Record rec1 = new Record();
-			Record rec2 = new Record();
+			Tuple2<Integer, String> rec1 = new Tuple2<>();
+			Tuple2<Integer, String> rec2 = new Tuple2<>();
 			
 			Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 			while ((rec2 = iterator.next(rec2)) != null) {
-				final Key k1 = rec1.getField(0, TestData.Key.class);
-				final Key k2 = rec2.getField(0, TestData.Key.class);
 				pairsRead++;
 				
-				Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); 
-				
-				Record tmp = rec1;
+				Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+				Tuple2<Integer, String> tmp = rec1;
 				rec1 = rec2;
-				k1.setKey(k2.getKey());
 				rec2 = tmp;
 				
 				// log
@@ -335,7 +315,7 @@ public class ExternalSortITCase {
 			final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS);
 			
 			final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
-			final TypeComparator<IntPair> comparator = new IntPairComparator();
+			final TypeComparator<IntPair> comparator = new TestData.IntPairComparator();
 			
 			// merge iterator
 			LOG.debug("Initializing sortmerger...");

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
index 07330ee..3329335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
@@ -19,16 +19,12 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.runtime.operators.sort.MergeIterator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,33 +32,33 @@ import org.junit.Test;
 
 public class MergeIteratorTest {
 	
-	private TypeComparator<Record> comparator;
+	private TypeComparator<Tuple2<Integer, String>> comparator;
 	
 	
 	@SuppressWarnings("unchecked")
 	@Before
 	public void setup() {
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] { TestData.Key.class});
+		this.comparator = TestData.getIntStringTupleComparator();
 	}
 	
 	
-	private MutableObjectIterator<Record> newIterator(final int[] keys, final String[] values) {
+	private MutableObjectIterator<Tuple2<Integer, String>> newIterator(final int[] keys, final String[] values) {
 		
-		return new MutableObjectIterator<Record>() {
+		return new MutableObjectIterator<Tuple2<Integer, String>>() {
 			
-			private Key key = new Key();
-			private Value value = new Value();
+			private int key = 0;
+			private String value = new String();
 			
 			private int current = 0;
 
 			@Override
-			public Record next(Record reuse) {
+			public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
 				if (current < keys.length) {
-					key.setKey(keys[current]);
-					value.setValue(values[current]);
+					key = keys[current];
+					value = values[current];
 					current++;
-					reuse.setField(0, key);
-					reuse.setField(1, value);
+					reuse.setField(key, 0);
+					reuse.setField(value, 1);
 					return reuse;
 				}
 				else {
@@ -71,9 +67,9 @@ public class MergeIteratorTest {
 			}
 
 			@Override
-			public Record next() {
+			public Tuple2<Integer, String> next() {
 				if (current < keys.length) {
-					Record result = new Record(new Key(keys[current]), new Value(values[current]));
+					Tuple2<Integer, String> result = new Tuple2<>(keys[current], values[current]);
 					current++;
 					return result;
 				}
@@ -88,37 +84,37 @@ public class MergeIteratorTest {
 	public void testMergeOfTwoStreams() throws Exception
 	{
 		// iterators
-		List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+		List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
 		iterators.add(newIterator(new int[] { 1, 2, 4, 5, 10 }, new String[] { "1", "2", "4", "5", "10" }));
 		iterators.add(newIterator(new int[] { 3, 6, 7, 10, 12 }, new String[] { "3", "6", "7", "10", "12" }));
 		
 		final int[] expected = new int[] {1, 2, 3, 4, 5, 6, 7, 10, 10, 12};
 
 		// comparator
-		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+		TypeComparator<Integer> comparator = new IntComparator(true);
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
 
 		// check expected order
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		final Key k1 = new Key();
-		final Key k2 = new Key();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
+		int k1 = 0;
+		int k2 = 0;
 		
 		int pos = 1;
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		Assert.assertEquals(expected[0], rec1.getField(0, TestData.Key.class).getKey());
+		Assert.assertEquals(expected[0], rec1.f0.intValue());
 		
 		while ((rec2 = iterator.next(rec2)) != null) {
-			k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
-			k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
+			k1 = rec1.f0;
+			k2 = rec2.f0;
 			
 			Assert.assertTrue(comparator.compare(k1, k2) <= 0);
-			Assert.assertEquals(expected[pos++], k2.getKey()); 
+			Assert.assertEquals(expected[pos++], k2); 
 			
-			Record tmp = rec1;
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
 			rec2 = tmp;
 		}
@@ -128,7 +124,7 @@ public class MergeIteratorTest {
 	public void testMergeOfTenStreams() throws Exception
 	{
 		// iterators
-		List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+		List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
 		iterators.add(newIterator(new int[] { 1, 2, 17, 23, 23 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 2, 6, 7, 8, 9 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 4, 10, 11, 11, 12 }, new String[] { "A", "B", "C", "D", "E" }));
@@ -141,26 +137,23 @@ public class MergeIteratorTest {
 		iterators.add(newIterator(new int[] { 8, 8, 14, 14, 15 }, new String[] { "A", "B", "C", "D", "E" }));
 
 		// comparator
-		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+		TypeComparator<Integer> comparator = new IntComparator(true);
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
 
 		int elementsFound = 1;
 		// check expected order
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		final Key k1 = new Key();
-		final Key k2 = new Key();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 		while ((rec2 = iterator.next(rec2)) != null) {
 			elementsFound++;
-			k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
-			k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
-			Assert.assertTrue(comparator.compare(k1, k2) <= 0);
+
+			Assert.assertTrue(comparator.compare(rec1.f0, rec2.f0) <= 0);
 			
-			Record tmp = rec1;
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
 			rec2 = tmp;
 		}
@@ -172,7 +165,7 @@ public class MergeIteratorTest {
 	public void testInvalidMerge() throws Exception
 	{
 		// iterators
-		List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+		List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
 		iterators.add(newIterator(new int[] { 1, 2, 17, 23, 23 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 2, 6, 7, 8, 9 }, new String[] { "A", "B", "C", "D", "E" }));
 		iterators.add(newIterator(new int[] { 4, 10, 11, 11, 12 }, new String[] { "A", "B", "C", "D", "E" }));
@@ -185,31 +178,26 @@ public class MergeIteratorTest {
 		iterators.add(newIterator(new int[] { 8, 8, 14, 14, 15 }, new String[] { "A", "B", "C", "D", "E" }));
 
 		// comparator
-		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+		TypeComparator<Integer> comparator = new IntComparator(true);
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+		MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
 
 		boolean violationFound = false;
 		
 		// check expected order
-		Record rec1 = new Record();
-		Record rec2 = new Record();
+		Tuple2<Integer, String> rec1 = new Tuple2<>();
+		Tuple2<Integer, String> rec2 = new Tuple2<>();
 		
 		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
 		while ((rec2 = iterator.next(rec2)) != null)
-		{
-			final Key k1 = new Key();
-			final Key k2 = new Key();
-			k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
-			k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
-			
-			if (comparator.compare(k1, k2) > 0) {
+		{			
+			if (comparator.compare(rec1.f0, rec2.f0) > 0) {
 				violationFound = true;
 				break;
 			}
 			
-			Record tmp = rec1;
+			Tuple2<Integer, String> tmp = rec1;
 			rec1 = rec2;
 			rec2 = tmp;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
index 1a6884e..dc517b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
@@ -22,14 +22,10 @@ package org.apache.flink.runtime.operators.sort;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,6 +39,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  */
@@ -59,77 +57,77 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 	private static final long SEED2 = 231434613412342L;
 
 	// left and right input data generators
-	private Generator generator1;
+	private TupleGenerator generator1;
 
-	private Generator generator2;
+	private TupleGenerator generator2;
 
-	// left and right input RecordReader mocks
-	private MutableObjectIterator<Record> reader1;
+	// left and right input TupleReader mocks
+	private MutableObjectIterator<Tuple2<Integer, String>> reader1;
 
-	private MutableObjectIterator<Record> reader2;
+	private MutableObjectIterator<Tuple2<Integer, String>> reader2;
 	
 	
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> serializer1;
+	private TypeSerializer<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+		this.serializer1 = TestData.getIntStringTupleSerializer();
+		this.serializer2 = TestData.getIntStringTupleSerializer();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+		this.comparator2 = TestData.getIntStringTupleComparator();
+		this.pairComparator = new GenericPairComparator(this.comparator1, this.comparator2);
 	}
 	
 	@Test
 	public void testMerge() {
 		try {
 			
-			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
 
-			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			reader1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			reader2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 
 			// collect expected data
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
-			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+			Map<Integer, Collection<String>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
+			Map<Integer, Collection<String>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
+			Map<Integer, List<Collection<String>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
 	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 	
 			// compare with iterator values
-			NonReusingSortMergeCoGroupIterator<Record, Record> iterator =	new NonReusingSortMergeCoGroupIterator<Record, Record>(
+			NonReusingSortMergeCoGroupIterator<Tuple2<Integer, String>,Tuple2<Integer, String>> iterator =	new NonReusingSortMergeCoGroupIterator<>(
 					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
 					this.pairComparator);
 	
 			iterator.open();
 			
-			final TestData.Key key = new TestData.Key();
+			int key = 0;
 			while (iterator.next())
 			{
-				Iterator<Record> iter1 = iterator.getValues1().iterator();
-				Iterator<Record> iter2 = iterator.getValues2().iterator();
+				Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
+				Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();
 				
-				TestData.Value v1 = null;
-				TestData.Value v2 = null;
+				String v1 = null;
+				String v2 = null;
 				
 				if (iter1.hasNext()) {
-					Record rec = iter1.next();
-					rec.getFieldInto(0, key);
-					v1 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter1.next();
+					key = rec.f0;
+					v1 = rec.f1;
 				}
 				else if (iter2.hasNext()) {
-					Record rec = iter2.next();
-					rec.getFieldInto(0, key);
-					v2 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter2.next();
+					key = rec.f0;
+					v2 = rec.f1;
 				}
 				else {
 					Assert.fail("No input on both sides.");
@@ -138,8 +136,8 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 				// assert that matches for this key exist
 				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
 				
-				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
-				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+				Collection<String> expValues1 = expectedCoGroupsMap.get(key).get(0);
+				Collection<String> expValues2 = expectedCoGroupsMap.get(key).get(1);
 				
 				if (v1 != null) {
 					expValues1.remove(v1);
@@ -149,14 +147,14 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 				}
 				
 				while(iter1.hasNext()) {
-					Record rec = iter1.next();
-					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter1.next();
+					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
 				
 				while(iter2.hasNext()) {
-					Record rec = iter2.next();
-					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter2.next();
+					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
 	
@@ -174,28 +172,28 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 
 	// --------------------------------------------------------------------------------------------
 	
-	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	private Map<Integer, List<Collection<String>>> coGroupValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+		Map<Integer, List<Collection<String>>> map = new HashMap<>(1000);
 
-		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+		Set<Integer> keySet = new HashSet<>(leftMap.keySet());
 		keySet.addAll(rightMap.keySet());
 		
-		for (TestData.Key key : keySet) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+		for (Integer key : keySet) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+			ArrayList<Collection<String>> list = new ArrayList<>(2);
 			
 			if (leftValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(leftValues);
 			}
 			
 			if (rightValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(rightValues);
 			}
@@ -205,22 +203,22 @@ public class NonReusingSortMergeCoGroupIteratorITCase
 		return map;
 	}
 
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+	private Map<Integer, Collection<String>> collectData(TupleGenerator iter, int num)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		for (int i = 0; i < num; i++) {
 			iter.next(pair);
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			Integer key = pair.f0;
 			
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 		return map;
 	}
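
Reviewer note on `collectData` above: the old code copied every key and value (`new TestData.Key(key.getKey())`, `new TestData.Value(...)`) before storing it, presumably because `iter.next(pair)` refills the same reused object. The new code stores `pair.f0` and `pair.f1` directly, which is safe as long as the fields hold immutable `Integer` and `String` instances. The sketch below is not part of the commit; `ReuseDemo`, `MutableValue` and `Pair` are hypothetical stand-ins used only to illustrate the difference.

```java
import java.util.ArrayList;
import java.util.List;

class ReuseDemo {

    /** Hypothetical mutable value type, standing in for a reusable value object. */
    static final class MutableValue {
        String value;
    }

    /** Hypothetical stand-in for Tuple2<Integer, String>; only the String field is used. */
    static final class Pair {
        String value;
    }

    /** Stores the reused mutable object itself: every list entry aliases one instance. */
    public static List<String> observedWithoutCopy(String[] inputs) {
        MutableValue reuse = new MutableValue();
        List<MutableValue> stored = new ArrayList<>();
        for (String s : inputs) {
            reuse.value = s;   // the "generator" overwrites the reused object
            stored.add(reuse); // no copy: same reference added each time
        }
        List<String> observed = new ArrayList<>();
        for (MutableValue v : stored) {
            observed.add(v.value);
        }
        return observed;
    }

    /** Stores the immutable String held by the reused object: no copy needed. */
    public static List<String> observedImmutableField(String[] inputs) {
        Pair reuse = new Pair();
        List<String> stored = new ArrayList<>();
        for (String s : inputs) {
            reuse.value = s;         // the field now points to a different String
            stored.add(reuse.value); // the stored reference is unaffected by later refills
        }
        return stored;
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c"};
        System.out.println(observedWithoutCopy(inputs));
        System.out.println(observedImmutableField(inputs));
    }
}
```

In the first method all stored entries end up showing the last value written. In the second, each entry keeps the value it had when it was added.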

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
index f8a8f11..45f20ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
@@ -21,19 +21,16 @@ package org.apache.flink.runtime.operators.sort;
 
 import java.util.List;
 import java.util.Random;
+import org.apache.flink.api.common.typeutils.TypeComparator;
 
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.MutableObjectIterator;
 
 import org.junit.After;
@@ -76,10 +73,9 @@ public class NormalizedKeySorterTest {
 		}
 	}
 
-	private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception {
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{ Key.class });
-		return new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
+	private NormalizedKeySorter<Tuple2<Integer, String>> newSortBuffer(List<MemorySegment> memory) throws Exception
+	{
+		return new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), TestData.getIntStringTupleComparator(), memory);
 	}
 
 	@Test
@@ -87,12 +83,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -102,18 +98,18 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		Record readTarget = new Record();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		int i = 0;
 		while (i < num) {
 			generator.next(record);
 			readTarget = sorter.getRecord(readTarget, i++);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -129,12 +125,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		do {
 			generator.next(record);
 		}
@@ -142,17 +138,17 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
 			generator.next(record);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -168,11 +164,11 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
 		
 		// write the buffer full with the first set of records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -183,7 +179,7 @@ public class NormalizedKeySorterTest {
 		sorter.reset();
 		
 		// write a second sequence of records. since the values are of fixed length, we must be able to write an equal number
-		generator = new TestData.Generator(SEED2, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		generator = new TestData.TupleGenerator(SEED2, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
 		
 		// write the buffer full with the first set of records
 		int num2 = -1;
@@ -197,18 +193,18 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		Record readTarget = new Record();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		int i = 0;
 		while (i < num) {
 			generator.next(record);
 			readTarget = sorter.getRecord(readTarget, i++);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -229,12 +225,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -250,18 +246,18 @@ public class NormalizedKeySorterTest {
 		
 		// re-read the records
 		generator.reset();
-		Record readTarget = new Record();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		int i = num - 1;
 		while (i >= 0) {
 			generator.next(record);
 			readTarget = sorter.getRecord(readTarget, i--);
 			
-			Key rk = readTarget.getField(0, Key.class);
-			Key gk = record.getField(0, Key.class);
+			int rk = readTarget.f0;
+			int gk = record.f0;
 			
-			Value rv = readTarget.getField(1, Value.class);
-			Value gv = record.getField(1, Value.class);
+			String rv = readTarget.f1;
+			String gv = record.f1;
 			
 			Assert.assertEquals("The re-read key is wrong", gk, rk);
 			Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -282,12 +278,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = -1;
 		do {
 			generator.next(record);
@@ -323,12 +319,12 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.RANDOM_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		int num = 0;
 		do {
 			generator.next(record);
@@ -339,26 +335,21 @@ public class NormalizedKeySorterTest {
 		QuickSort qs = new QuickSort();
 		qs.sort(sorter);
 		
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
-		
-		Key current = new Key();
-		Key last = new Key();
-		
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
+
 		iter.next(readTarget);
-		readTarget.getFieldInto(0, last);
+		int last = readTarget.f0;
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
-			readTarget.getFieldInto(0, current);
+			int current = readTarget.f0;
 			
-			final int cmp = last.compareTo(current);
+			final int cmp = last - current;
 			if (cmp > 0) {
 				Assert.fail("Next key is not larger or equal to previous key.");
 			}
 			
-			Key tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers
@@ -370,16 +361,16 @@ public class NormalizedKeySorterTest {
 	public void testSortShortStringKeys() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
-
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
-		NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
 		
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, 5, KeyMode.RANDOM,
+		@SuppressWarnings("unchecked")
+		TypeComparator<Tuple2<Integer, String>> accessors = TestData.getIntStringTupleTypeInfo().createComparator(new int[]{1}, new boolean[]{true}, 0, null);
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), accessors, memory);
+		
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, 5, KeyMode.RANDOM,
 			ValueMode.FIX_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		do {
 			generator.next(record);
 		}
@@ -388,26 +379,21 @@ public class NormalizedKeySorterTest {
 		QuickSort qs = new QuickSort();
 		qs.sort(sorter);
 		
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
-		
-		Value current = new Value();
-		Value last = new Value();
-		
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
+	
 		iter.next(readTarget);
-		readTarget.getFieldInto(1, last);
+		String last = readTarget.f1;
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
-			readTarget.getFieldInto(1, current);
+			String current = readTarget.f1;
 			
 			final int cmp = last.compareTo(current);
 			if (cmp > 0) {
 				Assert.fail("Next value is not larger or equal to previous value.");
 			}
 			
-			Value tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers
@@ -420,15 +406,15 @@ public class NormalizedKeySorterTest {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
-		NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
+		@SuppressWarnings("unchecked")
+		TypeComparator<Tuple2<Integer, String>> accessors = TestData.getIntStringTupleTypeInfo().createComparator(new int[]{1}, new boolean[]{true}, 0, null);
+		NormalizedKeySorter<Tuple2<Integer, String>> sorter = new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), accessors, memory);
 		
-		TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+		TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
 			ValueMode.FIX_LENGTH);
 		
 		// write the records
-		Record record = new Record();
+		Tuple2<Integer, String> record = new Tuple2<>();
 		do {
 			generator.next(record);
 		}
@@ -437,26 +423,21 @@ public class NormalizedKeySorterTest {
 		QuickSort qs = new QuickSort();
 		qs.sort(sorter);
 		
-		MutableObjectIterator<Record> iter = sorter.getIterator();
-		Record readTarget = new Record();
-		
-		Value current = new Value();
-		Value last = new Value();
+		MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+		Tuple2<Integer, String> readTarget = new Tuple2<>();
 		
 		iter.next(readTarget);
-		readTarget.getFieldInto(1, last);
+		String last = readTarget.f1;
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
-			readTarget.getFieldInto(1, current);
+			String current = readTarget.f1;
 			
 			final int cmp = last.compareTo(current);
 			if (cmp > 0) {
 				Assert.fail("Next value is not larger or equal to previous value.");
 			}
 			
-			Value tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers
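
Reviewer note on the key-order check above: the refactored test computes `cmp = last - current` on `int` keys. That should be fine if the generated keys are non-negative and bounded (assumed here from the `KEY_MAX` argument; the generator itself is not shown in this diff), but subtraction can overflow for arbitrary `int` values. A minimal sketch of the difference, not part of the commit, with the hypothetical class name `SortedCheck`:

```java
import java.util.Arrays;
import java.util.List;

class SortedCheck {

    /** Non-decreasing check using Integer.compare, which cannot overflow. */
    public static boolean isSortedSafe(List<Integer> keys) {
        for (int i = 1; i < keys.size(); i++) {
            if (Integer.compare(keys.get(i - 1), keys.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    /** Same check using subtraction, as in the test; wraps around on extreme values. */
    public static boolean isSortedBySubtraction(List<Integer> keys) {
        for (int i = 1; i < keys.size(); i++) {
            int cmp = keys.get(i - 1) - keys.get(i); // may overflow
            if (cmp > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Sorted input whose difference overflows: MIN_VALUE - 1 wraps to MAX_VALUE.
        List<Integer> sorted = Arrays.asList(Integer.MIN_VALUE, 1);
        System.out.println("safe: " + isSortedSafe(sorted));
        System.out.println("subtraction: " + isSortedBySubtraction(sorted));
    }
}
```

For small non-negative keys both variants agree, so the test as committed is not affected under that assumption. `Integer.compare(last, current)` would simply remove the dependency on the key range.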

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
index a487a65..d1de5dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
@@ -27,18 +27,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,77 +57,76 @@ public class ReusingSortMergeCoGroupIteratorITCase
 	private static final long SEED2 = 231434613412342L;
 
 	// left and right input data generators
-	private Generator generator1;
+	private TupleGenerator generator1;
 
-	private Generator generator2;
+	private TupleGenerator generator2;
 
-	// left and right input RecordReader mocks
-	private MutableObjectIterator<Record> reader1;
+	// left and right input Tuple2<Integer, String>Reader mocks
+	private MutableObjectIterator<Tuple2<Integer, String>> reader1;
 
-	private MutableObjectIterator<Record> reader2;
+	private MutableObjectIterator<Tuple2<Integer, String>> reader2;
 	
-	
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> serializer1;
+	private TypeSerializer<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+		this.serializer1 = TestData.getIntStringTupleSerializer();
+		this.serializer2 = TestData.getIntStringTupleSerializer();
+		this.comparator1 = TestData.getIntStringTupleComparator();
+		this.comparator2 = TestData.getIntStringTupleComparator();
+		this.pairComparator = new GenericPairComparator(comparator1, comparator2);
 	}
 	
 	@Test
 	public void testMerge() {
 		try {
 			
-			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
 
-			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			reader1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			reader2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 
 			// collect expected data
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
-			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+			Map<Integer, Collection<String>> expectedStringsMap1 = collectData(generator1, INPUT_1_SIZE);
+			Map<Integer, Collection<String>> expectedStringsMap2 = collectData(generator2, INPUT_2_SIZE);
+			Map<Integer, List<Collection<String>>> expectedCoGroupsMap = coGroupValues(expectedStringsMap1, expectedStringsMap2);
 	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 	
 			// compare with iterator values
-			ReusingSortMergeCoGroupIterator<Record, Record> iterator =	new ReusingSortMergeCoGroupIterator<Record, Record>(
+			ReusingSortMergeCoGroupIterator<Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =	new ReusingSortMergeCoGroupIterator<>(
 					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
 					this.pairComparator);
 	
 			iterator.open();
 			
-			final TestData.Key key = new TestData.Key();
+			int key = 0;
 			while (iterator.next())
 			{
-				Iterator<Record> iter1 = iterator.getValues1().iterator();
-				Iterator<Record> iter2 = iterator.getValues2().iterator();
+				Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
+				Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();
 				
-				TestData.Value v1 = null;
-				TestData.Value v2 = null;
+				String v1 = null;
+				String v2 = null;
 				
 				if (iter1.hasNext()) {
-					Record rec = iter1.next();
-					rec.getFieldInto(0, key);
-					v1 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter1.next();
+					key = rec.f0;
+					v1 = rec.f1;
 				}
 				else if (iter2.hasNext()) {
-					Record rec = iter2.next();
-					rec.getFieldInto(0, key);
-					v2 = rec.getField(1, TestData.Value.class);
+					Tuple2<Integer, String> rec = iter2.next();
+					key = rec.f0;
+					v2 = rec.f1;
 				}
 				else {
 					Assert.fail("No input on both sides.");
@@ -138,8 +135,8 @@ public class ReusingSortMergeCoGroupIteratorITCase
 				// assert that matches for this key exist
 				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
 				
-				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
-				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+				Collection<String> expValues1 = expectedCoGroupsMap.get(key).get(0);
+				Collection<String> expValues2 = expectedCoGroupsMap.get(key).get(1);
 				
 				if (v1 != null) {
 					expValues1.remove(v1);
@@ -149,14 +146,14 @@ public class ReusingSortMergeCoGroupIteratorITCase
 				}
 				
 				while(iter1.hasNext()) {
-					Record rec = iter1.next();
-					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter1.next();
+					Assert.assertTrue("String not in expected set of first input", expValues1.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
 				
 				while(iter2.hasNext()) {
-					Record rec = iter2.next();
-					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+					Tuple2<Integer, String> rec = iter2.next();
+					Assert.assertTrue("String not in expected set of second input", expValues2.remove(rec.f1));
 				}
 				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
 	
@@ -174,28 +171,28 @@ public class ReusingSortMergeCoGroupIteratorITCase
 
 	// --------------------------------------------------------------------------------------------
 	
-	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	private Map<Integer, List<Collection<String>>> coGroupValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+		Map<Integer, List<Collection<String>>> map = new HashMap<>(1000);
 
-		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+		Set<Integer> keySet = new HashSet<>(leftMap.keySet());
 		keySet.addAll(rightMap.keySet());
 		
-		for (TestData.Key key : keySet) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+		for (Integer key : keySet) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+			ArrayList<Collection<String>> list = new ArrayList<>(2);
 			
 			if (leftValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(leftValues);
 			}
 			
 			if (rightValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
+				list.add(new ArrayList<String>(0));
 			} else {
 				list.add(rightValues);
 			}
@@ -205,22 +202,22 @@ public class ReusingSortMergeCoGroupIteratorITCase
 		return map;
 	}
 
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+	private Map<Integer, Collection<String>> collectData(TupleGenerator iter, int num)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		for (int i = 0; i < num; i++) {
 			iter.next(pair);
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			int key = pair.f0;
 			
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 		return map;
 	}
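
Reviewer note on `collectData` and `coGroupValues` above: together they build the expected co-group result that the iterator output is checked against. Values are grouped per key for each input, and then every key from either side is mapped to a two-element list (left values, right values), with an empty collection where one side has no entry. The sketch below restates that logic with plain JDK types so it runs without Flink. It is not part of the commit; `CoGroupExpectation` and `pair` are hypothetical names, and `Map.Entry` stands in for `Tuple2<Integer, String>`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CoGroupExpectation {

    /** Convenience factory for a (key, value) pair. */
    public static Map.Entry<Integer, String> pair(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Groups the values of one input by key (the role of collectData). */
    public static Map<Integer, Collection<String>> collect(List<Map.Entry<Integer, String>> pairs) {
        Map<Integer, Collection<String>> map = new HashMap<>();
        for (Map.Entry<Integer, String> p : pairs) {
            Collection<String> values = map.get(p.getKey());
            if (values == null) {
                values = new ArrayList<>();
                map.put(p.getKey(), values);
            }
            values.add(p.getValue());
        }
        return map;
    }

    /** Builds key -> [left values, right values] over the union of keys (the role of coGroupValues). */
    public static Map<Integer, List<Collection<String>>> coGroup(
            Map<Integer, Collection<String>> left,
            Map<Integer, Collection<String>> right) {
        Map<Integer, List<Collection<String>>> result = new HashMap<>();
        Set<Integer> keys = new HashSet<>(left.keySet());
        keys.addAll(right.keySet());
        for (Integer key : keys) {
            Collection<String> l = left.get(key);
            Collection<String> r = right.get(key);
            List<Collection<String>> both = new ArrayList<>(2);
            both.add(l == null ? new ArrayList<String>() : l); // missing side becomes empty
            both.add(r == null ? new ArrayList<String>() : r);
            result.put(key, both);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Collection<String>> left = collect(Arrays.asList(pair(1, "a"), pair(1, "b")));
        Map<Integer, Collection<String>> right = collect(Arrays.asList(pair(2, "x")));
        System.out.println(coGroup(left, right));
    }
}
```

The test then removes each value the iterator returns from these collections and asserts that both are empty at the end of each key group, which checks for missing and for unexpected values at the same time.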


[4/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 86f879a..5a4fc6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -22,10 +22,6 @@ package org.apache.flink.runtime.operators.hash;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -33,22 +29,16 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase
-		.RecordMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -63,6 +53,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 
 import static org.junit.Assert.fail;
 
@@ -86,39 +80,35 @@ public class NonReusingReOpenableHashTableITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 
 
 
 
 	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Record> recordBuildSideAccesssor;
-	private TypeSerializer<Record> recordProbeSideAccesssor;
-	private TypeComparator<Record> recordBuildSideComparator;
-	private TypeComparator<Record> recordProbeSideComparator;
-	private TypePairComparator<Record, Record> pactRecordComparator;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
 
-
-		final int[] keyPos = new int[] {0};
-		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-
-		this.recordBuildSideAccesssor = RecordSerializer.get();
-		this.recordProbeSideAccesssor = RecordSerializer.get();
-		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
-		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
-		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
 
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
@@ -153,11 +143,11 @@ public class NonReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -175,11 +165,11 @@ public class NonReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -198,11 +188,11 @@ public class NonReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
@@ -212,21 +202,21 @@ public class NonReusingReOpenableHashTableITCase {
 		}
 	}
 
-	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
 		// collect expected data
-		final Map<Key, Collection<RecordMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchRecordValues(NonReusingHashMatchIteratorITCase.collectRecordData(buildInput), NonReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchSecondTupleFields(NonReusingHashMatchIteratorITCase.collectTupleData(buildInput), NonReusingHashMatchIteratorITCase.collectTupleData(probeInput));
 
-		final List<Map<Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
-		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+		final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
+		final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
 		for(int i = 0; i < NUM_PROBES; i++) {
-			Map<Key, Collection<RecordMatch>> tmp;
+			Map<Integer, Collection<TupleMatch>> tmp;
 			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
-			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+			nMatcher[i] = new TupleMatchRemovingJoin(tmp);
 		}
 
-		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+		final FlatJoinFunction firstMatcher = new TupleMatchRemovingJoin(expectedFirstMatchesMap);
 
-		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+		final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
 		// reset the generators
 		bgen.reset();
@@ -235,8 +225,8 @@ public class NonReusingReOpenableHashTableITCase {
 		probeInput.reset();
 
 		// compare with iterator values
-		NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
-				new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+		NonReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildFirstReOpenableHashMatchIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -246,7 +236,7 @@ public class NonReusingReOpenableHashTableITCase {
 		while (iterator.callWithNextKey(firstMatcher, collector));
 
 		// assert that each expected match was seen for the first input
-		for (Entry<Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+		for (Entry<Integer, Collection<TupleMatch>> entry : expectedFirstMatchesMap.entrySet()) {
 			if (!entry.getValue().isEmpty()) {
 				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 			}
@@ -261,7 +251,7 @@ public class NonReusingReOpenableHashTableITCase {
 			while (iterator.callWithNextKey(nMatcher[i], collector));
 
 			// assert that each expected match was seen for the second input
-			for (Entry<Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -277,16 +267,16 @@ public class NonReusingReOpenableHashTableITCase {
 	//
 	//
 
-	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
 		probes.add(probe1);
 		probes.add(probe2);
 		probes.add(probe3);
-		return new UnionIterator<Record>(probes);
+		return new UnionIterator<>(probes);
 	}
 
 	@Test
@@ -304,14 +294,14 @@ public class NonReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 
 
 
@@ -331,40 +321,40 @@ public class NonReusingReOpenableHashTableITCase {
 
 		// ----------------------------------------------------------------------------------------
 
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 
 		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
 
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord()) {
 				long numBuildValues = 0;
 
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(record)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 
 				Long contained = map.get(key);
@@ -421,14 +411,14 @@ public class NonReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 
 
 		// allocate the memory for the HashTable
@@ -446,39 +436,39 @@ public class NonReusingReOpenableHashTableITCase {
 
 		// ----------------------------------------------------------------------------------------
 
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 		
 		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if (probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord()) {
 				long numBuildValues = 0;
 
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 
 				Long contained = map.get(key);
@@ -515,11 +505,11 @@ public class NonReusingReOpenableHashTableITCase {
 	}
 
 
-	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
-		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
-		for(Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
-			for(RecordMatch m : entry.getValue()) {
+	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+		for(Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+			for(TupleMatch m : entry.getValue()) {
 				matches.add(m);
 			}
 			copy.put(entry.getKey(), matches);
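
Note on the deepCopy helper in the hunk above: doTest gives every probe pass its own copy of the expected-matches map and then asserts that each per-key collection ends up empty, so the copies must not share their collections. The following is a minimal stand-alone sketch of that copying pattern, not the test's actual code. The class name DeepCopySketch is hypothetical, and String stands in for TupleMatch, which is defined elsewhere in the test sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; String stands in for TupleMatch.
final class DeepCopySketch {

    // Copies the map and every per-key collection.
    // The elements themselves are shared, not cloned.
    static <K, V> Map<K, Collection<V>> deepCopy(Map<K, Collection<V>> source) {
        Map<K, Collection<V>> copy = new HashMap<>(source.size());
        for (Map.Entry<K, Collection<V>> entry : source.entrySet()) {
            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<Integer, Collection<String>> expected = new HashMap<>();
        expected.put(7, new ArrayList<>(Arrays.asList("a-b", "a-c")));

        // One copy per probe pass; removing a match from the copy
        // leaves the original expectation untouched.
        Map<Integer, Collection<String>> perProbe = deepCopy(expected);
        perProbe.get(7).remove("a-b");

        System.out.println(expected.get(7).size() + " " + perProbe.get(7).size());
    }
}
```

Because only the collections are copied, this is cheap even for large expectation maps, at the cost that mutating an element (rather than removing it) would still be visible through every copy.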

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
index 4fdff76..12f4a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -28,13 +28,11 @@ import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -44,16 +42,11 @@ import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -77,31 +70,31 @@ public class ReusingHashMatchIteratorITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 	
 	private TypeSerializer<IntPair> pairSerializer;
 	private TypeComparator<IntPair> pairComparator;
-	private TypePairComparator<IntPair, Record> pairRecordPairComparator;
-	private TypePairComparator<Record, IntPair> recordPairPairComparator;
+	private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+	private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
 		
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
 		
 		this.pairSerializer = new IntPairSerializer();
-		this.pairComparator = new IntPairComparator();
-		this.pairRecordPairComparator = new IntPairRecordPairComparator();
-		this.recordPairPairComparator = new RecordIntPairPairComparator();
+		this.pairComparator = new TestData.IntPairComparator();
+		this.pairRecordPairComparator = new IntPairTuplePairComparator();
+		this.recordPairPairComparator = new TupleIntPairPairComparator();
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
@@ -129,19 +122,19 @@ public class ReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildFirst() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			generator1.reset();
@@ -150,8 +143,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -163,7 +156,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -187,31 +180,31 @@ public class ReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -231,14 +224,14 @@ public class ReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
-			ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -250,7 +243,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -265,19 +258,19 @@ public class ReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildSecond() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			generator1.reset();
@@ -286,8 +279,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values			
-			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -299,7 +292,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -323,31 +316,31 @@ public class ReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -367,14 +360,14 @@ public class ReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
-			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -386,7 +379,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -403,16 +396,16 @@ public class ReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -420,8 +413,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			ReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
-					new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildSecondHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -433,7 +426,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -450,16 +443,16 @@ public class ReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -467,8 +460,8 @@ public class ReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			ReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
-					new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+			ReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -480,7 +473,7 @@ public class ReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -498,29 +491,29 @@ public class ReusingHashMatchIteratorITCase {
 
 	
 	
-	static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
 
-		for (TestData.Key key : leftMap.keySet()) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
 
 			if (rightValues == null) {
 				continue;
 			}
 
 			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordMatch>());
+				map.put(key, new ArrayList<TupleMatch>());
 			}
 
-			Collection<RecordMatch> matchedValues = map.get(key);
+			Collection<TupleMatch> matchedValues = map.get(key);
 
-			for (TestData.Value leftValue : leftValues) {
-				for (TestData.Value rightValue : rightValues) {
-					matchedValues.add(new RecordMatch(leftValue, rightValue));
+			for (String leftValue : leftValues) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new TupleMatch(leftValue, rightValue));
 				}
 			}
 		}
@@ -528,32 +521,32 @@ public class ReusingHashMatchIteratorITCase {
 		return map;
 	}
 	
-	static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+	static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
 		Map<Integer, Collection<Integer>> leftMap,
-		Map<TestData.Key, Collection<TestData.Value>> rightMap)
+		Map<Integer, Collection<String>> rightMap)
 	{
-		final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+		final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
 	
 		for (Integer i : leftMap.keySet()) {
 			
-			final TestData.Key key = new TestData.Key(i.intValue());
+			final Integer key = new Integer(i.intValue());
 			
 			final Collection<Integer> leftValues = leftMap.get(i);
-			final Collection<TestData.Value> rightValues = rightMap.get(key);
+			final Collection<String> rightValues = rightMap.get(key);
 	
 			if (rightValues == null) {
 				continue;
 			}
 	
 			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordIntPairMatch>());
+				map.put(key, new ArrayList<TupleIntPairMatch>());
 			}
 	
-			final Collection<RecordIntPairMatch> matchedValues = map.get(key);
+			final Collection<TupleIntPairMatch> matchedValues = map.get(key);
 	
 			for (Integer v : leftValues) {
-				for (TestData.Value val : rightValues) {
-					matchedValues.add(new RecordIntPairMatch(v, val));
+				for (String val : rightValues) {
+					matchedValues.add(new TupleIntPairMatch(v, val));
 				}
 			}
 		}
@@ -562,21 +555,21 @@ public class ReusingHashMatchIteratorITCase {
 	}
 
 	
-	static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+	static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		while ((pair = iter.next(pair)) != null) {
 
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			Integer key = pair.f0;
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 
 		return map;
@@ -585,7 +578,7 @@ public class ReusingHashMatchIteratorITCase {
 	static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
 	throws Exception
 	{
-		Map<Integer, Collection<Integer>> map = new HashMap<Integer, Collection<Integer>>();
+		Map<Integer, Collection<Integer>> map = new HashMap<>();
 		IntPair pair = new IntPair();
 		
 		while ((pair = iter.next(pair)) != null) {
@@ -606,19 +599,19 @@ public class ReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordMatch {
+	static class TupleMatch {
 		
-		private final Value left;
-		private final Value right;
+		private final String left;
+		private final String right;
 
-		public RecordMatch(Value left, Value right) {
+		public TupleMatch(String left, String right) {
 			this.left = left;
 			this.right = right;
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordMatch o = (RecordMatch) obj;
+			TupleMatch o = (TupleMatch) obj;
 			return this.left.equals(o.left) && this.right.equals(o.right);
 		}
 		
@@ -636,19 +629,19 @@ public class ReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordIntPairMatch
+	static class TupleIntPairMatch
 	{
 		private final int left;
-		private final Value right;
+		private final String right;
 
-		public RecordIntPairMatch(int left, Value right) {
+		public TupleIntPairMatch(int left, String right) {
 			this.left = left;
 			this.right = right;
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordIntPairMatch o = (RecordIntPairMatch) obj;
+			TupleIntPairMatch o = (TupleIntPairMatch) obj;
 			return this.left == o.left && this.right.equals(o.right);
 		}
 		
@@ -663,28 +656,28 @@ public class ReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordMatchRemovingJoin extends JoinFunction
+	static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
 		
-		protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+		protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
-			TestData.Key key = rec1.getField(0, TestData.Key.class);
-			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
-			TestData.Value value2 = rec2.getField(1, TestData.Value.class);
+			Integer key = rec1.f0;
+			String value1 = rec1.f1;
+			String value2 = rec2.f1;
 			//System.err.println("rec1 key = "+key+"  rec2 key= "+rec2.getField(0, TestData.Key.class));
-			Collection<RecordMatch> matches = this.toRemoveFrom.get(key);
+			Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
-				matches.remove(new RecordMatch(value1, value2)));
+				matches.remove(new TupleMatch(value1, value2)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -692,32 +685,32 @@ public class ReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+	static final class TupleIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
 		
-		protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+		protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
 			final int k = rec1.getKey();
 			final int v = rec1.getValue(); 
 			
-			final TestData.Key key = rec2.getField(0, TestData.Key.class);
-			final TestData.Value value = rec2.getField(1, TestData.Value.class);
+			final Integer key = rec2.f0;
+			final String value = rec2.f1;
 			
-			Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey()); 
+			Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key); 
 			
-			Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key);
+			Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
-				matches.remove(new RecordIntPairMatch(v, value)));
+				matches.remove(new TupleIntPairMatch(v, value)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -725,7 +718,7 @@ public class ReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+	static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
 	{
 		private int reference;
 		
@@ -735,33 +728,31 @@ public class ReusingHashMatchIteratorITCase {
 		}
 
 		@Override
-		public boolean equalToReference(Record candidate) {
+		public boolean equalToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() == this.reference;
+				return candidate.f0 == this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 
 		@Override
-		public int compareToReference(Record candidate) {
+		public int compareToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() - this.reference;
+				return candidate.f0 - this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 	}
 	
-	static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+	static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
 	{
 		private int reference;
 		
 		@Override
-		public void setReference(Record reference) {
-			this.reference = reference.getField(0, IntValue.class).getValue();
+		public void setReference(Tuple2<Integer, String> reference) {
+			this.reference = reference.f0;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index ba5a325..2a306ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -28,14 +28,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -43,21 +42,16 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatchRemovingJoin;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -85,39 +79,35 @@ public class ReusingReOpenableHashTableITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 	
 	
 	
 	
 	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Record> recordBuildSideAccesssor;
-	private TypeSerializer<Record> recordProbeSideAccesssor;
-	private TypeComparator<Record> recordBuildSideComparator;
-	private TypeComparator<Record> recordProbeSideComparator;
-	private TypePairComparator<Record, Record> pactRecordComparator;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
 		
-		
-		final int[] keyPos = new int[] {0};
-		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-		
-		this.recordBuildSideAccesssor = RecordSerializer.get();
-		this.recordProbeSideAccesssor = RecordSerializer.get();
-		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
-		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
-		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 		this.ioManager = new IOManagerAsync();
@@ -152,11 +142,11 @@ public class ReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -174,11 +164,11 @@ public class ReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
 		catch (Exception e) {
@@ -197,11 +187,11 @@ public class ReusingReOpenableHashTableITCase {
 		int buildSize = 1000;
 		int probeSize = 1000;
 		try {
-			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
 			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
 			
 			doTest(buildInput,probeInput, bgen, pgen);
 		}
@@ -211,21 +201,21 @@ public class ReusingReOpenableHashTableITCase {
 		}
 	}
 	
-	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
 		// collect expected data
-		final Map<TestData.Key, Collection<RecordMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchRecordValues(ReusingHashMatchIteratorITCase.collectRecordData(buildInput), ReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchSecondTupleFields(ReusingHashMatchIteratorITCase.collectTupleData(buildInput), ReusingHashMatchIteratorITCase.collectTupleData(probeInput));
 		
-		final List<Map<TestData.Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
-		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+		final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
+		final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
 		for(int i = 0; i < NUM_PROBES; i++) {
-			Map<TestData.Key, Collection<RecordMatch>> tmp;
+			Map<Integer, Collection<TupleMatch>> tmp;
 			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
-			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+			nMatcher[i] = new TupleMatchRemovingJoin(tmp);
 		}
 		
-		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+		final FlatJoinFunction firstMatcher = new TupleMatchRemovingJoin(expectedFirstMatchesMap);
 		
-		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+		final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
 		// reset the generators
 		bgen.reset();
@@ -234,8 +224,8 @@ public class ReusingReOpenableHashTableITCase {
 		probeInput.reset();
 
 		// compare with iterator values
-		ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
-				new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+		ReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstReOpenableHashMatchIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -245,7 +235,7 @@ public class ReusingReOpenableHashTableITCase {
 		while (iterator.callWithNextKey(firstMatcher, collector));
 
 		// assert that each expected match was seen for the first input
-		for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+		for (Entry<Integer, Collection<TupleMatch>> entry : expectedFirstMatchesMap.entrySet()) {
 			if (!entry.getValue().isEmpty()) {
 				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 			}
@@ -260,7 +250,7 @@ public class ReusingReOpenableHashTableITCase {
 			while (iterator.callWithNextKey(nMatcher[i], collector));
 			
 			// assert that each expected match was seen for the second input
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -276,16 +266,16 @@ public class ReusingReOpenableHashTableITCase {
 	//
 	//
 	
-	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
 		probes.add(probe1);
 		probes.add(probe2);
 		probes.add(probe3);
-		return new UnionIterator<Record>(probes);
+		return new UnionIterator<>(probes);
 	}
 	
 	@Test
@@ -302,14 +292,14 @@ public class ReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 		
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 
 		// allocate the memory for the HashTable
 		List<MemorySegment> memSegments;
@@ -326,40 +316,40 @@ public class ReusingReOpenableHashTableITCase {
 		
 		// ----------------------------------------------------------------------------------------
 		
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 		
 		for(int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
 		
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord()) {
 				long numBuildValues = 0;
 		
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 				
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(record)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				
 				Long contained = map.get(key);
@@ -416,14 +406,14 @@ public class ReusingReOpenableHashTableITCase {
 		final int PROBE_VALS_PER_KEY = 10;
 		
 		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
 		builds.add(build1);
 		builds.add(build2);
 		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
 	
 
 		// allocate the memory for the HashTable
@@ -441,40 +431,40 @@ public class ReusingReOpenableHashTableITCase {
 		
 		// ----------------------------------------------------------------------------------------
 		
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
 				memSegments, ioManager, true);
 		
 		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
 			}
-			Record record;
-			final Record recordReuse = new Record();
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
 
 			while (join.nextRecord())
 			{	
 				long numBuildValues = 0;
 				
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
 				
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
 				}
 				else {
 					fail("No build side values found for a probe key.");
 				}
 				while ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
 				}
 				
 				Long contained = map.get(key);
@@ -511,11 +501,11 @@ public class ReusingReOpenableHashTableITCase {
 	}
 	
 	
-	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
-		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
-		for(Map.Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
-			for(RecordMatch m : entry.getValue()) {
+	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+		for(Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+			for(TupleMatch m : entry.getValue()) {
 				matches.add(m);
 			}
 			copy.put(entry.getKey(), matches);


[5/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests

Posted by fh...@apache.org.
[FLINK-2479] Refactor runtime.operators.* tests

This closes #1160
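
The refactoring in this commit replaces Record's own write()/read() calls with an explicit serializer that is applied to a reused Tuple2 instance. The following is a minimal, self-contained sketch of that round-trip pattern; it is not the Flink code. IntStringPair and IntStringPairSerializer are hypothetical stand-ins for Tuple2<Integer, String> and its TypeSerializer, and java.io data streams stand in for the channel output and input views, so the sketch runs without Flink on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TupleRoundTripSketch {

    /** Hypothetical stand-in for Tuple2<Integer, String>: public fields f0 and f1. */
    static final class IntStringPair {
        int f0;
        String f1;
    }

    /** Hypothetical stand-in for a TypeSerializer: the serializer, not the record, knows the wire format. */
    static final class IntStringPairSerializer {
        void serialize(IntStringPair rec, DataOutputStream out) throws IOException {
            out.writeInt(rec.f0);
            out.writeUTF(rec.f1);
        }

        /** Fills the given reuse object instead of allocating a new one. */
        IntStringPair deserialize(IntStringPair reuse, DataInputStream in) throws IOException {
            reuse.f0 = in.readInt();
            reuse.f1 = in.readUTF();
            return reuse;
        }
    }

    /** Writes one pair, reads it back into a reuse object, and compares the fields. */
    static boolean roundTripMatches(int key, String value) {
        try {
            IntStringPairSerializer serializer = new IntStringPairSerializer();

            IntStringPair rec = new IntStringPair();
            rec.f0 = key;
            rec.f1 = value;

            // Write side: the serializer writes the record to the output.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(rec, out);
            }

            // Read side: the serializer fills a reused instance from the input.
            IntStringPair readRec = new IntStringPair();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                serializer.deserialize(readRec, in);
            }

            // Same comparison shape as the refactored tests: primitive key, String value.
            return rec.f0 == readRec.f0 && rec.f1.equals(readRec.f1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripMatches(7, "seven"));
    }
}
```

Keeping the wire format in the serializer rather than in the record type is what lets the tests below swap Record for plain tuples without changing the write and read loops.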


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbc18b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbc18b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbc18b96

Branch: refs/heads/master
Commit: fbc18b96a86bc54da189f713ed01370524558249
Parents: d9e32da
Author: zentol <s....@web.de>
Authored: Sun Sep 20 19:10:26 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 15 11:30:42 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/disk/ChannelViewsTest.java | 136 +++--
 .../runtime/io/disk/SpillingBufferTest.java     | 141 ++---
 .../hash/NonReusingHashMatchIteratorITCase.java | 343 +++++------
 .../NonReusingReOpenableHashTableITCase.java    | 190 +++---
 .../hash/ReusingHashMatchIteratorITCase.java    | 335 +++++-----
 .../hash/ReusingReOpenableHashTableITCase.java  | 188 +++---
 .../CombiningUnilateralSortMergerITCase.java    | 144 +++--
 .../operators/sort/ExternalSortITCase.java      | 120 ++--
 .../operators/sort/MergeIteratorTest.java       |  98 ++-
 ...onReusingSortMergeCoGroupIteratorITCase.java | 128 ++--
 .../operators/sort/NormalizedKeySorterTest.java | 171 +++---
 .../ReusingSortMergeCoGroupIteratorITCase.java  | 129 ++--
 .../runtime/operators/testutils/TestData.java   | 608 ++++++++++---------
 .../operators/util/HashVsSortMiniBenchmark.java |  93 ++-
 14 files changed, 1374 insertions(+), 1450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index a44916a..ba86131 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -36,11 +36,10 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -102,19 +101,19 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteReadSmallRecords() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
 		final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
-		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -125,18 +124,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -146,8 +145,9 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteAndReadLongRecords() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -155,10 +155,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_LONG; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -169,15 +169,15 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_LONG; i++) {
 			generator.next(rec);
-			readRec.read(inView);
-			final Key k1 = rec.getField(0, Key.class);
-			final Value v1 = rec.getField(1, Value.class);
-			final Key k2 = readRec.getField(0, Key.class);
-			final Value v2 = readRec.getField(1, Value.class);
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			serializer.deserialize(readRec, inView);
+			final int k1 = rec.f0;
+			final String v1 = rec.f1;
+			final int k2 = readRec.f0;
+			final String v2 = readRec.f1;
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -187,8 +187,9 @@ public class ChannelViewsTest
 	@Test
 	public void testReadTooMany() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -196,10 +197,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 
@@ -211,15 +212,15 @@ public class ChannelViewsTest
 
 		// read and re-generate all records and compare them
 		try {
-			final Record readRec = new Record();
+			final Tuple2<Integer, String> readRec = new Tuple2<>();
 			for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) {
 				generator.next(rec);
-				readRec.read(inView);
-				final Key k1 = rec.getField(0, Key.class);
-				final Value v1 = rec.getField(1, Value.class);
-				final Key k2 = readRec.getField(0, Key.class);
-				final Value v2 = readRec.getField(1, Value.class);
-				Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+				serializer.deserialize(readRec, inView);
+				final int k1 = rec.f0;
+				final String v1 = rec.f1;
+				final int k2 = readRec.f0;
+				final String v2 = readRec.f1;
+				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Expected an EOFException which did not occur.");
 		}
@@ -238,8 +239,9 @@ public class ChannelViewsTest
 	@Test
 	public void testReadWithoutKnownBlockCount() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -247,10 +249,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -261,18 +263,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -282,8 +284,9 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteReadOneBufferOnly() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
@@ -291,10 +294,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -305,18 +308,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
@@ -326,8 +329,9 @@ public class ChannelViewsTest
 	@Test
 	public void testWriteReadNotAll() throws Exception
 	{
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 		final FileIOChannel.ID channel = this.ioManager.createChannel();
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -335,10 +339,10 @@ public class ChannelViewsTest
 		final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		this.memoryManager.release(outView.close());
 		
@@ -349,18 +353,18 @@ public class ChannelViewsTest
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(inView.close());
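
Note on the ChannelViewsTest hunks above: the tests no longer call rec.write(outView) / readRec.read(inView) on the record itself; an external serializer now writes and reads the tuple. The following is a minimal, Flink-free sketch of that round-trip pattern. IntStringPair and IntStringPairSerializer are hypothetical stand-ins invented for illustration (they are not Flink's Tuple2 or TypeSerializer), and plain java.io streams take the place of the channel output and input views.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class PairRoundTripSketch {

    /** Hypothetical mutable pair, standing in for Tuple2<Integer, String>. */
    static final class IntStringPair {
        Integer f0;
        String f1;
    }

    /** Hypothetical external serializer; an assumption, not Flink's TypeSerializer. */
    static final class IntStringPairSerializer {
        void serialize(IntStringPair rec, DataOutputStream out) throws IOException {
            out.writeInt(rec.f0);
            out.writeUTF(rec.f1);
        }

        /** Fills the given reuse object instead of allocating a new one. */
        IntStringPair deserialize(IntStringPair reuse, DataInputStream in) throws IOException {
            reuse.f0 = in.readInt();
            reuse.f1 = in.readUTF();
            return reuse;
        }
    }

    /** Writes one pair, reads it back, and compares it the way the tests do. */
    static boolean roundTrips(int key, String value) {
        try {
            IntStringPairSerializer serializer = new IntStringPairSerializer();

            IntStringPair rec = new IntStringPair();
            rec.f0 = key;
            rec.f1 = value;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(rec, out);
            }

            IntStringPair readRec = new IntStringPair();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                serializer.deserialize(readRec, in);
            }

            // Unbox the keys before comparing, as in the updated assertions.
            int k1 = rec.f0;
            int k2 = readRec.f0;
            return k1 == k2 && rec.f1.equals(readRec.f1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(13, "LEFT String for Duplicate Keys"));
    }
}
```

Keeping the serialization logic outside the data type is what allows the same test body to work with any record type for which a serializer is available.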

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 0b1e0c3..538c416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -27,11 +27,10 @@ import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -90,7 +89,8 @@ public class SpillingBufferTest {
 	
 	@Test
 	public void testWriteReadInMemory() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -99,10 +99,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -110,18 +110,18 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		// re-read the data
@@ -131,15 +131,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -148,8 +148,9 @@ public class SpillingBufferTest {
 	
 	@Test
 	public void testWriteReadTooMuchInMemory() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(
 				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -158,10 +159,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -169,19 +170,19 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		try {
 			for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) {
 				generator.next(rec);
-				readRec.read(inView);
+				serializer.deserialize(readRec, inView);
 				
-				Key k1 = rec.getField(0, Key.class);
-				Value v1 = rec.getField(1, Value.class);
+				int k1 = rec.f0;
+				String v1 = rec.f1;
 				
-				Key k2 = readRec.getField(0, Key.class);
-				Value v2 = readRec.getField(1, Value.class);
+				int k2 = readRec.f0;
+				String v2 = readRec.f1;
 				
-				Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Read too much, expected EOFException.");
 		}
@@ -196,15 +197,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -215,8 +216,9 @@ public class SpillingBufferTest {
 	
 	@Test
 	public void testWriteReadExternal() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(
 				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -225,10 +227,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -236,18 +238,18 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		// re-read the data
@@ -257,15 +259,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -274,8 +276,9 @@ public class SpillingBufferTest {
 
 	@Test
 	public void testWriteReadTooMuchExternal() throws Exception {
-		final TestData.Generator generator = new TestData.Generator(
+		final TestData.TupleGenerator generator = new TestData.TupleGenerator(
 				SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+		final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
 		
 		// create the writer output view
 		final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -284,10 +287,10 @@ public class SpillingBufferTest {
 							new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
 		
 		// write a number of pairs
-		final Record rec = new Record();
+		final Tuple2<Integer, String> rec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			rec.write(outView);
+			serializer.serialize(rec, outView);
 		}
 		
 		// create the reader input view
@@ -295,19 +298,19 @@ public class SpillingBufferTest {
 		generator.reset();
 		
 		// read and re-generate all records and compare them
-		final Record readRec = new Record();
+		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		try {
 			for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) {
 				generator.next(rec);
-				readRec.read(inView);
+				serializer.deserialize(readRec, inView);
 				
-				Key k1 = rec.getField(0, Key.class);
-				Value v1 = rec.getField(1, Value.class);
+				int k1 = rec.f0;
+				String v1 = rec.f1;
 				
-				Key k2 = readRec.getField(0, Key.class);
-				Value v2 = readRec.getField(1, Value.class);
+				int k2 = readRec.f0;
+				String v2 = readRec.f1;
 				
-				Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Read too much, expected EOFException.");
 		}
@@ -322,15 +325,15 @@ public class SpillingBufferTest {
 		// read and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
-			readRec.read(inView);
+			serializer.deserialize(readRec, inView);
 			
-			Key k1 = rec.getField(0, Key.class);
-			Value v1 = rec.getField(1, Value.class);
+			int k1 = rec.f0;
+			String v1 = rec.f1;
 			
-			Key k2 = readRec.getField(0, Key.class);
-			Value v2 = readRec.getField(1, Value.class);
+			int k2 = readRec.f0;
+			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
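
Note on the assertion change in the hunks above: Tuple2<Integer, String>.f0 is a boxed Integer, and the updated tests assign it to an int before comparing with ==. The sketch below illustrates why that unboxing step matters. In Java, == applied to two Integer references compares object identity, not numeric value, so the comparison should be made on primitives or through equals(). The class and method names are hypothetical and exist only for this illustration.

```java
class BoxedKeyCompareSketch {

    /** Mirrors the test pattern: unbox both keys, then compare the primitives. */
    static boolean keysMatchUnboxed(Integer left, Integer right) {
        int k1 = left;   // auto-unboxing; throws NullPointerException if left is null
        int k2 = right;  // auto-unboxing; throws NullPointerException if right is null
        return k1 == k2;
    }

    /** Alternative that stays on boxed values and compares by value via equals(). */
    static boolean keysMatchBoxed(Integer left, Integer right) {
        return left.equals(right);
    }

    public static void main(String[] args) {
        Integer a = Integer.valueOf(100000);
        Integer b = Integer.valueOf(100000);
        System.out.println(keysMatchUnboxed(a, b));
        System.out.println(keysMatchBoxed(a, b));
    }
}
```

Both helpers compare by value; the unboxed form used in the tests additionally fails fast with a NullPointerException if a key field was never populated.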

http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
index 2da97e9..1795062 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
@@ -19,15 +19,10 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -35,18 +30,14 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.UnionIterator;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
@@ -60,6 +51,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 @SuppressWarnings({"serial", "deprecation"})
 public class NonReusingHashMatchIteratorITCase {
@@ -77,31 +70,31 @@ public class NonReusingHashMatchIteratorITCase {
 	private IOManager ioManager;
 	private MemoryManager memoryManager;
 	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
+	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
 	
 	private TypeSerializer<IntPair> pairSerializer;
 	private TypeComparator<IntPair> pairComparator;
-	private TypePairComparator<IntPair, Record> pairRecordPairComparator;
-	private TypePairComparator<Record, IntPair> recordPairPairComparator;
+	private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+	private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
 
 
 	@SuppressWarnings("unchecked")
 	@Before
 	public void beforeTest() {
-		this.recordSerializer = RecordSerializer.get();
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
 		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
 		
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		this.recordPairComparator = new GenericPairComparator(record1Comparator, record2Comparator);
 		
 		this.pairSerializer = new IntPairSerializer();
-		this.pairComparator = new IntPairComparator();
-		this.pairRecordPairComparator = new IntPairRecordPairComparator();
-		this.recordPairPairComparator = new RecordIntPairPairComparator();
+		this.pairComparator = new TestData.IntPairComparator();
+		this.pairRecordPairComparator = new IntPairTuplePairComparator();
+		this.recordPairPairComparator = new TupleIntPairPairComparator();
 		
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
 		this.ioManager = new IOManagerAsync();
@@ -129,19 +122,19 @@ public class NonReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildFirst() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	
 			// reset the generators
 			generator1.reset();
@@ -150,8 +143,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			NonReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -163,7 +156,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -187,31 +180,31 @@ public class NonReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -231,14 +224,14 @@ public class NonReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
-			NonReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
-					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 						this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -250,7 +243,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -265,19 +258,19 @@ public class NonReusingHashMatchIteratorITCase {
 	@Test
 	public void testBuildSecond() {
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			generator1.reset();
@@ -286,8 +279,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values			
-			NonReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -299,7 +292,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -323,31 +316,31 @@ public class NonReusingHashMatchIteratorITCase {
 		final int DUPLICATE_KEY = 13;
 		
 		try {
-			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
 			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
 			inList1.add(gen1Iter);
 			inList1.add(const1Iter);
 			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
-			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
 			
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
-				collectRecordData(input1),
-				collectRecordData(input2));
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+				collectTupleData(input1),
+				collectTupleData(input2));
 			
 			// re-create the whole thing for actual processing
 			
@@ -367,14 +360,14 @@ public class NonReusingHashMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new UnionIterator<Record>(inList1);
-			input2 = new UnionIterator<Record>(inList2);
+			input1 = new UnionIterator<>(inList1);
+			input2 = new UnionIterator<>(inList2);
 			
-			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
-			NonReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
-				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+			NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashMatchIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
 					this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -386,7 +379,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -403,16 +396,16 @@ public class NonReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -420,8 +413,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
-					new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+			NonReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildSecondHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -433,7 +426,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -450,16 +443,16 @@ public class NonReusingHashMatchIteratorITCase {
 		try {
 			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
 			
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 			
 			// collect expected data
-			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+			final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
 				collectIntPairData(input1),
-				collectRecordData(input2));
+				collectTupleData(input2));
 			
-			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+			final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 	
 			// reset the generators
 			input1 = new UniformIntPairGenerator(500, 40, false);
@@ -467,8 +460,8 @@ public class NonReusingHashMatchIteratorITCase {
 			input2.reset();
 	
 			// compare with iterator values
-			NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
-					new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+			NonReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new NonReusingBuildFirstHashMatchIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -480,7 +473,7 @@ public class NonReusingHashMatchIteratorITCase {
 			iterator.close();
 	
 			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+			for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
 					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
 				}
@@ -498,29 +491,29 @@ public class NonReusingHashMatchIteratorITCase {
 
 	
 	
-	static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
 	{
-		Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
 
-		for (TestData.Key key : leftMap.keySet()) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
 
 			if (rightValues == null) {
 				continue;
 			}
 
 			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordMatch>());
+				map.put(key, new ArrayList<TupleMatch>());
 			}
 
-			Collection<RecordMatch> matchedValues = map.get(key);
+			Collection<TupleMatch> matchedValues = map.get(key);
 
-			for (TestData.Value leftValue : leftValues) {
-				for (TestData.Value rightValue : rightValues) {
-					matchedValues.add(new RecordMatch(leftValue, rightValue));
+			for (String leftValue : leftValues) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new TupleMatch(leftValue, rightValue));
 				}
 			}
 		}
@@ -528,32 +521,30 @@ public class NonReusingHashMatchIteratorITCase {
 		return map;
 	}
 	
-	static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+	static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
 		Map<Integer, Collection<Integer>> leftMap,
-		Map<TestData.Key, Collection<TestData.Value>> rightMap)
+		Map<Integer, Collection<String>> rightMap)
 	{
-		final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+		final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
 	
 		for (Integer i : leftMap.keySet()) {
 			
-			final TestData.Key key = new TestData.Key(i.intValue());
-			
 			final Collection<Integer> leftValues = leftMap.get(i);
-			final Collection<TestData.Value> rightValues = rightMap.get(key);
+			final Collection<String> rightValues = rightMap.get(i);
 	
 			if (rightValues == null) {
 				continue;
 			}
 	
-			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<RecordIntPairMatch>());
+			if (!map.containsKey(i)) {
+				map.put(i, new ArrayList<TupleIntPairMatch>());
 			}
 	
-			final Collection<RecordIntPairMatch> matchedValues = map.get(key);
+			final Collection<TupleIntPairMatch> matchedValues = map.get(i);
 	
 			for (Integer v : leftValues) {
-				for (TestData.Value val : rightValues) {
-					matchedValues.add(new RecordIntPairMatch(v, val));
+				for (String val : rightValues) {
+					matchedValues.add(new TupleIntPairMatch(v, val));
 				}
 			}
 		}
@@ -562,21 +553,21 @@ public class NonReusingHashMatchIteratorITCase {
 	}
 
 	
-	static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+	static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
 	throws Exception
 	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
+		Map<Integer, Collection<String>> map = new HashMap<>();
+		Tuple2<Integer, String> pair = new Tuple2<>();
 		
 		while ((pair = iter.next(pair)) != null) {
 
-			TestData.Key key = pair.getField(0, TestData.Key.class);
+			Integer key = pair.f0;
 			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+				map.put(key, new ArrayList<String>());
 			}
 
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+			Collection<String> values = map.get(key);
+			values.add(pair.f1);
 		}
 
 		return map;
@@ -606,19 +597,19 @@ public class NonReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordMatch {
+	static class TupleMatch {
 		
-		private final Value left;
-		private final Value right;
+		private final String left;
+		private final String right;
 
-		public RecordMatch(Value left, Value right) {
+		public TupleMatch(String left, String right) {
 			this.left = left;
 			this.right = right;
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordMatch o = (RecordMatch) obj;
+			TupleMatch o = (TupleMatch) obj;
 			return this.left.equals(o.left) && this.right.equals(o.right);
 		}
 		
@@ -636,19 +627,19 @@ public class NonReusingHashMatchIteratorITCase {
 	/**
 	 * Private class used for storage of the expected matches in a hash-map.
 	 */
-	static class RecordIntPairMatch
+	static class TupleIntPairMatch
 	{
 		private final int left;
-		private final Value right;
+		private final String right;
 
-		public RecordIntPairMatch(int left, Value right) {
+		public TupleIntPairMatch(int left, String right) {
 			this.left = left;
-			this.right = right;
+			this.right = new String(right);
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			RecordIntPairMatch o = (RecordIntPairMatch) obj;
+			TupleIntPairMatch o = (TupleIntPairMatch) obj;
 			return this.left == o.left && this.right.equals(o.right);
 		}
 		
@@ -663,28 +654,28 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordMatchRemovingJoin extends JoinFunction
+	static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
 		
-		protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+		protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
-			TestData.Key key = rec1.getField(0, TestData.Key.class);
-			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
-			TestData.Value value2 = rec2.getField(1, TestData.Value.class);
-			//System.err.println("rec1 key = "+key+"  rec2 key= "+rec2.getField(0, TestData.Key.class));
-			Collection<RecordMatch> matches = this.toRemoveFrom.get(key);
+			int key = rec1.f0;
+			String value1 = rec1.f1;
+			String value2 = rec2.f1;
+			//System.err.println("rec1 key = "+key+"  rec2 key= "+rec2.f0);
+			Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
-				matches.remove(new RecordMatch(value1, value2)));
+				matches.remove(new TupleMatch(value1, value2)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -692,32 +683,32 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+	static final class TupleIntPairMatchRemovingMatcher implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
 	{
-		private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+		private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
 		
-		protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+		protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
 			this.toRemoveFrom = map;
 		}
 		
 		@Override
-		public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+		public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
 		{
 			final int k = rec1.getKey();
 			final int v = rec1.getValue(); 
 			
-			final TestData.Key key = rec2.getField(0, TestData.Key.class);
-			final TestData.Value value = rec2.getField(1, TestData.Value.class);
-			
-			Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey()); 
+			final Integer key = rec2.f0;
+			final String value = rec2.f1;
+
+			Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key);
 			
-			Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key);
+			Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
 			if (matches == null) {
 				Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
 			}
 			
 			Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
-				matches.remove(new RecordIntPairMatch(v, value)));
+				matches.remove(new TupleIntPairMatch(v, value)));
 			
 			if (matches.isEmpty()) {
 				this.toRemoveFrom.remove(key);
@@ -725,7 +716,7 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 	}
 	
-	static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+	static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
 	{
 		private int reference;
 		
@@ -735,33 +726,31 @@ public class NonReusingHashMatchIteratorITCase {
 		}
 
 		@Override
-		public boolean equalToReference(Record candidate) {
+		public boolean equalToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() == this.reference;
+				return candidate.f0 == this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 
 		@Override
-		public int compareToReference(Record candidate) {
+		public int compareToReference(Tuple2<Integer, String> candidate) {
 			try {
-				final IntValue i = candidate.getField(0, IntValue.class);
-				return i.getValue() - this.reference;
+				return candidate.f0 - this.reference;
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
 		}
 	}
 	
-	static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+	static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
 	{
 		private int reference;
 		
 		@Override
-		public void setReference(Record reference) {
-			this.reference = reference.getField(0, IntValue.class).getValue();
+		public void setReference(Tuple2<Integer, String> reference) {
+			this.reference = reference.f0;
 		}
 
 		@Override