You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:02 UTC

[20/60] Refactor TupleTypeInfo and add GenericPairComparator

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 1c777fc..58a9a2a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -28,49 +27,16 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.NullFieldException;
 
 
-public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
+public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
 
 	private static final long serialVersionUID = 1L;
 	
-	
-	private final Class<T> tupleClass;
-	
-	private final TypeSerializer<Object>[] fieldSerializers;
-	
-	private final int arity;
-	
-	private final boolean stateful;
-	
-	
 	@SuppressWarnings("unchecked")
 	public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
-		this.tupleClass = tupleClass;
-		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
-		this.arity = fieldSerializers.length;
-		
-		boolean stateful = false;
-		for (TypeSerializer<?> ser : fieldSerializers) {
-			if (ser.isStateful()) {
-				stateful = true;
-				break;
-			}
-		}
-		this.stateful = stateful;
-	}
-	
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
+		super(tupleClass, fieldSerializers);
 	}
 
 	@Override
-	public boolean isStateful() {
-		return this.stateful;
-	}
-	
-	
-	@Override
 	public T createInstance() {
 		try {
 			T t = tupleClass.newInstance();
@@ -97,12 +63,6 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public int getLength() {
-		return -1;
-	}
-
-
-	@Override
 	public void serialize(T value, DataOutputView target) throws IOException {
 		for (int i = 0; i < arity; i++) {
 			Object o = value.getField(i);
@@ -122,33 +82,4 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
 		}
 		return reuse;
 	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		for (int i = 0; i < arity; i++) {
-			fieldSerializers[i].copy(source, target);
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		int hashCode = arity * 47;
-		for (TypeSerializer<?> ser : this.fieldSerializers) {
-			hashCode = (hashCode << 7) | (hashCode >>> -7);
-			hashCode += ser.hashCode();
-		}
-		return hashCode;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof TupleSerializer) {
-			TupleSerializer<?> otherTS = (TupleSerializer<?>) obj;
-			return (otherTS.tupleClass == this.tupleClass) && 
-					Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
-		}
-		else {
-			return false;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
new file mode 100644
index 0000000..69133b6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+
+public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
+
+	protected final Class<T> tupleClass;
+
+	protected final TypeSerializer<Object>[] fieldSerializers;
+
+	protected final int arity;
+
+	protected final boolean stateful;
+
+
+	@SuppressWarnings("unchecked")
+	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+		this.tupleClass = tupleClass;
+		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
+		this.arity = fieldSerializers.length;
+		
+		boolean stateful = false;
+		for (TypeSerializer<?> ser : fieldSerializers) {
+			if (ser.isStateful()) {
+				stateful = true;
+				break;
+			}
+		}
+		this.stateful = stateful;
+	}
+	
+	
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public boolean isStateful() {
+		return this.stateful;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		for (int i = 0; i < arity; i++) {
+			fieldSerializers[i].copy(source, target);
+		}
+	}
+	
+	@Override
+	public int hashCode() {
+		int hashCode = arity * 47;
+		for (TypeSerializer<?> ser : this.fieldSerializers) {
+			hashCode = (hashCode << 7) | (hashCode >>> -7);
+			hashCode += ser.hashCode();
+		}
+		return hashCode;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof TupleSerializerBase) {
+			TupleSerializerBase<?> otherTS = (TupleSerializerBase<?>) obj;
+			return (otherTS.tupleClass == this.tupleClass) && 
+					Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
+		}
+		else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
index 3ca3831..a912844 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -46,8 +46,11 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 	private transient T tempReference;
 	
 	private transient Kryo kryo;
-	
-	
+
+	private final Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
 	public ValueComparator(boolean ascending, Class<T> type) {
 		this.type = type;
 		this.ascendingComparison = ascending;
@@ -83,7 +86,7 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 	}
 	
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		if (reference == null) {
 			reference = InstantiationUtil.instantiate(type, Value.class);
 		}
@@ -140,6 +143,17 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 			this.kryo.register(type);
 		}
 	}
+
+	@Override
+	public Object[] extractKeys(T record) {
+		extractedKey[0] = record;
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	// unsupported normalization

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 6d983b3..49774dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -43,7 +43,11 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	private transient T tempReference;
 	
 	private transient Kryo kryo;
-	
+
+	private final Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
 	public WritableComparator(boolean ascending, Class<T> type) {
 		this.type = type;
 		this.ascendingComparison = ascending;
@@ -79,7 +83,7 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	}
 	
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		ensureReferenceInstantiated();
 		ensureTempReferenceInstantiated();
 		
@@ -123,6 +127,16 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	public TypeComparator<T> duplicate() {
 		return new WritableComparator<T>(ascendingComparison, type);
 	}
+
+	@Override
+	public Object[] extractKeys(T record) {
+		extractedKey[0] = record;
+		return extractedKey;
+	}
+
+	@Override public TypeComparator[] getComparators() {
+		return comparators;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	// unsupported normalization

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
index 6591502..bf8d56c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
@@ -264,7 +264,7 @@ public final class RecordComparator extends TypeComparator<Record> {
 	}
 	
 	@Override
-	public int compare(DataInputView source1, DataInputView source2) throws IOException {
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
 		this.temp1.read(source1);
 		this.temp2.read(source2);
 		
@@ -391,6 +391,18 @@ public final class RecordComparator extends TypeComparator<Record> {
 	}
 
 	@Override
+	public Object[] extractKeys(Record record) {
+		throw new UnsupportedOperationException("Record does not support extactKeys and " +
+				"getComparators. This cannot be used with the GenericPairComparator.");
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		throw new UnsupportedOperationException("Record does not support extactKeys and " +
+				"getComparators. This cannot be used with the GenericPairComparator.");
+	}
+
+	@Override
 	public boolean supportsCompareAgainstReference() {
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index 0e06226..1c7b6a9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -146,7 +146,7 @@ public class TypeInfoParserTest {
 		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(1));
 		
 		ti = TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>");
-		Assert.assertEquals("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>", ti.toString());
+		Assert.assertEquals("Java Tuple3<Java Tuple1<String>, Java Tuple1<Integer>, Java Tuple2<Long, Long>>", ti.toString());
 	}
 	
 	@Test
@@ -190,13 +190,13 @@ public class TypeInfoParserTest {
 		Assert.assertTrue(((ObjectArrayTypeInfo<?, ?>) ti2).getComponentInfo() instanceof TupleTypeInfo);
 		
 		TypeInformation<?> ti3 = TypeInfoParser.parse("Tuple2<Integer[],Double>[]");
-		Assert.assertEquals("ObjectArrayTypeInfo<Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
+		Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
 	}
 	
 	@Test
 	public void testLargeMixedTuple() {
 		TypeInformation<?> ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple4<Double,java.lang.Class[],StringValue,Tuple1<int>>[]");
-		Assert.assertEquals("ObjectArrayTypeInfo<Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Tuple1<Integer>>>", ti.toString());
+		Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Java Tuple1<Integer>>>", ti.toString());
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
new file mode 100644
index 0000000..500d8db
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
+
+public class GenericPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
+
+	@SuppressWarnings("unchecked")
+	private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+		new Tuple3<Integer, String, Double>(4, "world", 23.2),
+		new Tuple3<Integer, String, Double>(5, "hello", 18.0),
+		new Tuple3<Integer, String, Double>(5, "world", 19.2),
+		new Tuple3<Integer, String, Double>(6, "hello", 16.0),
+		new Tuple3<Integer, String, Double>(6, "world", 17.2),
+		new Tuple3<Integer, String, Double>(7, "hello", 14.0),
+		new Tuple3<Integer, String, Double>(7, "world", 15.2)
+	};
+
+	@SuppressWarnings("unchecked")
+	private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
+		new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
+		new Tuple4<Integer, Float, Long, Double>(4, 0.221f, 15L, 23.2),
+		new Tuple4<Integer, Float, Long, Double>(5, 0.33f, 15L, 18.0),
+		new Tuple4<Integer, Float, Long, Double>(5, 0.44f, 20L, 19.2),
+		new Tuple4<Integer, Float, Long, Double>(6, 0.55f, 20L, 16.0),
+		new Tuple4<Integer, Float, Long, Double>(6, 0.66f, 29L, 17.2),
+		new Tuple4<Integer, Float, Long, Double>(7, 0.77f, 29L, 14.0),
+		new Tuple4<Integer, Float, Long, Double>(7, 0.88f, 34L, 15.2)
+	};
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
+		int[] fields1 = new int[]{0, 2};
+		int[] fields2 = new int[]{0, 3};
+		TypeComparator[] comps1 = new TypeComparator[]{
+				new IntComparator(ascending),
+				new DoubleComparator(ascending)
+		};
+		TypeComparator[] comps2 = new TypeComparator[]{
+				new IntComparator(ascending),
+				new DoubleComparator(ascending)
+		};
+		TypeSerializer[] sers1 = new TypeSerializer[]{
+				IntSerializer.INSTANCE,
+				DoubleSerializer.INSTANCE
+		};
+		TypeSerializer[] sers2= new TypeSerializer[]{
+				IntSerializer.INSTANCE,
+				DoubleSerializer.INSTANCE
+		};
+		TypeComparator<Tuple3<Integer, String, Double>> comp1 = new TupleComparator<Tuple3<Integer, String, Double>>(fields1, comps1, sers1);
+		TypeComparator<Tuple4<Integer, Float, Long, Double>> comp2 = new TupleComparator<Tuple4<Integer, Float, Long, Double>>(fields2, comps2, sers2);
+		return new GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(comp1, comp2);
+	}
+
+	@Override
+	protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
+		return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java
deleted file mode 100644
index 0d9c29d..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-
-public class TupleLeadingFieldComparatorTest extends ComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-	@SuppressWarnings("unchecked")
-	Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(5, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(6, "world", 20.0),
-		new Tuple3<Integer, String, Double>(7, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(8, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(9, "world", 20.0),
-		new Tuple3<Integer, String, Double>(10, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(11, "hello", 23.2)
-	};
-
-	@Override
-	protected TupleLeadingFieldComparator<Tuple3<Integer, String, Double>, Integer> createComparator(boolean ascending) {
-		return new TupleLeadingFieldComparator<Tuple3<Integer,String,Double>, Integer>(new IntComparator(ascending));
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
-		return new TupleSerializer<Tuple3<Integer, String, Double>>(
-				(Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new IntSerializer(),
-					new StringSerializer(),
-					new DoubleSerializer()});
-	}
-
-	@Override
-	protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-		return dataISD;
-	}
-
-	@Override
-	protected void deepEquals(String message, Tuple3<Integer, String, Double> should, Tuple3<Integer, String, Double> is) {
-		for (int x = 0; x < should.getArity(); x++) {
-			assertEquals(should.getField(x), is.getField(x));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java
deleted file mode 100644
index 9847e76..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldPairComparator;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
-
-public class TupleLeadingFieldPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
-
-	@SuppressWarnings("unchecked")
-	private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(5, "world", 23.2),
-		new Tuple3<Integer, String, Double>(6, "hello", 18.0),
-		new Tuple3<Integer, String, Double>(7, "world", 19.2),
-		new Tuple3<Integer, String, Double>(8, "hello", 16.0),
-		new Tuple3<Integer, String, Double>(9, "world", 17.2),
-		new Tuple3<Integer, String, Double>(10, "hello", 14.0),
-		new Tuple3<Integer, String, Double>(11, "world", 15.2)
-	};
-
-	@SuppressWarnings("unchecked")
-	private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
-		new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
-		new Tuple4<Integer, Float, Long, Double>(5, 0.221f, 15L, 23.2),
-		new Tuple4<Integer, Float, Long, Double>(6, 0.33f, 15L, 18.0),
-		new Tuple4<Integer, Float, Long, Double>(7, 0.44f, 20L, 19.2),
-		new Tuple4<Integer, Float, Long, Double>(8, 0.55f, 20L, 16.0),
-		new Tuple4<Integer, Float, Long, Double>(9, 0.66f, 29L, 17.2),
-		new Tuple4<Integer, Float, Long, Double>(10, 0.77f, 29L, 14.0),
-		new Tuple4<Integer, Float, Long, Double>(11, 0.88f, 34L, 15.2)
-	};
-
-	@Override
-	protected TypePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
-		return new TupleLeadingFieldPairComparator<Integer, Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(
-				new IntComparator(ascending), new IntComparator(ascending));
-	}
-
-	@Override
-	protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
-		return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java
deleted file mode 100644
index 1d7dccd..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.runtime.TuplePairComparator;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
-
-public class TuplePairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
-
-	@SuppressWarnings("unchecked")
-	private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(4, "world", 23.2),
-		new Tuple3<Integer, String, Double>(5, "hello", 18.0),
-		new Tuple3<Integer, String, Double>(5, "world", 19.2),
-		new Tuple3<Integer, String, Double>(6, "hello", 16.0),
-		new Tuple3<Integer, String, Double>(6, "world", 17.2),
-		new Tuple3<Integer, String, Double>(7, "hello", 14.0),
-		new Tuple3<Integer, String, Double>(7, "world", 15.2)
-	};
-
-	@SuppressWarnings("unchecked")
-	private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
-		new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
-		new Tuple4<Integer, Float, Long, Double>(4, 0.221f, 15L, 23.2),
-		new Tuple4<Integer, Float, Long, Double>(5, 0.33f, 15L, 18.0),
-		new Tuple4<Integer, Float, Long, Double>(5, 0.44f, 20L, 19.2),
-		new Tuple4<Integer, Float, Long, Double>(6, 0.55f, 20L, 16.0),
-		new Tuple4<Integer, Float, Long, Double>(6, 0.66f, 29L, 17.2),
-		new Tuple4<Integer, Float, Long, Double>(7, 0.77f, 29L, 14.0),
-		new Tuple4<Integer, Float, Long, Double>(7, 0.88f, 34L, 15.2)
-	};
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TuplePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
-		return new TuplePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(
-				new int[]{0, 2},
-				new int[]{0, 3},
-				new TypeComparator[]{
-					new IntComparator(ascending),
-					new DoubleComparator(ascending)
-				},
-				new TypeComparator[]{
-					new IntComparator(ascending),
-					new DoubleComparator(ascending)
-				}
-		);
-	}
-
-	@Override
-	protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
-		return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 6db9035..34226a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -316,7 +316,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T>
 		this.recordBufferForComparison.setReadPosition(pointer2);
 		
 		try {
-			return this.comparator.compare(this.recordBuffer, this.recordBufferForComparison);
+			return this.comparator.compareSerialized(this.recordBuffer, this.recordBufferForComparison);
 		} catch (IOException ioex) {
 			throw new RuntimeException("Error comparing two records.", ioex);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
index 53e596c..11064fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.testutils.types;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -31,6 +32,10 @@ public class IntListComparator extends TypeComparator<IntList> {
 	
 	private int reference;
 
+	private Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {new IntComparator(true)};
+
 	@Override
 	public int hash(IntList record) {
 		return record.getKey() * 73;
@@ -58,7 +63,7 @@ public class IntListComparator extends TypeComparator<IntList> {
 	}
 
 	@Override
-	public int compare(DataInputView source1, DataInputView source2) throws IOException {
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
 		return source1.readInt() - source2.readInt();
 	}
 
@@ -134,4 +139,15 @@ public class IntListComparator extends TypeComparator<IntList> {
 	public TypeComparator<IntList> duplicate() {
 		return new IntListComparator();
 	}
+
+	@Override
+	public Object[] extractKeys(IntList record) {
+		extractedKey[0] = record.getKey();
+		return extractedKey;
+	}
+
+	@Override public TypeComparator[] getComparators() {
+		return comparators;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
index f5d31d6..53c76fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.testutils.types;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -32,7 +33,11 @@ public class IntPairComparator extends TypeComparator<IntPair> {
 	private static final long serialVersionUID = 1L;
 	
 	private int reference;
-	
+
+	private final Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {new IntComparator(true)};
+
 	@Override
 	public int hash(IntPair object) {
 		return object.getKey() * 73;
@@ -60,7 +65,7 @@ public class IntPairComparator extends TypeComparator<IntPair> {
 	}
 	
 	@Override
-	public int compare(DataInputView source1, DataInputView source2) throws IOException {
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
 		return source1.readInt() - source2.readInt();
 	}
 
@@ -113,6 +118,15 @@ public class IntPairComparator extends TypeComparator<IntPair> {
 	}
 
 	@Override
+	public Object[] extractKeys(IntPair pair) {
+		extractedKey[0] = pair.getKey();
+		return extractedKey;
+	}
+	@Override public TypeComparator[] getComparators() {
+		return comparators;
+	}
+
+	@Override
 	public boolean supportsSerializationWithKeyNormalization() {
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
index 4397ba5..66c87bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.testutils.types;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -32,6 +33,10 @@ public class StringPairComparator extends TypeComparator<StringPair> {
 	
 	private String reference;
 
+	private Comparable[] extractedKey = new Comparable[1];
+
+	private final TypeComparator[] comparators = new TypeComparator[] {new StringComparator(true)};
+
 	@Override
 	public int hash(StringPair record) {
 		return record.getKey().hashCode();
@@ -58,7 +63,7 @@ public class StringPairComparator extends TypeComparator<StringPair> {
 	}
 
 	@Override
-	public int compare(DataInputView firstSource, DataInputView secondSource)
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource)
 			throws IOException {
 		return StringValue.readString(firstSource).compareTo(StringValue.readString(secondSource));
 	}
@@ -110,4 +115,14 @@ public class StringPairComparator extends TypeComparator<StringPair> {
 	public TypeComparator<StringPair> duplicate() {
 		return new StringPairComparator();
 	}
+
+	@Override
+	public Object[] extractKeys(StringPair record) {
+		extractedKey[0] = record.getKey();
+		return extractedKey;
+	}
+
+	@Override public TypeComparator[] getComparators() {
+		return comparators;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index f109ca9..f55786c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -27,6 +27,7 @@ import java.io.PipedOutputStream;
 
 import junit.framework.TestCase;
 
+import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -381,6 +382,9 @@ public class OutputEmitterTest extends TestCase {
 	
 	@SuppressWarnings("serial")
 	private static class TestIntComparator extends TypeComparator<Integer> {
+		private final Comparable[] extractedKey = new Comparable[1];
+
+		private TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
 
 		@Override
 		public int hash(Integer record) {
@@ -402,7 +406,7 @@ public class OutputEmitterTest extends TestCase {
 		public int compare(Integer first, Integer second) { throw new UnsupportedOperationException(); }
 
 		@Override
-		public int compare(DataInputView firstSource, DataInputView secondSource) {
+		public int compareSerialized(DataInputView firstSource, DataInputView secondSource) {
 			throw new UnsupportedOperationException();
 		}
 
@@ -438,7 +442,17 @@ public class OutputEmitterTest extends TestCase {
 
 		@Override
 		public TypeComparator<Integer> duplicate() { throw new UnsupportedOperationException(); }
-		
+
+		@Override
+		public Object[] extractKeys(Integer record) {
+			extractedKey[0] = record;
+			return extractedKey;
+		}
+
+		@Override
+		public TypeComparator[] getComparators() {
+			return comparators;
+		}
 	}
 	
 //	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
index 0322024..5b32e12 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -30,7 +31,11 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
 	private static final long serialVersionUID = 1L;
 	
 	private long reference;
-	
+
+	private Comparable[] extractedKey = new Comparable[1];
+
+	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
+
 	@Override
 	public int hash(VertexWithAdjacencyList record) {
 		final long value = record.getVertexID();
@@ -61,7 +66,7 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
 	}
 
 	@Override
-	public int compare(DataInputView source1, DataInputView source2) throws IOException {
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
 		final long diff = source1.readLong() - source2.readLong();
 		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
 	}
@@ -129,4 +134,15 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
 	public VertexWithAdjacencyListComparator duplicate() {
 		return new VertexWithAdjacencyListComparator();
 	}
+
+	@Override
+	public Object[] extractKeys(VertexWithAdjacencyList record) {
+		extractedKey[0] = record.getVertexID();
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
index 5aa81ea..e46c36a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -30,7 +31,11 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
 	private static final long serialVersionUID = 1L;
 	
 	private long reference;
-	
+
+	private Comparable[] extractedKey = new Comparable[1];
+
+	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
+
 	@Override
 	public int hash(VertexWithRankAndDangling record) {
 		final long value = record.getVertexID();
@@ -61,7 +66,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
 	}
 
 	@Override
-	public int compare(DataInputView source1, DataInputView source2) throws IOException {
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
 		final long diff = source1.readLong() - source2.readLong();
 		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
 	}
@@ -134,4 +139,15 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
 	public VertexWithRankAndDanglingComparator duplicate() {
 		return new VertexWithRankAndDanglingComparator();
 	}
+
+	@Override
+	public Object[] extractKeys(VertexWithRankAndDangling record) {
+		extractedKey[0] = record.getVertexID();
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
index 0d94c21..77a6a97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -30,7 +31,11 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
 	private static final long serialVersionUID = 1L;
 	
 	private long reference;
-	
+
+	private Comparable[] extractedKey = new Comparable[1];
+
+	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
+
 	@Override
 	public int hash(VertexWithRank record) {
 		final long value = record.getVertexID();
@@ -61,7 +66,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
 	}
 
 	@Override
-	public int compare(DataInputView source1, DataInputView source2) throws IOException {
+	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
 		final long diff = source1.readLong() - source2.readLong();
 		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
 	}
@@ -132,4 +137,15 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
 	public VertexWithRankComparator duplicate() {
 		return new VertexWithRankComparator();
 	}
+
+	@Override
+	public Object[] extractKeys(VertexWithRank record) {
+		extractedKey[0] = record.getVertexID();
+		return extractedKey;
+	}
+
+	@Override
+	public TypeComparator[] getComparators() {
+		return comparators;
+	}
 }