You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/04/23 22:57:34 UTC
svn commit: r768050 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
Author: cdouglas
Date: Thu Apr 23 20:57:33 2009
New Revision: 768050
URL: http://svn.apache.org/viewvc?rev=768050&view=rev
Log:
HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie construction. Contributed by Dick King.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=768050&r1=768049&r2=768050&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 23 20:57:33 2009
@@ -250,6 +250,9 @@
HADOOP-5625. Add operation duration to clienttrace. (Lei Xu via cdouglas)
+ HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie
+ construction. (Dick King via cdouglas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=768050&r1=768049&r2=768050&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Thu Apr 23 20:57:33 2009
@@ -83,7 +83,14 @@
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
- job.getInt("total.order.partitioner.max.trie.depth", 2));
+ // Now that blocks of identical splitless trie nodes are
+ // represented reentrantly, and we develop a leaf for any trie
+ // node with only one split point, the only reason for a depth
+ // limit is to prevent stack overflow or bloat in the pathological
+ // case where the split points are long and mostly look like bytes
+ // iii...iixii...iii. Therefore, we make the default depth
+ // limit large but not huge.
+ job.getInt("total.order.partitioner.max.trie.depth", 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
@@ -177,11 +184,42 @@
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
}
+
+ /**
+ * @param level the tree depth at this node
+ * @param splitPoints the full split point vector, which holds
+ * the split point or points this leaf node
+ * should contain
+ * @param lower first INcluded element of splitPoints
+ * @param upper first EXcluded element of splitPoints
+ * @return a leaf node. They come in three kinds: no split points
+ * [and the findParttion returns a canned index], one split
+ * point [and we compare with a single comparand], or more
+ * than one [and we do a binary search]. The last case is
+ * rare.
+ */
+ private TrieNode LeafTrieNodeFactory
+ (int level, BinaryComparable[] splitPoints, int lower, int upper) {
+ switch (upper - lower) {
+ case 0:
+ return new UnsplitTrieNode(level, lower);
+
+ case 1:
+ return new SinglySplitTrieNode(level, splitPoints, lower);
+
+ default:
+ return new LeafTrieNode(level, splitPoints, lower, upper);
+ }
+ }
/**
* A leaf trie node that scans for the key between lower..upper.
+ *
+ * We don't generate many of these now, since we usually continue trie-ing
+ * when more than one split point remains at this level, and we make different
+ * objects for nodes with 0 or 1 split point.
*/
- class LeafTrieNode extends TrieNode {
+ private class LeafTrieNode extends TrieNode {
final int lower;
final int upper;
final BinaryComparable[] splitPoints;
@@ -196,6 +234,34 @@
return (pos < 0) ? -pos : pos;
}
}
+
+ private class UnsplitTrieNode extends TrieNode {
+ final int result;
+
+ UnsplitTrieNode(int level, int value) {
+ super(level);
+ this.result = value;
+ }
+
+ public int findPartition(BinaryComparable key) {
+ return result;
+ }
+ }
+
+ private class SinglySplitTrieNode extends TrieNode {
+ final int lower;
+ final BinaryComparable mySplitPoint;
+
+ SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
+ super(level);
+ this.lower = lower;
+ this.mySplitPoint = splitPoints[lower];
+ }
+
+ public int findPartition(BinaryComparable key) {
+ return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
+ }
+ }
/**
@@ -221,7 +287,25 @@
reader.close();
return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
+
+ /**
+ *
+ * This object contains a TrieNodeRef if there is such a thing that
+ * can be repeated. Two adjacent trie node slots that contain no
+ * split points can be filled with the same trie node, even if they
+ * are not on the same level. See buildTreeRec, below.
+ *
+ */
+ private class CarriedTrieNodeRef
+ {
+ TrieNode content;
+
+ CarriedTrieNodeRef() {
+ content = null;
+ }
+ }
+
/**
* Given a sorted set of cut points, build a trie that will find the correct
* partition quickly.
@@ -233,16 +317,51 @@
* @return the trie node that will divide the splits correctly
*/
private TrieNode buildTrie(BinaryComparable[] splits, int lower,
- int upper, byte[] prefix, int maxDepth) {
+ int upper, byte[] prefix, int maxDepth) {
+ return buildTrieRec
+ (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
+ }
+
+ /**
+ * This is the core of buildTrie. The interface, and stub, above, just adds
+ * an empty CarriedTrieNodeRef.
+ *
+ * We build trie nodes in depth first order, which is also in key space
+ * order. Every leaf node is referenced as a slot in a parent internal
+ * node. If two adjacent slots [in the DFO] hold leaf nodes that have
+ * no split point, then they are not separated by a split point either,
+ * because there's no place in key space for that split point to exist.
+ *
+ * When that happens, the leaf nodes would be semantically identical, and
+ * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
+ * duration of the tree-walk. ref carries a potentially reusable, unsplit
+ * leaf node for such reuse until a leaf node with a split arises, which
+ * breaks the chain until we need to make a new unsplit leaf node.
+ *
+ * Note that this use of CarriedTrieNodeRef means that for internal nodes,
+ * if this code is modified in any way we still need
+ * to make or fill in the subnodes in key space order.
+ */
+ private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
+ int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
final int depth = prefix.length;
- if (depth >= maxDepth || lower == upper) {
- return new LeafTrieNode(depth, splits, lower, upper);
+ // We generate leaves for a single split point as well as for
+ // no split points.
+ if (depth >= maxDepth || lower >= upper - 1) {
+ // If we have two consecutive requests for an unsplit trie node, we
+ // can deliver the same one the second time.
+ if (lower == upper && ref.content != null) {
+ return ref.content;
+ }
+ TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
+ ref.content = lower == upper ? result : null;
+ return result;
}
InnerTrieNode result = new InnerTrieNode(depth);
byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
// append an extra byte on to the prefix
- int currentBound = lower;
- for(int ch = 0; ch < 255; ++ch) {
+ int currentBound = lower;
+ for(int ch = 0; ch < 0xFF; ++ch) {
trial[depth] = (byte) (ch + 1);
lower = currentBound;
while (currentBound < upper) {
@@ -252,13 +371,14 @@
currentBound += 1;
}
trial[depth] = (byte) ch;
- result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
- maxDepth);
+ result.child[0xFF & ch]
+ = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
}
// pick up the rest
- trial[depth] = 127;
- result.child[255] = buildTrie(splits, currentBound, upper, trial,
- maxDepth);
+ trial[depth] = (byte)0xFF;
+ result.child[0xFF]
+ = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+
return result;
}
}