You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:59 UTC
[17/60] git commit: Make DistinctOperator and Keys use TupleComparatorBase
Make DistinctOperator and Keys use TupleComparatorBase
Previously these classes used TupleComparator, which does not work when used from the Scala API.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/785f2c4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/785f2c4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/785f2c4f
Branch: refs/heads/master
Commit: 785f2c4f8f3053a376083f66a2a2033edb9a8a8a
Parents: dee54cb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Aug 26 17:22:18 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:57 2014 +0200
----------------------------------------------------------------------
.../api/java/operators/DistinctOperator.java | 5 +-
.../apache/flink/api/java/operators/Keys.java | 85 ++++++++++----------
.../typeutils/runtime/TupleComparatorBase.java | 4 +-
3 files changed, 47 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/785f2c4f/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index fd35773..7d8a28f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.util.Collector;
@@ -52,8 +53,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
// if keys is null distinction is done on all tuple fields
if (keys == null) {
if (input.getType().isTupleType()) {
-
- TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) input.getType();
+
+ TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) input.getType();
int[] allFields = new int[tupleType.getArity()];
for(int i = 0; i < tupleType.getArity(); i++) {
allFields[i] = i;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/785f2c4f/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 9ac2b2b..630f674 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -23,8 +23,7 @@ import java.util.Arrays;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.types.TypeInformation;
@@ -32,46 +31,46 @@ public abstract class Keys<T> {
public abstract int getNumberOfKeyFields();
-
+
public boolean isEmpty() {
return getNumberOfKeyFields() == 0;
}
-
+
public abstract boolean areCompatibale(Keys<?> other);
-
+
public abstract int[] computeLogicalKeyPositions();
-
+
// --------------------------------------------------------------------------------------------
// Specializations for field indexed / expression-based / extractor-based grouping
// --------------------------------------------------------------------------------------------
-
+
public static class FieldPositionKeys<T> extends Keys<T> {
-
+
private final int[] fieldPositions;
private final TypeInformation<?>[] types;
-
+
public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type) {
this(groupingFields, type, false);
}
-
+
public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type, boolean allowEmpty) {
if (!type.isTupleType()) {
throw new InvalidProgramException("Specifying keys via field positions is only valid for tuple data types");
}
-
+
if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) {
throw new IllegalArgumentException("The grouping fields must not be empty.");
}
-
- TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>)type;
-
- this.fieldPositions = makeFields(groupingFields, (TupleTypeInfo<?>) type);
-
+
+ TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>)type;
+
+ this.fieldPositions = makeFields(groupingFields, (TupleTypeInfoBase<?>) type);
+
types = new TypeInformation[this.fieldPositions.length];
for(int i = 0; i < this.fieldPositions.length; i++) {
types[i] = tupleType.getTypeAt(this.fieldPositions[i]);
}
-
+
}
@Override
@@ -81,10 +80,10 @@ public abstract class Keys<T> {
@Override
public boolean areCompatibale(Keys<?> other) {
-
+
if (other instanceof FieldPositionKeys) {
FieldPositionKeys<?> oKey = (FieldPositionKeys<?>) other;
-
+
if(oKey.types.length != this.types.length) {
return false;
}
@@ -94,14 +93,14 @@ public abstract class Keys<T> {
}
}
return true;
-
+
} else if (other instanceof SelectorFunctionKeys) {
if(this.types.length != 1) {
return false;
}
-
+
SelectorFunctionKeys<?, ?> sfk = (SelectorFunctionKeys<?, ?>) other;
-
+
return sfk.keyType.equals(this.types[0]);
}
else {
@@ -113,15 +112,15 @@ public abstract class Keys<T> {
public int[] computeLogicalKeyPositions() {
return this.fieldPositions;
}
-
+
@Override
public String toString() {
return Arrays.toString(fieldPositions);
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public static class SelectorFunctionKeys<T, K> extends Keys<T> {
private final KeySelector<T, K> keyExtractor;
@@ -155,20 +154,20 @@ public abstract class Keys<T> {
@Override
public boolean areCompatibale(Keys<?> other) {
-
+
if (other instanceof SelectorFunctionKeys) {
@SuppressWarnings("unchecked")
SelectorFunctionKeys<?, K> sfk = (SelectorFunctionKeys<?, K>) other;
-
+
return sfk.keyType.equals(this.keyType);
}
else if (other instanceof FieldPositionKeys) {
FieldPositionKeys<?> fpk = (FieldPositionKeys<?>) other;
-
+
if(fpk.types.length != 1) {
return false;
}
-
+
return fpk.types[0].equals(this.keyType);
}
else {
@@ -180,15 +179,15 @@ public abstract class Keys<T> {
public int[] computeLogicalKeyPositions() {
return new int[] {0};
}
-
+
@Override
public String toString() {
return keyExtractor + " (" + keyType + ")";
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public static class ExpressionKeys<T> extends Keys<T> {
private int[] logicalPositions;
@@ -249,15 +248,15 @@ public abstract class Keys<T> {
return logicalPositions;
}
}
-
-
+
+
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
-
- private static int[] makeFields(int[] fields, TupleTypeInfo<?> type) {
+
+ private static int[] makeFields(int[] fields, TupleTypeInfoBase<?> type) {
int inLength = type.getArity();
-
+
// null parameter means all fields are considered
if (fields == null || fields.length == 0) {
fields = new int[inLength];
@@ -269,30 +268,30 @@ public abstract class Keys<T> {
return rangeCheckAndOrderFields(fields, inLength-1);
}
}
-
+
private static final int[] rangeCheckAndOrderFields(int[] fields, int maxAllowedField) {
// order
Arrays.sort(fields);
-
+
// range check and duplicate eliminate
int i = 1, k = 0;
int last = fields[0];
-
+
if (last < 0 || last > maxAllowedField) {
throw new IllegalArgumentException("Tuple position is out of range.");
}
-
+
for (; i < fields.length; i++) {
if (fields[i] < 0 || i > maxAllowedField) {
throw new IllegalArgumentException("Tuple position is out of range.");
}
-
+
if (fields[i] != last) {
k++;
fields[k] = fields[i];
}
}
-
+
// check if we eliminated something
if (k == fields.length - 1) {
return fields;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/785f2c4f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
index cea9879..7377d72 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -246,7 +246,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
- private final void instantiateDeserializationUtils() {
+ protected final void instantiateDeserializationUtils() {
if (this.serializers == null) {
this.serializers = new TypeSerializer[this.serializerFactories.length];
for (int i = 0; i < this.serializers.length; i++) {
@@ -273,7 +273,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
* @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
* @see: http://oeis.org/A068652
*/
- protected static final int[] HASH_SALT = new int[] {
+ public static final int[] HASH_SALT = new int[] {
73 , 79 , 97 , 113 , 131 , 197 , 199 , 311 ,
337 , 373 , 719 , 733 , 919 , 971 , 991 , 1193 ,
1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,