You are viewing a plain text version of this content. The canonical link for it is not preserved in this text version (the original hyperlink target was dropped during extraction).
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/01 18:39:32 UTC

incubator-flink git commit: [FLINK-1292] Allow for longer normalized keys when using composite keys

Repository: incubator-flink
Updated Branches:
  refs/heads/master 33724f01c -> bc4119799


[FLINK-1292] Allow for longer normalized keys when using composite keys

This closes #241


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bc411979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bc411979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bc411979

Branch: refs/heads/master
Commit: bc4119799c4179044f5480bf38314e7f33b82708
Parents: 33724f0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 20:27:26 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 1 16:57:12 2014 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/WritableComparator.java   |  3 ++-
 .../operators/sort/NormalizedKeySorter.java     | 20 ++++++++++++++++----
 .../VertexWithAdjacencyListComparator.java      |  4 +++-
 .../VertexWithRankAndDanglingComparator.java    |  2 ++
 .../types/VertexWithRankComparator.java         |  2 ++
 5 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 4ca84b5..6bb8d8b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -134,7 +134,8 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	}
 
 	@SuppressWarnings("rawtypes")
-	@Override public TypeComparator[] getFlatComparators() {
+	@Override
+	public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index b406a61..c69474a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -36,11 +36,13 @@ import org.apache.flink.util.MutableObjectIterator;
 /**
  * 
  */
-public final class NormalizedKeySorter<T> implements InMemorySorter<T>
-{
+public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
+	
 	private static final int OFFSET_LEN = 8;
 	
-	private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 8;
+	private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
+	
+	private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
 	
 	private static final int MIN_REQUIRED_BUFFERS = 3;
 
@@ -143,7 +145,17 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T>
 		
 		// set up normalized key characteristics
 		if (this.comparator.supportsNormalizedKey()) {
-			this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxNormalizedKeyBytes);
+			// compute the max normalized key length
+			int numPartialKeys;
+			try {
+				numPartialKeys = this.comparator.getFlatComparators().length;
+			} catch (Throwable t) {
+				numPartialKeys = 1;
+			}
+			
+			int maxLen = Math.min(maxNormalizedKeyBytes, MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
+			
+			this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxLen);
 			this.normalizedKeyFullyDetermines = !this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes);
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
index e607ba7..7d11530 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
@@ -32,6 +32,7 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
 	
 	private long reference;
 
+	@SuppressWarnings("rawtypes")
 	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
 
 	@Override
@@ -138,8 +139,9 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
 		target[index] = ((VertexWithAdjacencyList) record).getVertexID();
 		return 1;
 	}
-
+	
 	@Override
+	@SuppressWarnings("rawtypes")
 	public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
index dd9d4dd..d83c3fb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
@@ -32,6 +32,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
 	
 	private long reference;
 
+	@SuppressWarnings("rawtypes")
 	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
 
 	@Override
@@ -145,6 +146,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
 	}
 
 	@Override
+	@SuppressWarnings("rawtypes")
 	public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
index b63f724..9107f4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
@@ -32,6 +32,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
 	
 	private long reference;
 
+	@SuppressWarnings("rawtypes")
 	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
 
 	@Override
@@ -143,6 +144,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
 	}
 
 	@Override
+	@SuppressWarnings("rawtypes")
 	public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}