You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:03 UTC

[21/60] git commit: Refactor TupleTypeInfo and add GenericPairComparator

Refactor TupleTypeInfo and add GenericPairComparator

Now we have TupleTypeInfoBase, TupleSerializerBase, and TupleComparatorBase. They
are now super classes of TupleTypeInfo and the others.

Also rename compare on DataInputView to compareSerialized because Scala
cannot distinguish between the to compare methods for some reason.

This change is necessary for allowing the Scala API to reuse most of the
functionality.

The GenericPairComparator uses the new extractKeys method of
TypeComparator to compare values of any type. This replaces
TuplePairComparator and some other special-case pair comparators. This
is preparatory work for enabling support for Scala Tuples and POJO
comparators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/57b8e66a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/57b8e66a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/57b8e66a

Branch: refs/heads/master
Commit: 57b8e66ae9a34534efac194d794b11d44914f711
Parents: 7f946ce
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Aug 22 12:24:11 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:57 2014 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/TypeComparator.java    |  18 +-
 .../typeutils/base/BasicTypeComparator.java     |  17 ++
 .../typeutils/base/BooleanComparator.java       |   2 +-
 .../common/typeutils/base/ByteComparator.java   |   2 +-
 .../common/typeutils/base/CharComparator.java   |   2 +-
 .../common/typeutils/base/DoubleComparator.java |   2 +-
 .../common/typeutils/base/FloatComparator.java  |   2 +-
 .../common/typeutils/base/IntComparator.java    |   2 +-
 .../common/typeutils/base/LongComparator.java   |   2 +-
 .../common/typeutils/base/ShortComparator.java  |   2 +-
 .../common/typeutils/base/StringComparator.java |   2 +-
 .../common/typeutils/ComparatorTestBase.java    |  12 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java | 137 +--------
 .../api/java/typeutils/TupleTypeInfoBase.java   | 117 ++++++++
 .../runtime/CopyableValueComparator.java        |  20 +-
 .../runtime/GenericPairComparator.java          |  91 ++++++
 .../runtime/GenericTypeComparator.java          |  17 +-
 .../java/typeutils/runtime/PojoComparator.java  |  26 +-
 .../typeutils/runtime/PojoPairComparator.java   | 153 ----------
 .../runtime/RuntimePairComparatorFactory.java   | 121 +-------
 .../java/typeutils/runtime/TupleComparator.java | 257 ++---------------
 .../typeutils/runtime/TupleComparatorBase.java  | 281 +++++++++++++++++++
 .../runtime/TupleLeadingFieldComparator.java    | 155 ----------
 .../TupleLeadingFieldPairComparator.java        |  70 -----
 .../typeutils/runtime/TuplePairComparator.java  | 105 -------
 .../java/typeutils/runtime/TupleSerializer.java |  73 +----
 .../typeutils/runtime/TupleSerializerBase.java  | 100 +++++++
 .../java/typeutils/runtime/ValueComparator.java |  20 +-
 .../typeutils/runtime/WritableComparator.java   |  18 +-
 .../runtime/record/RecordComparator.java        |  14 +-
 .../api/java/typeutils/TypeInfoParserTest.java  |   6 +-
 .../runtime/GenericPairComparatorTest.java      |  89 ++++++
 .../TupleLeadingFieldComparatorTest.java        |  74 -----
 .../TupleLeadingFieldPairComparatorTest.java    |  66 -----
 .../runtime/TuplePairComparatorTest.java        |  78 -----
 .../operators/sort/NormalizedKeySorter.java     |   2 +-
 .../testutils/types/IntListComparator.java      |  18 +-
 .../testutils/types/IntPairComparator.java      |  18 +-
 .../testutils/types/StringPairComparator.java   |  17 +-
 .../operators/util/OutputEmitterTest.java       |  18 +-
 .../VertexWithAdjacencyListComparator.java      |  20 +-
 .../VertexWithRankAndDanglingComparator.java    |  20 +-
 .../types/VertexWithRankComparator.java         |  20 +-
 43 files changed, 992 insertions(+), 1294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
index ae9e4f1..1958824 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
@@ -177,7 +177,7 @@ public abstract class TypeComparator<T> implements Serializable {
 	 * 
 	 *  @see java.util.Comparator#compare(Object, Object)
 	 */
-	public abstract int compare(DataInputView firstSource, DataInputView secondSource) throws IOException;
+	public abstract int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException;
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -286,7 +286,21 @@ public abstract class TypeComparator<T> implements Serializable {
 	public abstract TypeComparator<T> duplicate();
 	
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Extracts the key fields from a record. This is for use by the PairComparator to provide
+	 * interoperability between different record types.
+	 */
+	public abstract Object[] extractKeys(T record);
+
+	/**
+	 * Get the field comparators. This is used together with {@link #extractKeys(Object)} to provide
+	 * interoperability between different record types.
+	 */
+	public abstract TypeComparator[] getComparators();
+
+	// --------------------------------------------------------------------------------------------
+
 	@SuppressWarnings("rawtypes")
 	public int compareAgainstReference(Comparable[] keys) {
 		throw new UnsupportedOperationException("Workaround hack.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BasicTypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BasicTypeComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BasicTypeComparator.java
index 8c6158c..a57bc62 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BasicTypeComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BasicTypeComparator.java
@@ -32,6 +32,12 @@ public abstract class BasicTypeComparator<T extends Comparable<T>> extends TypeC
 	private transient T reference;
 	
 	protected final boolean ascendingComparison;
+
+	// This is used in extractKeys, so that we don't create a new array for every call.
+	private final Comparable[] extractedKey = new Comparable[1];
+
+	// For use by getComparators
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
 	
 
 	protected BasicTypeComparator(boolean ascending) {
@@ -81,6 +87,17 @@ public abstract class BasicTypeComparator<T extends Comparable<T>> extends TypeC
 	}
 
 	@Override
+	public Object[] extractKeys(T record) {
+		extractedKey[0] = record;
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
+
+	@Override
 	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
 		throw new UnsupportedOperationException();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanComparator.java
index 2dd1aa2..25d04b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanComparator.java
@@ -35,7 +35,7 @@ public final class BooleanComparator extends BasicTypeComparator<Boolean> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		final int fs = firstSource.readBoolean() ? 1 : 0;
 		final int ss = secondSource.readBoolean() ? 1 : 0;
 		int comp = fs - ss; 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteComparator.java
index 9d147c4..a301cc2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteComparator.java
@@ -35,7 +35,7 @@ public final class ByteComparator extends BasicTypeComparator<Byte> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		byte b1 = firstSource.readByte();
 		byte b2 = secondSource.readByte();
 		int comp = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharComparator.java
index 45f3d06..eff0260 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharComparator.java
@@ -35,7 +35,7 @@ public final class CharComparator extends BasicTypeComparator<Character> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		char c1 = firstSource.readChar();
 		char c2 = secondSource.readChar();
 		int comp = (c1 < c2 ? -1 : (c1 == c2 ? 0 : 1)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleComparator.java
index 39f13bb..3d89760 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleComparator.java
@@ -35,7 +35,7 @@ public final class DoubleComparator extends BasicTypeComparator<Double> {
 	}
 
 	@Override 
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException { 
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		double l1 = firstSource.readDouble(); 
 		double l2 = secondSource.readDouble(); 
 		int comp = (l1 < l2 ? -1 : (l1 > l2 ? 1 : 0)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatComparator.java
index 3ca7d0d..7f0d971 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatComparator.java
@@ -35,7 +35,7 @@ public final class FloatComparator extends BasicTypeComparator<Float> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		float l1 = firstSource.readFloat();
 		float l2 = secondSource.readFloat();
 		int comp = (l1 < l2 ? -1 : (l1 > l2 ? 1 : 0)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntComparator.java
index 7e22d65..9622fb2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntComparator.java
@@ -35,7 +35,7 @@ public final class IntComparator extends BasicTypeComparator<Integer> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		int i1 = firstSource.readInt();
 		int i2 = secondSource.readInt();
 		int comp = (i1 < i2 ? -1 : (i1 == i2 ? 0 : 1)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongComparator.java
index 28d80b8..2bb0b34 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongComparator.java
@@ -35,7 +35,7 @@ public final class LongComparator extends BasicTypeComparator<Long> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		long l1 = firstSource.readLong();
 		long l2 = secondSource.readLong();
 		int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortComparator.java
index efec90b..03707ad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortComparator.java
@@ -35,7 +35,7 @@ public final class ShortComparator extends BasicTypeComparator<Short> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		short s1 = firstSource.readShort();
 		short s2 = secondSource.readShort();
 		int comp = (s1 < s2 ? -1 : (s1 == s2 ? 0 : 1)); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringComparator.java
index dd706db..5bcb82a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringComparator.java
@@ -42,7 +42,7 @@ public final class StringComparator extends BasicTypeComparator<String> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		String s1 = StringValue.readString(firstSource);
 		String s2 = StringValue.readString(secondSource);
 		int comp = s1.compareTo(s2); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index cd952d6..5392fa9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -26,8 +26,6 @@ import java.io.IOException;
 
 import static org.junit.Assert.*;
 
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -108,7 +106,7 @@ public abstract class ComparatorTestBase<T> {
 				writeSortedData(d, out1);
 				in1 = out1.getInputView();
 
-				assertTrue(comparator.compare(in1, in2) == 0);
+				assertTrue(comparator.compareSerialized(in1, in2) == 0);
 			}
 		} catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -173,16 +171,16 @@ public abstract class ComparatorTestBase<T> {
 					in2 = out2.getInputView();
 
 					if (greater && ascending) {
-						assertTrue(comparator.compare(in1, in2) < 0);
+						assertTrue(comparator.compareSerialized(in1, in2) < 0);
 					}
 					if (greater && !ascending) {
-						assertTrue(comparator.compare(in1, in2) > 0);
+						assertTrue(comparator.compareSerialized(in1, in2) > 0);
 					}
 					if (!greater && ascending) {
-						assertTrue(comparator.compare(in2, in1) > 0);
+						assertTrue(comparator.compareSerialized(in2, in1) > 0);
 					}
 					if (!greater && !ascending) {
-						assertTrue(comparator.compare(in2, in1) < 0);
+						assertTrue(comparator.compareSerialized(in2, in1) < 0);
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index b9dce11..25be7f1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -23,81 +23,29 @@ import java.util.Arrays;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldComparator;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.types.TypeInformation;
 
-
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;
 //CHECKSTYLE.ON: AvoidStarImport
 
 
-public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implements CompositeType<T> {
-	
-	private final TypeInformation<?>[] types;
-	
-	private final Class<T> tupleType;
-	
-	
-	public TupleTypeInfo(TypeInformation<?>... types) {
-		if (types == null || types.length == 0 || types.length > Tuple.MAX_ARITY) {
-			throw new IllegalArgumentException();
-		}
 
-		@SuppressWarnings("unchecked")
-		Class<T> typeClass = (Class<T>) CLASSES[types.length - 1];
-		
-		this.types = types;
-		this.tupleType = typeClass;
+public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
+
+	@SuppressWarnings("unchecked")
+	public TupleTypeInfo(TypeInformation<?>... types) {
+		this((Class<T>) CLASSES[types.length - 1], types);
 	}
-	
+
 	public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
+		super(tupleType, types);
 		if (types == null || types.length == 0 || types.length > Tuple.MAX_ARITY) {
 			throw new IllegalArgumentException();
 		}
-		
-		this.tupleType = tupleType;
-		this.types = types;
-	}
-	
-	
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return true;
-	}
-
-	@Override
-	public int getArity() {
-		return types.length;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return tupleType;
 	}
 
-	
-	public <X> TypeInformation<X> getTypeAt(int pos) {
-		if (pos < 0 || pos >= this.types.length) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		@SuppressWarnings("unchecked")
-		TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
-		return typed;
-	}
-	
-	@Override
-	public boolean isKeyType() {
-		return this.isValidKeyType(this);
-	}
-	
 	@Override
 	public TupleSerializer<T> createSerializer() {
 		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[getArity()];
@@ -118,14 +66,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
 		{
 			throw new IllegalArgumentException();
 		}
-		
-		// special case for tuples where field zero is the key field
-		if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0 && !types[0].isTupleType()) {
-			return createLeadingFieldComparator(orders[0], types[0]);
-		}
-		
-		// --- general case ---
-		
+
 		int maxKey = -1;
 		for (int key : logicalKeyFields){
 			maxKey = Math.max(key, maxKey);
@@ -169,39 +110,12 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof TupleTypeInfo) {
-			@SuppressWarnings("unchecked")
-			TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
-			return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
-					Arrays.deepEquals(this.types, other.types);
-			
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
-	}
-	
+
 	@Override
 	public String toString() {
-		StringBuilder bld = new StringBuilder("Tuple");
-		bld.append(types.length).append('<');
-		bld.append(types[0]);
-		
-		for (int i = 1; i < types.length; i++) {
-			bld.append(", ").append(types[i]);
-		}
-		
-		bld.append('>');
-		return bld.toString();
+		return "Java " + super.toString();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>... basicTypes) {
@@ -227,23 +141,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
 		TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new TupleTypeInfo<Tuple>(infos);
 		return tupleInfo;
 	}
-	
-	private boolean isValidKeyType(TypeInformation<?> typeInfo) {
-		if(typeInfo instanceof TupleTypeInfo) {
-			TupleTypeInfo<?> tupleType = ((TupleTypeInfo<?>)typeInfo);
-			for(int i=0;i<tupleType.getArity();i++) {
-				if (!isValidKeyType(tupleType.getTypeAt(i))) {
-					return false;
-				}
-			}
-			return true;
-		} else if(typeInfo.isKeyType()) {
-				return true;
-		} else {
-			return false;
-		}
-	}
-	
+
 	// --------------------------------------------------------------------------------------------	
 	// The following lines are generated.
 	// --------------------------------------------------------------------------------------------
@@ -254,15 +152,4 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
 		Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class
 	};
 	// END_OF_TUPLE_DEPENDENT_CODE
-	
-	
-	private static final <T extends Tuple, K> TypeComparator<T> createLeadingFieldComparator(boolean ascending, TypeInformation<?> info) {
-		if (!(info.isKeyType() && info instanceof AtomicType)) {
-			throw new IllegalArgumentException("The field at position 0 (" + info + ") is no atomic key type.");
-		}
-		
-		@SuppressWarnings("unchecked")
-		AtomicType<K> typedInfo = (AtomicType<K>) info;
-		return new TupleLeadingFieldComparator<T, K>(typedInfo.createComparator(ascending));
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
new file mode 100644
index 0000000..bca29dc
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.util.Arrays;
+
+import org.apache.flink.types.TypeInformation;
+
+public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements CompositeType<T> {
+	
+	protected final TypeInformation<?>[] types;
+	
+	protected final Class<T> tupleType;
+	
+	public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
+		this.tupleType = tupleType;
+		this.types = types;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return true;
+	}
+
+	@Override
+	public int getArity() {
+		return types.length;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return tupleType;
+	}
+
+	
+	public <X> TypeInformation<X> getTypeAt(int pos) {
+		if (pos < 0 || pos >= this.types.length) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
+		return typed;
+	}
+	
+	@Override
+	public boolean isKeyType() {
+		return isValidKeyType(this);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof TupleTypeInfoBase) {
+			@SuppressWarnings("unchecked")
+			TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;
+			return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
+					Arrays.deepEquals(this.types, other.types);
+			
+		} else {
+			return false;
+		}
+	}
+	
+	@Override
+	public int hashCode() {
+		return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+	}
+
+	private boolean isValidKeyType(TypeInformation<?> typeInfo) {
+		if(typeInfo instanceof TupleTypeInfoBase) {
+			TupleTypeInfoBase<?> tupleType = ((TupleTypeInfoBase<?>)typeInfo);
+			for(int i=0;i<tupleType.getArity();i++) {
+				if (!isValidKeyType(tupleType.getTypeAt(i))) {
+					return false;
+				}
+			}
+			return true;
+		} else  {
+			return typeInfo.isKeyType();
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder bld = new StringBuilder("Tuple");
+		bld.append(types.length).append('<');
+		bld.append(types[0]);
+		
+		for (int i = 1; i < types.length; i++) {
+			bld.append(", ").append(types[i]);
+		}
+		
+		bld.append('>');
+		return bld.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
index cf692ac..6911580 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
@@ -42,8 +42,11 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
 	private transient T reference;
 	
 	private transient T tempReference;
-	
-	
+
+	private final Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
 	public CopyableValueComparator(boolean ascending, Class<T> type) {
 		this.type = type;
 		this.ascendingComparison = ascending;
@@ -79,7 +82,7 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
 	}
 	
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		if (tempReference == null) {
 			tempReference = InstantiationUtil.instantiate(type, CopyableValue.class);
 		}
@@ -121,6 +124,17 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
 	public TypeComparator<T> duplicate() {
 		return new CopyableValueComparator<T>(ascendingComparison, type);
 	}
+
+	@Override
+	public Object[] extractKeys(T record) {
+		extractedKey[0] = record;
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	// unsupported normalization

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparator.java
new file mode 100644
index 0000000..d47c1ef
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+
+
+public class GenericPairComparator<T1, T2> extends TypePairComparator<T1, T2>
+		implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeComparator<T1> comparator1;
+	private final TypeComparator<T2> comparator2;
+
+	private final TypeComparator<Object>[] comparators1;
+	private final TypeComparator<Object>[] comparators2;
+
+	private final Object[] referenceKeyFields;
+
+	@SuppressWarnings("unchecked")
+	public GenericPairComparator(TypeComparator<T1> comparator1, TypeComparator<T2> comparator2) {
+		this.comparator1 = comparator1;
+		this.comparator2 = comparator2;
+		this.comparators1 = comparator1.getComparators();
+		this.comparators2 = comparator2.getComparators();
+
+		if(comparators1.length != comparators2.length) {
+			throw new IllegalArgumentException("Number of key fields and comparators differ.");
+		}
+
+		int numKeys = comparators1.length;
+		
+		for(int i = 0; i < numKeys; i++) {
+			this.comparators1[i] = comparators1[i].duplicate();
+			this.comparators2[i] = comparators2[i].duplicate();
+		}
+
+		this.referenceKeyFields = new Object[numKeys];
+	}
+	
+	@Override
+	public void setReference(T1 reference) {
+		Object[] keys = comparator1.extractKeys(reference);
+		System.arraycopy(keys, 0, referenceKeyFields, 0, keys.length);
+	}
+
+	@Override
+	public boolean equalToReference(T2 candidate) {
+		Object[] keys = comparator2.extractKeys(candidate);
+		for (int i = 0; i < this.comparators1.length; i++) {
+			if (this.comparators1[i].compare(referenceKeyFields[i], keys[i]) != 0) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public int compareToReference(T2 candidate) {
+		Object[] keys = comparator2.extractKeys(candidate);
+		for (int i = 0; i < this.comparators1.length; i++) {
+			// We reverse ordering here because our "compareToReference" does work in a mirrored
+			// way compared to Comparable.compareTo
+			int res = this.comparators1[i].compare(keys[i], referenceKeyFields[i]);
+			if(res != 0) {
+				return res;
+			}
+		}
+		return 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
index d78b491..602b1cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
@@ -52,6 +52,10 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 
 	private transient Kryo kryo;
 
+	private final Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
 	// ------------------------------------------------------------------------
 
 	public GenericTypeComparator(boolean ascending, TypeSerializer<T> serializer, Class<T> type) {
@@ -100,7 +104,7 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 	}
 
 	@Override
-	public int compare(final DataInputView firstSource, final DataInputView secondSource) throws IOException {
+	public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException {
 		if (this.serializer == null) {
 			this.serializer = this.serializerFactory.getSerializer();
 		}
@@ -164,6 +168,17 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 		}
 	}
 
+	@Override
+	public Object[] extractKeys(T record) {
+		extractedKey[0] = record;
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 41f73c4..a61e9ef 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -52,6 +52,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 
 	private final Class<T> type;
 
+	private final Comparable[] extractedKeys;
 
 	@SuppressWarnings("unchecked")
 	public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
@@ -101,6 +102,8 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		this.numLeadingNormalizableKeys = nKeys;
 		this.normalizableKeyPrefixLen = nKeyLen;
 		this.invertNormKey = inverted;
+
+		extractedKeys = new Comparable[keyFields.length];
 	}
 
 	@SuppressWarnings("unchecked")
@@ -128,7 +131,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 			throw new RuntimeException("Cannot copy serializer", e);
 		}
 
-
+		extractedKeys = new Comparable[keyFields.length];
 	}
 
 	private void writeObject(ObjectOutputStream out)
@@ -172,7 +175,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		return this.keyFields;
 	}
 
-	public TypeComparator<Object>[] getComparators() {
+	public TypeComparator[] getComparators() {
 		return this.comparators;
 	}
 
@@ -269,7 +272,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		T first = this.serializer.createInstance();
 		T second = this.serializer.createInstance();
 
@@ -343,6 +346,23 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		return new PojoComparator<T>(this);
 	}
 
+	@Override
+	public Object[] extractKeys(T record) {
+		int i = 0;
+		try {
+			for (; i < keyFields.length; i++) {
+				extractedKeys[i] = (Comparable) keyFields[i].get(record);
+			}
+		}
+		catch (IllegalAccessException iaex) {
+			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(this.keyFields[i].toString());
+		}
+		return extractedKeys;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoPairComparator.java
deleted file mode 100644
index 29266be..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoPairComparator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-
-
-public class PojoPairComparator<T1, T2> extends TypePairComparator<T1, T2> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	@SuppressWarnings("unused")
-	private final int[] keyPositions1, keyPositions2;
-	private transient Field[] keyFields1, keyFields2;
-	private final TypeComparator<Object>[] comparators1;
-	private final TypeComparator<Object>[] comparators2;
-
-	@SuppressWarnings("unchecked")
-	public PojoPairComparator(int[] keyPositions1, Field[] keyFields1, int[] keyPositions2, Field[] keyFields2, TypeComparator<Object>[] comparators1, TypeComparator<Object>[] comparators2) {
-
-		if(keyPositions1.length != keyPositions2.length
-			|| keyPositions1.length != comparators1.length
-			|| keyPositions2.length != comparators2.length) {
-
-			throw new IllegalArgumentException("Number of key fields and comparators differ.");
-		}
-
-		int numKeys = keyPositions1.length;
-
-		this.keyPositions1 = keyPositions1;
-		this.keyPositions2 = keyPositions2;
-		this.keyFields1 = keyFields1;
-		this.keyFields2 = keyFields2;
-		this.comparators1 = new TypeComparator[numKeys];
-		this.comparators2 = new TypeComparator[numKeys];
-
-		for(int i = 0; i < numKeys; i++) {
-			this.comparators1[i] = comparators1[i].duplicate();
-			this.comparators2[i] = comparators2[i].duplicate();
-		}
-	}
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeInt(keyFields1.length);
-		for (Field field: keyFields1) {
-			out.writeObject(field.getDeclaringClass());
-			out.writeUTF(field.getName());
-		}
-		out.writeInt(keyFields2.length);
-		for (Field field: keyFields2) {
-			out.writeObject(field.getDeclaringClass());
-			out.writeUTF(field.getName());
-		}
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		int numKeyFields = in.readInt();
-		keyFields1 = new Field[numKeyFields];
-		for (int i = 0; i < numKeyFields; i++) {
-			Class<?> clazz = (Class<?>)in.readObject();
-			String fieldName = in.readUTF();
-			try {
-				keyFields1[i] = clazz.getField(fieldName);
-				keyFields1[i].setAccessible(true);
-			} catch (NoSuchFieldException e) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
-			}
-		}
-		numKeyFields = in.readInt();
-		keyFields2 = new Field[numKeyFields];
-		for (int i = 0; i < numKeyFields; i++) {
-			Class<?> clazz = (Class<?>)in.readObject();
-			String fieldName = in.readUTF();
-			try {
-				keyFields2[i] = clazz.getField(fieldName);
-				keyFields2[i].setAccessible(true);
-			} catch (NoSuchFieldException e) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
-			}
-		}
-	}
-
-	@Override
-	public void setReference(T1 reference) {
-		for(int i=0; i < this.comparators1.length; i++) {
-			try {
-				this.comparators1[i].setReference(keyFields1[i].get(reference));
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
-			}
-		}
-	}
-
-	@Override
-	public boolean equalToReference(T2 candidate) {
-		for(int i=0; i < this.comparators1.length; i++) {
-			try {
-				if(!this.comparators1[i].equalToReference(keyFields2[i].get(candidate))) {
-					return false;
-				}
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
-			}
-		}
-		return true;
-	}
-
-	@Override
-	public int compareToReference(T2 candidate) {
-		for(int i=0; i < this.comparators1.length; i++) {
-			try {
-				this.comparators2[i].setReference(keyFields2[i].get(candidate));
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
-			}
-			int res = this.comparators1[i].compareToReference(this.comparators2[i]);
-			if(res != 0) {
-				return res;
-			}
-		}
-		return 0;
-	}
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
index f210cdb..17c939c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
@@ -16,129 +16,30 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-
 
-public final class RuntimePairComparatorFactory<T1 extends Tuple, T2 extends Tuple> implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
+public final class RuntimePairComparatorFactory<T1, T2>
+		implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public TypePairComparator<T1, T2> createComparator12(TypeComparator<T1> comparator1, TypeComparator<T2> comparator2) {
-
-		if ((comparator1 instanceof TupleLeadingFieldComparator) && (comparator2 instanceof TupleLeadingFieldComparator)) {
-
-			TypeComparator<?> comp1 = ((TupleLeadingFieldComparator<?,?>) comparator1).getFieldComparator();
-			TypeComparator<?> comp2 = ((TupleLeadingFieldComparator<?,?>) comparator2).getFieldComparator();
-
-			return createLeadingFieldPairComp(comp1, comp2);
-		}
-		else {
-			int[] keyPos1;
-			int[] keyPos2;
-			TypeComparator<Object>[] comps1;
-			TypeComparator<Object>[] comps2;
-			
-			// get info from first comparator
-			if (comparator1 instanceof TupleComparator) {
-				TupleComparator<?> tupleComp1 = (TupleComparator<?>) comparator1;
-				keyPos1 = tupleComp1.getKeyPositions();
-				comps1 = tupleComp1.getComparators();
-			}
-			else if (comparator1 instanceof TupleLeadingFieldComparator) {
-				TupleLeadingFieldComparator<?, ?> tupleComp1 = (TupleLeadingFieldComparator<?, ?>) comparator1;
-				keyPos1 = new int[] {0};
-				comps1 = new TypeComparator[] { tupleComp1.getFieldComparator() };
-			}
-			else {
-				throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
-			}
-			
-			// get info from second comparator
-			if (comparator2 instanceof TupleComparator) {
-				TupleComparator<?> tupleComp2 = (TupleComparator<?>) comparator2;
-				keyPos2 = tupleComp2.getKeyPositions();
-				comps2 = tupleComp2.getComparators();
-			}
-			else if (comparator2 instanceof TupleLeadingFieldComparator) {
-				TupleLeadingFieldComparator<?, ?> tupleComp2 = (TupleLeadingFieldComparator<?, ?>) comparator2;
-				keyPos2 = new int[] {0};
-				comps2 = new TypeComparator[] { tupleComp2.getFieldComparator() };
-			}
-			else {
-				throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
-			}
-
-			return (TypePairComparator<T1, T2>) new TuplePairComparator<Tuple, Tuple>(keyPos1, keyPos2, comps1, comps2);
-		}
+	public TypePairComparator<T1, T2> createComparator12(
+			TypeComparator<T1> comparator1,
+			TypeComparator<T2> comparator2) {
+		return new GenericPairComparator<T1, T2>(comparator1, comparator2);
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public TypePairComparator<T2, T1> createComparator21(TypeComparator<T1> comparator1, TypeComparator<T2> comparator2) {
-		
-		if ((comparator1 instanceof TupleLeadingFieldComparator) && (comparator2 instanceof TupleLeadingFieldComparator)) {
-
-			TypeComparator<?> comp1 = ((TupleLeadingFieldComparator<?,?>) comparator1).getFieldComparator();
-			TypeComparator<?> comp2 = ((TupleLeadingFieldComparator<?,?>) comparator2).getFieldComparator();
-
-			return createLeadingFieldPairComp(comp2, comp1);
-		}
-		else {
-			int[] keyPos1;
-			int[] keyPos2;
-			TypeComparator<Object>[] comps1;
-			TypeComparator<Object>[] comps2;
-			
-			// get info from first comparator
-			if (comparator1 instanceof TupleComparator) {
-				TupleComparator<?> tupleComp1 = (TupleComparator<?>) comparator1;
-				keyPos1 = tupleComp1.getKeyPositions();
-				comps1 = tupleComp1.getComparators();
-			}
-			else if (comparator1 instanceof TupleLeadingFieldComparator) {
-				TupleLeadingFieldComparator<?, ?> tupleComp1 = (TupleLeadingFieldComparator<?, ?>) comparator1;
-				keyPos1 = new int[] {0};
-				comps1 = new TypeComparator[] { tupleComp1.getFieldComparator() };
-			}
-			else {
-				throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
-			}
-			
-			// get info from second comparator
-			if (comparator2 instanceof TupleComparator) {
-				TupleComparator<?> tupleComp2 = (TupleComparator<?>) comparator2;
-				keyPos2 = tupleComp2.getKeyPositions();
-				comps2 = tupleComp2.getComparators();
-			}
-			else if (comparator2 instanceof TupleLeadingFieldComparator) {
-				TupleLeadingFieldComparator<?, ?> tupleComp2 = (TupleLeadingFieldComparator<?, ?>) comparator2;
-				keyPos2 = new int[] {0};
-				comps2 = new TypeComparator[] { tupleComp2.getFieldComparator() };
-			}
-			else {
-				throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
-			}
-
-			return (TypePairComparator<T2, T1>) new TuplePairComparator<Tuple, Tuple>(keyPos2, keyPos1, comps2, comps1);
-		}
-	}
-	
-	private static <K, T1 extends Tuple, T2 extends Tuple> TupleLeadingFieldPairComparator<K, T1, T2> createLeadingFieldPairComp(
-			TypeComparator<?> comp1, TypeComparator<?> comp2)
-	{
-		@SuppressWarnings("unchecked")
-		TypeComparator<K> c1 = (TypeComparator<K>) comp1;
-		@SuppressWarnings("unchecked")
-		TypeComparator<K> c2 = (TypeComparator<K>) comp2;
-		
-		return new TupleLeadingFieldPairComparator<K, T1, T2>(c1, c2);
+	public TypePairComparator<T2, T1> createComparator21(
+			TypeComparator<T1> comparator1,
+			TypeComparator<T2> comparator2) {
+		return new GenericPairComparator<T2, T1>(comparator2, comparator1);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 9de9824..7b3976f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -18,135 +18,33 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.types.KeyFieldOutOfBoundsException;
 import org.apache.flink.types.NullFieldException;
 import org.apache.flink.types.NullKeyFieldException;
 
 
-public final class TupleComparator<T extends Tuple> extends TypeComparator<T> implements java.io.Serializable {
+public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	/** key positions describe which fields are keys in what order */
-	private final int[] keyPositions;
-	
-	/** comparators for the key fields, in the same order as the key fields */
-	private final TypeComparator<Object>[] comparators;
+	private final Object[] extractedKeys;
 
-	/** serializer factories to duplicate non thread-safe serializers */
-	private final TypeSerializerFactory<Object>[] serializerFactories;
-	
-	
-	private final int[] normalizedKeyLengths;
-	
-	private final int numLeadingNormalizableKeys;
-	
-	private final int normalizableKeyPrefixLen;
-	
-	private final boolean invertNormKey;
-	
-	
-	/** serializers to deserialize the first n fields for comparison */
-	private transient TypeSerializer<Object>[] serializers;
-	
-	// cache for the deserialized field objects
-	private transient Object[] deserializedFields1;
-	private transient Object[] deserializedFields2;
-	
-	
 	@SuppressWarnings("unchecked")
 	public TupleComparator(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
-		// set the default utils
-		this.keyPositions = keyPositions;
-		this.comparators = (TypeComparator<Object>[]) comparators;
-		this.serializers = (TypeSerializer<Object>[]) serializers;
-	
-		// set the serializer factories.
-		this.serializerFactories = new TypeSerializerFactory[this.serializers.length];
-		for (int i = 0; i < serializers.length; i++) {
-			this.serializerFactories[i] = this.serializers[i].isStateful() ?
-					new RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) :
-					new RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class);
-		}
-		
-		// set up auxiliary fields for normalized key support
-		this.normalizedKeyLengths = new int[keyPositions.length];
-		int nKeys = 0;
-		int nKeyLen = 0;
-		boolean inverted = false;
-		
-		for (int i = 0; i < this.keyPositions.length; i++) {
-			TypeComparator<?> k = this.comparators[i];
-			
-			// as long as the leading keys support normalized keys, we can build up the composite key
-			if (k.supportsNormalizedKey()) {
-				if (i == 0) {
-					// the first comparator decides whether we need to invert the key direction
-					inverted = k.invertNormalizedKey();
-				}
-				else if (k.invertNormalizedKey() != inverted) {
-					// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
-					break;
-				}
-				
-				nKeys++;
-				final int len = k.getNormalizeKeyLen();
-				if (len < 0) {
-					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
-				}
-				this.normalizedKeyLengths[i] = len;
-				nKeyLen += len;
-				
-				if (nKeyLen < 0) {
-					// overflow, which means we are out of budget for normalized key space anyways
-					nKeyLen = Integer.MAX_VALUE;
-					break;
-				}
-			} else {
-				break;
-			}
-		}
-		this.numLeadingNormalizableKeys = nKeys;
-		this.normalizableKeyPrefixLen = nKeyLen;
-		this.invertNormKey = inverted;
+		super(keyPositions, comparators, serializers);
+		extractedKeys = new Object[keyPositions.length];
 	}
 	
 	@SuppressWarnings("unchecked")
 	private TupleComparator(TupleComparator<T> toClone) {
-		// copy fields and serializer factories
-		this.keyPositions = toClone.keyPositions;
-		this.serializerFactories = toClone.serializerFactories;
-		
-		this.comparators = new TypeComparator[toClone.comparators.length];
-		for (int i = 0; i < toClone.comparators.length; i++) {
-			this.comparators[i] = toClone.comparators[i].duplicate();
-		}
-		
-		this.normalizedKeyLengths = toClone.normalizedKeyLengths;
-		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
-		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
-		this.invertNormKey = toClone.invertNormKey;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Comparator Methods
-	// --------------------------------------------------------------------------------------------
-	
-	protected int[] getKeyPositions() {
-		return this.keyPositions;
-	}
-	
-	protected TypeComparator<Object>[] getComparators() {
-		return this.comparators;
+		super(toClone);
+		extractedKeys = new Object[keyPositions.length];
+
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -207,28 +105,6 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
 		}
 	}
-
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		TupleComparator<T> other = (TupleComparator<T>) referencedComparator;
-		
-		int i = 0;
-		try {
-			for (; i < this.keyPositions.length; i++) {
-				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			return 0;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
 	
 	@Override
 	public int compare(T first, T second) {
@@ -253,131 +129,32 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		if (deserializedFields1 == null) {
-			instantiateDeserializationUtils();
-		}
-		
-		int i = 0;
-		try {
-			for (; i < serializers.length; i++) {
-				deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
-				deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
-			}
-			
-			for (i = 0; i < keyPositions.length; i++) {
-				int keyPos = keyPositions[i];
-				int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			
-			return 0;
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		} catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
-	
-	@Override
-	public boolean supportsNormalizedKey() {
-		return this.numLeadingNormalizableKeys > 0;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return this.normalizableKeyPrefixLen;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return this.numLeadingNormalizableKeys < this.keyPositions.length ||
-				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-				this.normalizableKeyPrefixLen > keyBytes;
-	}
-
-	@Override
 	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
 		int i = 0;
 		try {
-			for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++)
-			{
-				int len = this.normalizedKeyLengths[i]; 
+			for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) {
+				int len = this.normalizedKeyLengths[i];
 				len = numBytes >= len ? len : numBytes;
 				this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len);
 				numBytes -= len;
 				offset += len;
 			}
-		}
-		catch (NullFieldException nfex) {
+		} catch (NullFieldException nfex) {
 			throw new NullKeyFieldException(nfex);
-		}
-		catch (NullPointerException npex) {
+		} catch (NullPointerException npex) {
 			throw new NullKeyFieldException(this.keyPositions[i]);
 		}
 	}
 
 	@Override
-	public boolean invertNormalizedKey() {
-		return this.invertNormKey;
-	}
-	
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-	
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
+	public Object[] extractKeys(T record) {
+		for (int i = 0; i < keyPositions.length; i++) {
+			extractedKeys[i] = record.getField(keyPositions[i]);
+		}
+		return extractedKeys;
 	}
 
-	@Override
-	public TupleComparator<T> duplicate() {
+	public TypeComparator<T> duplicate() {
 		return new TupleComparator<T>(this);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@SuppressWarnings("unchecked")
-	private final void instantiateDeserializationUtils() {
-		if (this.serializers == null) {
-			this.serializers = new TypeSerializer[this.serializerFactories.length];
-			for (int i = 0; i < this.serializers.length; i++) {
-				this.serializers[i] = this.serializerFactories[i].getSerializer();
-			}
-		}
-		
-		this.deserializedFields1 = new Object[this.serializers.length];
-		this.deserializedFields2 = new Object[this.serializers.length];
-		
-		for (int i = 0; i < this.serializers.length; i++) {
-			this.deserializedFields1[i] = this.serializers[i].createInstance();
-			this.deserializedFields2[i] = this.serializers[i].createInstance();
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * A sequence of prime numbers to be used for salting the computed hash values.
-	 * Based on some empirical evidence, we are using a 32-element subsequence of the  
-	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
-	 * 
-	 * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
-	 * @see: http://oeis.org/A068652
-	 */
-	private static final int[] HASH_SALT = new int[] { 
-		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
-		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
-		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
-		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
new file mode 100644
index 0000000..cea9879
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullKeyFieldException;
+
+import java.io.IOException;
+
+
+public abstract class TupleComparatorBase<T> extends TypeComparator<T> implements java.io.Serializable {
+
+	/** key positions describe which fields are keys in what order */
+	protected int[] keyPositions;
+
+	/** comparators for the key fields, in the same order as the key fields */
+	protected TypeComparator[] comparators;
+
+	/** serializer factories to duplicate non thread-safe serializers */
+	protected TypeSerializerFactory<Object>[] serializerFactories;
+
+
+	protected int[] normalizedKeyLengths;
+
+	protected int numLeadingNormalizableKeys;
+
+	protected int normalizableKeyPrefixLen;
+
+	protected boolean invertNormKey;
+
+
+	/** serializers to deserialize the first n fields for comparison */
+	protected transient TypeSerializer[] serializers;
+
+	// cache for the deserialized field objects
+	protected transient Object[] deserializedFields1;
+	protected transient Object[] deserializedFields2;
+
+
+	@SuppressWarnings("unchecked")
+	public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
+		// set the default utils
+		this.keyPositions = keyPositions;
+		this.comparators = (TypeComparator<Object>[]) comparators;
+		this.serializers = (TypeSerializer<Object>[]) serializers;
+
+		// set the serializer factories.
+		this.serializerFactories = new TypeSerializerFactory[this.serializers.length];
+		for (int i = 0; i < serializers.length; i++) {
+			this.serializerFactories[i] = this.serializers[i].isStateful() ?
+					new RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) :
+					new RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class);
+		}
+
+		// set up auxiliary fields for normalized key support
+		this.normalizedKeyLengths = new int[keyPositions.length];
+		int nKeys = 0;
+		int nKeyLen = 0;
+		boolean inverted = false;
+
+		for (int i = 0; i < this.keyPositions.length; i++) {
+			TypeComparator<?> k = this.comparators[i];
+
+			// as long as the leading keys support normalized keys, we can build up the composite key
+			if (k.supportsNormalizedKey()) {
+				if (i == 0) {
+					// the first comparator decides whether we need to invert the key direction
+					inverted = k.invertNormalizedKey();
+				}
+				else if (k.invertNormalizedKey() != inverted) {
+					// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+					break;
+				}
+
+				nKeys++;
+				final int len = k.getNormalizeKeyLen();
+				if (len < 0) {
+					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+				}
+				this.normalizedKeyLengths[i] = len;
+				nKeyLen += len;
+
+				if (nKeyLen < 0) {
+					// overflow, which means we are out of budget for normalized key space anyways
+					nKeyLen = Integer.MAX_VALUE;
+					break;
+				}
+			} else {
+				break;
+			}
+		}
+		this.numLeadingNormalizableKeys = nKeys;
+		this.normalizableKeyPrefixLen = nKeyLen;
+		this.invertNormKey = inverted;
+	}
+
+	@SuppressWarnings("unchecked")
+	protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
+		privateDuplicate(toClone);
+	}
+
+	// We need this because we cannot call the cloning constructor from the
+	// ScalaTupleComparator
+	protected void privateDuplicate(TupleComparatorBase<T> toClone) {
+		// copy fields and serializer factories
+		this.keyPositions = toClone.keyPositions;
+		this.serializerFactories = toClone.serializerFactories;
+
+		this.comparators = new TypeComparator[toClone.comparators.length];
+		for (int i = 0; i < toClone.comparators.length; i++) {
+			this.comparators[i] = toClone.comparators[i].duplicate();
+		}
+
+		this.normalizedKeyLengths = toClone.normalizedKeyLengths;
+		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+		this.invertNormKey = toClone.invertNormKey;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Comparator Methods
+	// --------------------------------------------------------------------------------------------
+	
+	protected int[] getKeyPositions() {
+		return this.keyPositions;
+	}
+	
+	public TypeComparator[] getComparators() {
+		return this.comparators;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Comparator Methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
+		
+		int i = 0;
+		try {
+			for (; i < this.keyPositions.length; i++) {
+				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			return 0;
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(keyPositions[i]);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+	}
+	
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		if (deserializedFields1 == null) {
+			instantiateDeserializationUtils();
+		}
+		
+		int i = 0;
+		try {
+			for (; i < serializers.length; i++) {
+				deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
+				deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
+			}
+			
+			for (i = 0; i < keyPositions.length; i++) {
+				int keyPos = keyPositions[i];
+				int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			
+			return 0;
+		} catch (NullPointerException npex) {
+			throw new NullKeyFieldException(keyPositions[i]);
+		} catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+	}
+	
+	@Override
+	public boolean supportsNormalizedKey() {
+		return this.numLeadingNormalizableKeys > 0;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return this.normalizableKeyPrefixLen;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return this.numLeadingNormalizableKeys < this.keyPositions.length ||
+				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+				this.normalizableKeyPrefixLen > keyBytes;
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return this.invertNormKey;
+	}
+	
+	
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+	
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@SuppressWarnings("unchecked")
+	private final void instantiateDeserializationUtils() {
+		if (this.serializers == null) {
+			this.serializers = new TypeSerializer[this.serializerFactories.length];
+			for (int i = 0; i < this.serializers.length; i++) {
+				this.serializers[i] = this.serializerFactories[i].getSerializer();
+			}
+		}
+		
+		this.deserializedFields1 = new Object[this.serializers.length];
+		this.deserializedFields2 = new Object[this.serializers.length];
+		
+		for (int i = 0; i < this.serializers.length; i++) {
+			this.deserializedFields1[i] = this.serializers[i].createInstance();
+			this.deserializedFields2[i] = this.serializers[i].createInstance();
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * A sequence of prime numbers to be used for salting the computed hash values.
+	 * Based on some empirical evidence, we are using a 32-element subsequence of the  
+	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
+	 * 
+	 * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
+	 * @see: http://oeis.org/A068652
+	 */
+	protected static final int[] HASH_SALT = new int[] {
+		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
+		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
+		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
+		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
deleted file mode 100644
index d63fccb..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparator.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeComparator<T>
-	implements java.io.Serializable
-{
-	private static final long serialVersionUID = 1L;
-	
-	private final TypeComparator<K> comparator;
-	
-	
-		
-	public TupleLeadingFieldComparator(TypeComparator<K> comparator) {
-		this.comparator = comparator;
-	}
-	
-	public TypeComparator<K> getComparator() {
-		return this.comparator;
-	}
-	
-	@Override
-	public int hash(T value) {
-		try {
-			return comparator.hash(value.<K> getFieldNotNull(0));
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-
-	}
-
-	@Override
-	public void setReference(T toCompare) {
-		try {
-			this.comparator.setReference(toCompare.<K> getFieldNotNull(0));
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-	
-
-	@Override
-	public boolean equalToReference(T candidate) {
-		try {
-			return this.comparator.equalToReference(candidate
-					.<K> getFieldNotNull(0));
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		return this.comparator.compareToReference(((TupleLeadingFieldComparator<T, K>) referencedComparator).comparator);
-	}
-	
-	@Override
-	public int compare(T first, T second) {
-		try {
-			return this.comparator.compare(first.<K> getFieldNotNull(0),
-					second.<K> getFieldNotNull(0));
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-
-	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		return this.comparator.compare(firstSource, secondSource);
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return this.comparator.supportsNormalizedKey();
-	}
-
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return this.comparator.getNormalizeKeyLen();
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return this.comparator.isNormalizedKeyPrefixOnly(keyBytes);
-	}
-
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		try {
-			this.comparator.putNormalizedKey(record.<K> getFieldNotNull(0),
-					target, offset, numBytes);
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return this.comparator.invertNormalizedKey();
-	}
-
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new TupleLeadingFieldComparator<T, K>(comparator.duplicate());
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	protected TypeComparator<K> getFieldComparator() {
-		return this.comparator;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
deleted file mode 100644
index 3611f70..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final TypeComparator<K> comparator1;
-	private final TypeComparator<K> comparator2;
-	
-	public TupleLeadingFieldPairComparator(TypeComparator<K> comparator1, TypeComparator<K> comparator2) {
-		this.comparator1 = comparator1;
-		this.comparator2 = comparator2;
-	}
-	
-	@Override
-	public void setReference(T1 reference) {
-		try {
-			this.comparator1.setReference(reference.<K> getFieldNotNull(0));
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-
-	@Override
-	public boolean equalToReference(T2 candidate) {
-		try {
-			return this.comparator1.equalToReference(candidate
-					.<K> getFieldNotNull(0));
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-
-	@Override
-	public int compareToReference(T2 candidate) {
-		try {
-			this.comparator2.setReference(candidate.<K> getFieldNotNull(0));
-			return this.comparator1.compareToReference(this.comparator2);
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
deleted file mode 100644
index 4c4094d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final int[] keyFields1, keyFields2;
-	private final TypeComparator<Object>[] comparators1;
-	private final TypeComparator<Object>[] comparators2;
-	
-	@SuppressWarnings("unchecked")
-	public TuplePairComparator(int[] keyFields1, int[] keyFields2, TypeComparator<Object>[] comparators1, TypeComparator<Object>[] comparators2) {
-		
-		if(keyFields1.length != keyFields2.length 
-			|| keyFields1.length != comparators1.length
-			|| keyFields2.length != comparators2.length) {
-			
-			throw new IllegalArgumentException("Number of key fields and comparators differ.");
-		}
-		
-		int numKeys = keyFields1.length;
-		
-		this.keyFields1 = keyFields1;
-		this.keyFields2 = keyFields2;
-		this.comparators1 = new TypeComparator[numKeys];
-		this.comparators2 = new TypeComparator[numKeys];
-		
-		for(int i = 0; i < numKeys; i++) {
-			this.comparators1[i] = comparators1[i].duplicate();
-			this.comparators2[i] = comparators2[i].duplicate();
-		}
-	}
-	
-	@Override
-	public void setReference(T1 reference) {
-		for (int i = 0; i < this.comparators1.length; i++) {
-			try {
-				this.comparators1[i].setReference(reference
-						.getFieldNotNull(keyFields1[i]));
-			} catch (NullFieldException nfex) {
-				throw new NullKeyFieldException(nfex);
-			}
-		}
-	}
-
-	@Override
-	public boolean equalToReference(T2 candidate) {
-		for (int i = 0; i < this.comparators1.length; i++) {
-			try {
-				if (!this.comparators1[i].equalToReference(candidate
-						.getFieldNotNull(keyFields2[i]))) {
-					return false;
-				}
-			} catch (NullFieldException nfex) {
-				throw new NullKeyFieldException(nfex);
-			}
-		}
-		return true;
-	}
-
-	@Override
-	public int compareToReference(T2 candidate) {
-		for (int i = 0; i < this.comparators1.length; i++) {
-			try {
-				this.comparators2[i].setReference(candidate
-						.getFieldNotNull(keyFields2[i]));
-				int res = this.comparators1[i]
-						.compareToReference(this.comparators2[i]);
-				if (res != 0) {
-					return res;
-				}
-			} catch (NullFieldException nfex) {
-				throw new NullKeyFieldException(nfex);
-			}
-		}
-		return 0;
-	}
-}