You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/01 18:39:32 UTC
incubator-flink git commit: [FLINK-1292] Allow for longer normalized
keys when using composite keys
Repository: incubator-flink
Updated Branches:
refs/heads/master 33724f01c -> bc4119799
[FLINK-1292] Allow for longer normalized keys when using composite keys
This closes #241
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bc411979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bc411979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bc411979
Branch: refs/heads/master
Commit: bc4119799c4179044f5480bf38314e7f33b82708
Parents: 33724f0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 20:27:26 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 1 16:57:12 2014 +0100
----------------------------------------------------------------------
.../typeutils/runtime/WritableComparator.java | 3 ++-
.../operators/sort/NormalizedKeySorter.java | 20 ++++++++++++++++----
.../VertexWithAdjacencyListComparator.java | 4 +++-
.../VertexWithRankAndDanglingComparator.java | 2 ++
.../types/VertexWithRankComparator.java | 2 ++
5 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 4ca84b5..6bb8d8b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -134,7 +134,8 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
}
@SuppressWarnings("rawtypes")
- @Override public TypeComparator[] getFlatComparators() {
+ @Override
+ public TypeComparator[] getFlatComparators() {
return comparators;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index b406a61..c69474a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -36,11 +36,13 @@ import org.apache.flink.util.MutableObjectIterator;
/**
*
*/
-public final class NormalizedKeySorter<T> implements InMemorySorter<T>
-{
+public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
+
private static final int OFFSET_LEN = 8;
- private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 8;
+ private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
+
+ private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
private static final int MIN_REQUIRED_BUFFERS = 3;
@@ -143,7 +145,17 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T>
// set up normalized key characteristics
if (this.comparator.supportsNormalizedKey()) {
- this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxNormalizedKeyBytes);
+ // compute the max normalized key length
+ int numPartialKeys;
+ try {
+ numPartialKeys = this.comparator.getFlatComparators().length;
+ } catch (Throwable t) {
+ numPartialKeys = 1;
+ }
+
+ int maxLen = Math.min(maxNormalizedKeyBytes, MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
+
+ this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxLen);
this.normalizedKeyFullyDetermines = !this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes);
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
index e607ba7..7d11530 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
@@ -32,6 +32,7 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
private long reference;
+ @SuppressWarnings("rawtypes")
private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
@Override
@@ -138,8 +139,9 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
target[index] = ((VertexWithAdjacencyList) record).getVertexID();
return 1;
}
-
+
@Override
+ @SuppressWarnings("rawtypes")
public TypeComparator[] getFlatComparators() {
return comparators;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
index dd9d4dd..d83c3fb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
@@ -32,6 +32,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
private long reference;
+ @SuppressWarnings("rawtypes")
private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
@Override
@@ -145,6 +146,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
}
@Override
+ @SuppressWarnings("rawtypes")
public TypeComparator[] getFlatComparators() {
return comparators;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc411979/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
index b63f724..9107f4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
@@ -32,6 +32,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
private long reference;
+ @SuppressWarnings("rawtypes")
private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
@Override
@@ -143,6 +144,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
}
@Override
+ @SuppressWarnings("rawtypes")
public TypeComparator[] getFlatComparators() {
return comparators;
}