You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:59 UTC

[17/60] git commit: Make DistinctOperator and Keys use TupleComparatorBase

Make DistinctOperator and Keys use TupleComparatorBase

Previously this was TupleComparator, which does not work when used from the Scala API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/785f2c4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/785f2c4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/785f2c4f

Branch: refs/heads/master
Commit: 785f2c4f8f3053a376083f66a2a2033edb9a8a8a
Parents: dee54cb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Aug 26 17:22:18 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:57 2014 +0200

----------------------------------------------------------------------
 .../api/java/operators/DistinctOperator.java    |  5 +-
 .../apache/flink/api/java/operators/Keys.java   | 85 ++++++++++----------
 .../typeutils/runtime/TupleComparatorBase.java  |  4 +-
 3 files changed, 47 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/785f2c4f/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index fd35773..7d8a28f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
@@ -52,8 +53,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		// if keys is null distinction is done on all tuple fields
 		if (keys == null) {
 			if (input.getType().isTupleType()) {
-				
-				TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) input.getType();
+
+				TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) input.getType();
 				int[] allFields = new int[tupleType.getArity()];
 				for(int i = 0; i < tupleType.getArity(); i++) {
 					allFields[i] = i;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/785f2c4f/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 9ac2b2b..630f674 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -23,8 +23,7 @@ import java.util.Arrays;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.types.TypeInformation;
 
 
@@ -32,46 +31,46 @@ public abstract class Keys<T> {
 
 
 	public abstract int getNumberOfKeyFields();
-	
+
 	public boolean isEmpty() {
 		return getNumberOfKeyFields() == 0;
 	}
-	
+
 	public abstract boolean areCompatibale(Keys<?> other);
-	
+
 	public abstract int[] computeLogicalKeyPositions();
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Specializations for field indexed / expression-based / extractor-based grouping
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static class FieldPositionKeys<T> extends Keys<T> {
-		
+
 		private final int[] fieldPositions;
 		private final TypeInformation<?>[] types;
-		
+
 		public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type) {
 			this(groupingFields, type, false);
 		}
-		
+
 		public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type, boolean allowEmpty) {
 			if (!type.isTupleType()) {
 				throw new InvalidProgramException("Specifying keys via field positions is only valid for tuple data types");
 			}
-			
+
 			if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) {
 				throw new IllegalArgumentException("The grouping fields must not be empty.");
 			}
-			
-			TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>)type;
-	
-			this.fieldPositions = makeFields(groupingFields, (TupleTypeInfo<?>) type);
-			
+
+			TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>)type;
+
+			this.fieldPositions = makeFields(groupingFields, (TupleTypeInfoBase<?>) type);
+
 			types = new TypeInformation[this.fieldPositions.length];
 			for(int i = 0; i < this.fieldPositions.length; i++) {
 				types[i] = tupleType.getTypeAt(this.fieldPositions[i]);
 			}
-			
+
 		}
 
 		@Override
@@ -81,10 +80,10 @@ public abstract class Keys<T> {
 
 		@Override
 		public boolean areCompatibale(Keys<?> other) {
-			
+
 			if (other instanceof FieldPositionKeys) {
 				FieldPositionKeys<?> oKey = (FieldPositionKeys<?>) other;
-				
+
 				if(oKey.types.length != this.types.length) {
 					return false;
 				}
@@ -94,14 +93,14 @@ public abstract class Keys<T> {
 					}
 				}
 				return true;
-				
+
 			} else if (other instanceof SelectorFunctionKeys) {
 				if(this.types.length != 1) {
 					return false;
 				}
-				
+
 				SelectorFunctionKeys<?, ?> sfk = (SelectorFunctionKeys<?, ?>) other;
-				
+
 				return sfk.keyType.equals(this.types[0]);
 			}
 			else {
@@ -113,15 +112,15 @@ public abstract class Keys<T> {
 		public int[] computeLogicalKeyPositions() {
 			return this.fieldPositions;
 		}
-	
+
 		@Override
 		public String toString() {
 			return Arrays.toString(fieldPositions);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static class SelectorFunctionKeys<T, K> extends Keys<T> {
 
 		private final KeySelector<T, K> keyExtractor;
@@ -155,20 +154,20 @@ public abstract class Keys<T> {
 
 		@Override
 		public boolean areCompatibale(Keys<?> other) {
-			
+
 			if (other instanceof SelectorFunctionKeys) {
 				@SuppressWarnings("unchecked")
 				SelectorFunctionKeys<?, K> sfk = (SelectorFunctionKeys<?, K>) other;
-				
+
 				return sfk.keyType.equals(this.keyType);
 			}
 			else if (other instanceof FieldPositionKeys) {
 				FieldPositionKeys<?> fpk = (FieldPositionKeys<?>) other;
-						
+
 				if(fpk.types.length != 1) {
 					return false;
 				}
-				
+
 				return fpk.types[0].equals(this.keyType);
 			}
 			else {
@@ -180,15 +179,15 @@ public abstract class Keys<T> {
 		public int[] computeLogicalKeyPositions() {
 			return new int[] {0};
 		}
-		
+
 		@Override
 		public String toString() {
 			return keyExtractor + " (" + keyType + ")";
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static class ExpressionKeys<T> extends Keys<T> {
 
 		private int[] logicalPositions;
@@ -249,15 +248,15 @@ public abstract class Keys<T> {
 			return logicalPositions;
 		}
 	}
-	
-	
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
 	// --------------------------------------------------------------------------------------------
-	
-	private static int[] makeFields(int[] fields, TupleTypeInfo<?> type) {
+
+	private static int[] makeFields(int[] fields, TupleTypeInfoBase<?> type) {
 		int inLength = type.getArity();
-		
+
 		// null parameter means all fields are considered
 		if (fields == null || fields.length == 0) {
 			fields = new int[inLength];
@@ -269,30 +268,30 @@ public abstract class Keys<T> {
 			return rangeCheckAndOrderFields(fields, inLength-1);
 		}
 	}
-	
+
 	private static final int[] rangeCheckAndOrderFields(int[] fields, int maxAllowedField) {
 		// order
 		Arrays.sort(fields);
-		
+
 		// range check and duplicate eliminate
 		int i = 1, k = 0;
 		int last = fields[0];
-		
+
 		if (last < 0 || last > maxAllowedField) {
 			throw new IllegalArgumentException("Tuple position is out of range.");
 		}
-		
+
 		for (; i < fields.length; i++) {
 			if (fields[i] < 0 || i > maxAllowedField) {
 				throw new IllegalArgumentException("Tuple position is out of range.");
 			}
-			
+
 			if (fields[i] != last) {
 				k++;
 				fields[k] = fields[i];
 			}
 		}
-		
+
 		// check if we eliminated something
 		if (k == fields.length - 1) {
 			return fields;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/785f2c4f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
index cea9879..7377d72 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -246,7 +246,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 	// --------------------------------------------------------------------------------------------
 	
 	@SuppressWarnings("unchecked")
-	private final void instantiateDeserializationUtils() {
+	protected final void instantiateDeserializationUtils() {
 		if (this.serializers == null) {
 			this.serializers = new TypeSerializer[this.serializerFactories.length];
 			for (int i = 0; i < this.serializers.length; i++) {
@@ -273,7 +273,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 	 * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
 	 * @see: http://oeis.org/A068652
 	 */
-	protected static final int[] HASH_SALT = new int[] {
+	public static final int[] HASH_SALT = new int[] {
 		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
 		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
 		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,