You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:02 UTC
[20/60] Refactor TupleTypeInfo and add GenericPairComparator
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 1c777fc..58a9a2a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
@@ -28,49 +27,16 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.NullFieldException;
-public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
+public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
private static final long serialVersionUID = 1L;
-
- private final Class<T> tupleClass;
-
- private final TypeSerializer<Object>[] fieldSerializers;
-
- private final int arity;
-
- private final boolean stateful;
-
-
@SuppressWarnings("unchecked")
public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
- this.tupleClass = tupleClass;
- this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
- this.arity = fieldSerializers.length;
-
- boolean stateful = false;
- for (TypeSerializer<?> ser : fieldSerializers) {
- if (ser.isStateful()) {
- stateful = true;
- break;
- }
- }
- this.stateful = stateful;
- }
-
-
- @Override
- public boolean isImmutableType() {
- return false;
+ super(tupleClass, fieldSerializers);
}
@Override
- public boolean isStateful() {
- return this.stateful;
- }
-
-
- @Override
public T createInstance() {
try {
T t = tupleClass.newInstance();
@@ -97,12 +63,6 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
}
@Override
- public int getLength() {
- return -1;
- }
-
-
- @Override
public void serialize(T value, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {
Object o = value.getField(i);
@@ -122,33 +82,4 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
}
return reuse;
}
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- for (int i = 0; i < arity; i++) {
- fieldSerializers[i].copy(source, target);
- }
- }
-
- @Override
- public int hashCode() {
- int hashCode = arity * 47;
- for (TypeSerializer<?> ser : this.fieldSerializers) {
- hashCode = (hashCode << 7) | (hashCode >>> -7);
- hashCode += ser.hashCode();
- }
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj instanceof TupleSerializer) {
- TupleSerializer<?> otherTS = (TupleSerializer<?>) obj;
- return (otherTS.tupleClass == this.tupleClass) &&
- Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
- }
- else {
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
new file mode 100644
index 0000000..69133b6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+
+public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
+
+ protected final Class<T> tupleClass;
+
+ protected final TypeSerializer<Object>[] fieldSerializers;
+
+ protected final int arity;
+
+ protected final boolean stateful;
+
+
+ @SuppressWarnings("unchecked")
+ public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+ this.tupleClass = tupleClass;
+ this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
+ this.arity = fieldSerializers.length;
+
+ boolean stateful = false;
+ for (TypeSerializer<?> ser : fieldSerializers) {
+ if (ser.isStateful()) {
+ stateful = true;
+ break;
+ }
+ }
+ this.stateful = stateful;
+ }
+
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public boolean isStateful() {
+ return this.stateful;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ for (int i = 0; i < arity; i++) {
+ fieldSerializers[i].copy(source, target);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = arity * 47;
+ for (TypeSerializer<?> ser : this.fieldSerializers) {
+ hashCode = (hashCode << 7) | (hashCode >>> -7);
+ hashCode += ser.hashCode();
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof TupleSerializerBase) {
+ TupleSerializerBase<?> otherTS = (TupleSerializerBase<?>) obj;
+ return (otherTS.tupleClass == this.tupleClass) &&
+ Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
+ }
+ else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
index 3ca3831..a912844 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -46,8 +46,11 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
private transient T tempReference;
private transient Kryo kryo;
-
-
+
+ private final Comparable[] extractedKey = new Comparable[1];
+
+ private final TypeComparator[] comparators = new TypeComparator[] {this};
+
public ValueComparator(boolean ascending, Class<T> type) {
this.type = type;
this.ascendingComparison = ascending;
@@ -83,7 +86,7 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
}
@Override
- public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
if (reference == null) {
reference = InstantiationUtil.instantiate(type, Value.class);
}
@@ -140,6 +143,17 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
this.kryo.register(type);
}
}
+
+ @Override
+ public Object[] extractKeys(T record) {
+ extractedKey[0] = record;
+ return extractedKey;
+ }
+
+ @Override
+ public TypeComparator[] getComparators() {
+ return comparators;
+ }
// --------------------------------------------------------------------------------------------
// unsupported normalization
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 6d983b3..49774dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -43,7 +43,11 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
private transient T tempReference;
private transient Kryo kryo;
-
+
+ private final Comparable[] extractedKey = new Comparable[1];
+
+ private final TypeComparator[] comparators = new TypeComparator[] {this};
+
public WritableComparator(boolean ascending, Class<T> type) {
this.type = type;
this.ascendingComparison = ascending;
@@ -79,7 +83,7 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
}
@Override
- public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
ensureReferenceInstantiated();
ensureTempReferenceInstantiated();
@@ -123,6 +127,16 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
public TypeComparator<T> duplicate() {
return new WritableComparator<T>(ascendingComparison, type);
}
+
+ @Override
+ public Object[] extractKeys(T record) {
+ extractedKey[0] = record;
+ return extractedKey;
+ }
+
+ @Override public TypeComparator[] getComparators() {
+ return comparators;
+ }
// --------------------------------------------------------------------------------------------
// unsupported normalization
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
index 6591502..bf8d56c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/record/RecordComparator.java
@@ -264,7 +264,7 @@ public final class RecordComparator extends TypeComparator<Record> {
}
@Override
- public int compare(DataInputView source1, DataInputView source2) throws IOException {
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
this.temp1.read(source1);
this.temp2.read(source2);
@@ -391,6 +391,18 @@ public final class RecordComparator extends TypeComparator<Record> {
}
@Override
+ public Object[] extractKeys(Record record) {
+ throw new UnsupportedOperationException("Record does not support extactKeys and " +
+ "getComparators. This cannot be used with the GenericPairComparator.");
+ }
+
+ @Override
+ public TypeComparator[] getComparators() {
+ throw new UnsupportedOperationException("Record does not support extactKeys and " +
+ "getComparators. This cannot be used with the GenericPairComparator.");
+ }
+
+ @Override
public boolean supportsCompareAgainstReference() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index 0e06226..1c7b6a9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -146,7 +146,7 @@ public class TypeInfoParserTest {
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(1));
ti = TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>");
- Assert.assertEquals("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>", ti.toString());
+ Assert.assertEquals("Java Tuple3<Java Tuple1<String>, Java Tuple1<Integer>, Java Tuple2<Long, Long>>", ti.toString());
}
@Test
@@ -190,13 +190,13 @@ public class TypeInfoParserTest {
Assert.assertTrue(((ObjectArrayTypeInfo<?, ?>) ti2).getComponentInfo() instanceof TupleTypeInfo);
TypeInformation<?> ti3 = TypeInfoParser.parse("Tuple2<Integer[],Double>[]");
- Assert.assertEquals("ObjectArrayTypeInfo<Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
+ Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
}
@Test
public void testLargeMixedTuple() {
TypeInformation<?> ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple4<Double,java.lang.Class[],StringValue,Tuple1<int>>[]");
- Assert.assertEquals("ObjectArrayTypeInfo<Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Tuple1<Integer>>>", ti.toString());
+ Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Java Tuple1<Integer>>>", ti.toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
new file mode 100644
index 0000000..500d8db
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
+
+public class GenericPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
+
+ @SuppressWarnings("unchecked")
+ private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
+ new Tuple3<Integer, String, Double>(4, "hello", 20.0),
+ new Tuple3<Integer, String, Double>(4, "world", 23.2),
+ new Tuple3<Integer, String, Double>(5, "hello", 18.0),
+ new Tuple3<Integer, String, Double>(5, "world", 19.2),
+ new Tuple3<Integer, String, Double>(6, "hello", 16.0),
+ new Tuple3<Integer, String, Double>(6, "world", 17.2),
+ new Tuple3<Integer, String, Double>(7, "hello", 14.0),
+ new Tuple3<Integer, String, Double>(7, "world", 15.2)
+ };
+
+ @SuppressWarnings("unchecked")
+ private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
+ new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
+ new Tuple4<Integer, Float, Long, Double>(4, 0.221f, 15L, 23.2),
+ new Tuple4<Integer, Float, Long, Double>(5, 0.33f, 15L, 18.0),
+ new Tuple4<Integer, Float, Long, Double>(5, 0.44f, 20L, 19.2),
+ new Tuple4<Integer, Float, Long, Double>(6, 0.55f, 20L, 16.0),
+ new Tuple4<Integer, Float, Long, Double>(6, 0.66f, 29L, 17.2),
+ new Tuple4<Integer, Float, Long, Double>(7, 0.77f, 29L, 14.0),
+ new Tuple4<Integer, Float, Long, Double>(7, 0.88f, 34L, 15.2)
+ };
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
+ int[] fields1 = new int[]{0, 2};
+ int[] fields2 = new int[]{0, 3};
+ TypeComparator[] comps1 = new TypeComparator[]{
+ new IntComparator(ascending),
+ new DoubleComparator(ascending)
+ };
+ TypeComparator[] comps2 = new TypeComparator[]{
+ new IntComparator(ascending),
+ new DoubleComparator(ascending)
+ };
+ TypeSerializer[] sers1 = new TypeSerializer[]{
+ IntSerializer.INSTANCE,
+ DoubleSerializer.INSTANCE
+ };
+ TypeSerializer[] sers2= new TypeSerializer[]{
+ IntSerializer.INSTANCE,
+ DoubleSerializer.INSTANCE
+ };
+ TypeComparator<Tuple3<Integer, String, Double>> comp1 = new TupleComparator<Tuple3<Integer, String, Double>>(fields1, comps1, sers1);
+ TypeComparator<Tuple4<Integer, Float, Long, Double>> comp2 = new TupleComparator<Tuple4<Integer, Float, Long, Double>>(fields2, comps2, sers2);
+ return new GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(comp1, comp2);
+ }
+
+ @Override
+ protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
+ return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java
deleted file mode 100644
index 0d9c29d..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldComparatorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-
-public class TupleLeadingFieldComparatorTest extends ComparatorTestBase<Tuple3<Integer, String, Double>> {
-
- @SuppressWarnings("unchecked")
- Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
- new Tuple3<Integer, String, Double>(4, "hello", 20.0),
- new Tuple3<Integer, String, Double>(5, "hello", 23.2),
- new Tuple3<Integer, String, Double>(6, "world", 20.0),
- new Tuple3<Integer, String, Double>(7, "hello", 20.0),
- new Tuple3<Integer, String, Double>(8, "hello", 23.2),
- new Tuple3<Integer, String, Double>(9, "world", 20.0),
- new Tuple3<Integer, String, Double>(10, "hello", 20.0),
- new Tuple3<Integer, String, Double>(11, "hello", 23.2)
- };
-
- @Override
- protected TupleLeadingFieldComparator<Tuple3<Integer, String, Double>, Integer> createComparator(boolean ascending) {
- return new TupleLeadingFieldComparator<Tuple3<Integer,String,Double>, Integer>(new IntComparator(ascending));
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
- return new TupleSerializer<Tuple3<Integer, String, Double>>(
- (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
- new TypeSerializer[]{
- new IntSerializer(),
- new StringSerializer(),
- new DoubleSerializer()});
- }
-
- @Override
- protected Tuple3<Integer, String, Double>[] getSortedTestData() {
- return dataISD;
- }
-
- @Override
- protected void deepEquals(String message, Tuple3<Integer, String, Double> should, Tuple3<Integer, String, Double> is) {
- for (int x = 0; x < should.getArity(); x++) {
- assertEquals(should.getField(x), is.getField(x));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java
deleted file mode 100644
index 9847e76..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleLeadingFieldPairComparatorTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldPairComparator;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
-
-public class TupleLeadingFieldPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
-
- @SuppressWarnings("unchecked")
- private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
- new Tuple3<Integer, String, Double>(4, "hello", 20.0),
- new Tuple3<Integer, String, Double>(5, "world", 23.2),
- new Tuple3<Integer, String, Double>(6, "hello", 18.0),
- new Tuple3<Integer, String, Double>(7, "world", 19.2),
- new Tuple3<Integer, String, Double>(8, "hello", 16.0),
- new Tuple3<Integer, String, Double>(9, "world", 17.2),
- new Tuple3<Integer, String, Double>(10, "hello", 14.0),
- new Tuple3<Integer, String, Double>(11, "world", 15.2)
- };
-
- @SuppressWarnings("unchecked")
- private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
- new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
- new Tuple4<Integer, Float, Long, Double>(5, 0.221f, 15L, 23.2),
- new Tuple4<Integer, Float, Long, Double>(6, 0.33f, 15L, 18.0),
- new Tuple4<Integer, Float, Long, Double>(7, 0.44f, 20L, 19.2),
- new Tuple4<Integer, Float, Long, Double>(8, 0.55f, 20L, 16.0),
- new Tuple4<Integer, Float, Long, Double>(9, 0.66f, 29L, 17.2),
- new Tuple4<Integer, Float, Long, Double>(10, 0.77f, 29L, 14.0),
- new Tuple4<Integer, Float, Long, Double>(11, 0.88f, 34L, 15.2)
- };
-
- @Override
- protected TypePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
- return new TupleLeadingFieldPairComparator<Integer, Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(
- new IntComparator(ascending), new IntComparator(ascending));
- }
-
- @Override
- protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
- return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java
deleted file mode 100644
index 1d7dccd..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TuplePairComparatorTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.runtime.TuplePairComparator;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
-
-public class TuplePairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
-
- @SuppressWarnings("unchecked")
- private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
- new Tuple3<Integer, String, Double>(4, "hello", 20.0),
- new Tuple3<Integer, String, Double>(4, "world", 23.2),
- new Tuple3<Integer, String, Double>(5, "hello", 18.0),
- new Tuple3<Integer, String, Double>(5, "world", 19.2),
- new Tuple3<Integer, String, Double>(6, "hello", 16.0),
- new Tuple3<Integer, String, Double>(6, "world", 17.2),
- new Tuple3<Integer, String, Double>(7, "hello", 14.0),
- new Tuple3<Integer, String, Double>(7, "world", 15.2)
- };
-
- @SuppressWarnings("unchecked")
- private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
- new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
- new Tuple4<Integer, Float, Long, Double>(4, 0.221f, 15L, 23.2),
- new Tuple4<Integer, Float, Long, Double>(5, 0.33f, 15L, 18.0),
- new Tuple4<Integer, Float, Long, Double>(5, 0.44f, 20L, 19.2),
- new Tuple4<Integer, Float, Long, Double>(6, 0.55f, 20L, 16.0),
- new Tuple4<Integer, Float, Long, Double>(6, 0.66f, 29L, 17.2),
- new Tuple4<Integer, Float, Long, Double>(7, 0.77f, 29L, 14.0),
- new Tuple4<Integer, Float, Long, Double>(7, 0.88f, 34L, 15.2)
- };
-
- @SuppressWarnings("unchecked")
- @Override
- protected TuplePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
- return new TuplePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(
- new int[]{0, 2},
- new int[]{0, 3},
- new TypeComparator[]{
- new IntComparator(ascending),
- new DoubleComparator(ascending)
- },
- new TypeComparator[]{
- new IntComparator(ascending),
- new DoubleComparator(ascending)
- }
- );
- }
-
- @Override
- protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
- return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 6db9035..34226a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -316,7 +316,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T>
this.recordBufferForComparison.setReadPosition(pointer2);
try {
- return this.comparator.compare(this.recordBuffer, this.recordBufferForComparison);
+ return this.comparator.compareSerialized(this.recordBuffer, this.recordBufferForComparison);
} catch (IOException ioex) {
throw new RuntimeException("Error comparing two records.", ioex);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
index 53e596c..11064fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
@@ -31,6 +32,10 @@ public class IntListComparator extends TypeComparator<IntList> {
private int reference;
+ private Comparable[] extractedKey = new Comparable[1];
+
+ private final TypeComparator[] comparators = new TypeComparator[] {new IntComparator(true)};
+
@Override
public int hash(IntList record) {
return record.getKey() * 73;
@@ -58,7 +63,7 @@ public class IntListComparator extends TypeComparator<IntList> {
}
@Override
- public int compare(DataInputView source1, DataInputView source2) throws IOException {
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
return source1.readInt() - source2.readInt();
}
@@ -134,4 +139,15 @@ public class IntListComparator extends TypeComparator<IntList> {
public TypeComparator<IntList> duplicate() {
return new IntListComparator();
}
+
+ @Override
+ public Object[] extractKeys(IntList record) {
+ extractedKey[0] = record.getKey();
+ return extractedKey;
+ }
+
+ @Override public TypeComparator[] getComparators() {
+ return comparators;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
index f5d31d6..53c76fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairComparator.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
@@ -32,7 +33,11 @@ public class IntPairComparator extends TypeComparator<IntPair> {
private static final long serialVersionUID = 1L;
private int reference;
-
+
+ private final Comparable[] extractedKey = new Comparable[1];
+
+ private final TypeComparator[] comparators = new TypeComparator[] {new IntComparator(true)};
+
@Override
public int hash(IntPair object) {
return object.getKey() * 73;
@@ -60,7 +65,7 @@ public class IntPairComparator extends TypeComparator<IntPair> {
}
@Override
- public int compare(DataInputView source1, DataInputView source2) throws IOException {
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
return source1.readInt() - source2.readInt();
}
@@ -113,6 +118,15 @@ public class IntPairComparator extends TypeComparator<IntPair> {
}
@Override
+ public Object[] extractKeys(IntPair pair) {
+ extractedKey[0] = pair.getKey();
+ return extractedKey;
+ }
+ @Override public TypeComparator[] getComparators() {
+ return comparators;
+ }
+
+ @Override
public boolean supportsSerializationWithKeyNormalization() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
index 4397ba5..66c87bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
@@ -32,6 +33,10 @@ public class StringPairComparator extends TypeComparator<StringPair> {
private String reference;
+ private Comparable[] extractedKey = new Comparable[1];
+
+ private final TypeComparator[] comparators = new TypeComparator[] {new StringComparator(true)};
+
@Override
public int hash(StringPair record) {
return record.getKey().hashCode();
@@ -58,7 +63,7 @@ public class StringPairComparator extends TypeComparator<StringPair> {
}
@Override
- public int compare(DataInputView firstSource, DataInputView secondSource)
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource)
throws IOException {
return StringValue.readString(firstSource).compareTo(StringValue.readString(secondSource));
}
@@ -110,4 +115,14 @@ public class StringPairComparator extends TypeComparator<StringPair> {
public TypeComparator<StringPair> duplicate() {
return new StringPairComparator();
}
+
+ @Override
+ public Object[] extractKeys(StringPair record) {
+ extractedKey[0] = record.getKey();
+ return extractedKey;
+ }
+
+ @Override public TypeComparator[] getComparators() {
+ return comparators;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index f109ca9..f55786c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -27,6 +27,7 @@ import java.io.PipedOutputStream;
import junit.framework.TestCase;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.junit.Assert;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -381,6 +382,9 @@ public class OutputEmitterTest extends TestCase {
@SuppressWarnings("serial")
private static class TestIntComparator extends TypeComparator<Integer> {
+ private final Comparable[] extractedKey = new Comparable[1];
+
+ private TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
@Override
public int hash(Integer record) {
@@ -402,7 +406,7 @@ public class OutputEmitterTest extends TestCase {
public int compare(Integer first, Integer second) { throw new UnsupportedOperationException(); }
@Override
- public int compare(DataInputView firstSource, DataInputView secondSource) {
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) {
throw new UnsupportedOperationException();
}
@@ -438,7 +442,17 @@ public class OutputEmitterTest extends TestCase {
@Override
public TypeComparator<Integer> duplicate() { throw new UnsupportedOperationException(); }
-
+
+ @Override
+ public Object[] extractKeys(Integer record) {
+ extractedKey[0] = record;
+ return extractedKey;
+ }
+
+ @Override
+ public TypeComparator[] getComparators() {
+ return comparators;
+ }
}
// @Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
index 0322024..5b32e12 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
@@ -30,7 +31,11 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
private static final long serialVersionUID = 1L;
private long reference;
-
+
+ private Comparable[] extractedKey = new Comparable[1];
+
+ private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
+
@Override
public int hash(VertexWithAdjacencyList record) {
final long value = record.getVertexID();
@@ -61,7 +66,7 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
}
@Override
- public int compare(DataInputView source1, DataInputView source2) throws IOException {
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
final long diff = source1.readLong() - source2.readLong();
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
@@ -129,4 +134,15 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
public VertexWithAdjacencyListComparator duplicate() {
return new VertexWithAdjacencyListComparator();
}
+
+ @Override
+ public Object[] extractKeys(VertexWithAdjacencyList record) {
+ extractedKey[0] = record.getVertexID();
+ return extractedKey;
+ }
+
+ @Override
+ public TypeComparator[] getComparators() {
+ return comparators;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
index 5aa81ea..e46c36a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
@@ -30,7 +31,11 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
private static final long serialVersionUID = 1L;
private long reference;
-
+
+ private Comparable[] extractedKey = new Comparable[1];
+
+ private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
+
@Override
public int hash(VertexWithRankAndDangling record) {
final long value = record.getVertexID();
@@ -61,7 +66,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
}
@Override
- public int compare(DataInputView source1, DataInputView source2) throws IOException {
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
final long diff = source1.readLong() - source2.readLong();
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
@@ -134,4 +139,15 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
public VertexWithRankAndDanglingComparator duplicate() {
return new VertexWithRankAndDanglingComparator();
}
+
+ @Override
+ public Object[] extractKeys(VertexWithRankAndDangling record) {
+ extractedKey[0] = record.getVertexID();
+ return extractedKey;
+ }
+
+ @Override
+ public TypeComparator[] getComparators() {
+ return comparators;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/57b8e66a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
index 0d94c21..77a6a97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
@@ -30,7 +31,11 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
private static final long serialVersionUID = 1L;
private long reference;
-
+
+ private Comparable[] extractedKey = new Comparable[1];
+
+ private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
+
@Override
public int hash(VertexWithRank record) {
final long value = record.getVertexID();
@@ -61,7 +66,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
}
@Override
- public int compare(DataInputView source1, DataInputView source2) throws IOException {
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
final long diff = source1.readLong() - source2.readLong();
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
@@ -132,4 +137,15 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
public VertexWithRankComparator duplicate() {
return new VertexWithRankComparator();
}
+
+ @Override
+ public Object[] extractKeys(VertexWithRank record) {
+ extractedKey[0] = record.getVertexID();
+ return extractedKey;
+ }
+
+ @Override
+ public TypeComparator[] getComparators() {
+ return comparators;
+ }
}