Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/04/23 22:57:34 UTC

svn commit: r768050 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java

Author: cdouglas
Date: Thu Apr 23 20:57:33 2009
New Revision: 768050

URL: http://svn.apache.org/viewvc?rev=768050&view=rev
Log:
HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie construction. Contributed by Dick King.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=768050&r1=768049&r2=768050&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 23 20:57:33 2009
@@ -250,6 +250,9 @@
 
     HADOOP-5625. Add operation duration to clienttrace. (Lei Xu via cdouglas)
 
+    HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie
+    construction. (Dick King via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

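For context, total.order.partitioner.max.trie.depth, whose default this change
raises from 2 to 200, is the same knob a job can set explicitly. Below is a
minimal driver-side sketch using the old mapred API; the class name, paths,
key/value types, and sampler parameters are illustrative assumptions, not part
of this commit. InputSampler writes the split points that TotalOrderPartitioner
later loads and turns into the trie modified below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(TotalOrderExample.class);
    job.setJobName("total-order-sort");

    // Identity map/reduce over Text keys; Text is BinaryComparable, so the
    // partitioner takes the trie path rather than the binary-search path.
    job.setInputFormat(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(16);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Sample the input to pick split points, then point the partitioner at
    // the resulting partition file.
    TotalOrderPartitioner.setPartitionFile(job, new Path(args[1] + ".partitions"));
    InputSampler.writePartitionFile(job,
        new InputSampler.RandomSampler<Text, Text>(0.01, 1000, 10));
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Optional override of the depth limit discussed in this change; 200 is
    // the new default, 2 was the old one.
    job.setInt("total.order.partitioner.max.trie.depth", 200);

    JobClient.runJob(job);
  }
}
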
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=768050&r1=768049&r2=768050&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Thu Apr 23 20:57:33 2009
@@ -83,7 +83,14 @@
       if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
         partitions = buildTrie((BinaryComparable[])splitPoints, 0,
             splitPoints.length, new byte[0],
-            job.getInt("total.order.partitioner.max.trie.depth", 2));
+            // Now that runs of identical splitless trie nodes are shared
+            // rather than duplicated, and we build a leaf for any trie node
+            // with only one split point, the only reason for a depth limit
+            // is to guard against stack overflow or bloat in the pathological
+            // case where the split points are long and mostly look like the
+            // bytes iii...iixii...iii.  Therefore, we make the default depth
+            // limit large but not huge.
+            job.getInt("total.order.partitioner.max.trie.depth", 200));
       } else {
         partitions = new BinarySearchNode(splitPoints, comparator);
       }
@@ -177,11 +184,42 @@
       return child[0xFF & key.getBytes()[level]].findPartition(key);
     }
   }
+  
+  /**
+   * @param level        the tree depth at this node
+   * @param splitPoints  the full split point vector, which holds
+   *                     the split point or points this leaf node
+   *                     should contain
+   * @param lower        first INcluded element of splitPoints
+   * @param upper        first EXcluded element of splitPoints
+   * @return  a leaf node.  They come in three kinds: no split points 
+   *          [and findPartition returns a canned index], one split
+   *          point [and we compare with a single comparand], or more
+   *          than one [and we do a binary search].  The last case is
+   *          rare.
+   */
+  private TrieNode LeafTrieNodeFactory
+             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      switch (upper - lower) {
+      case 0:
+          return new UnsplitTrieNode(level, lower);
+          
+      case 1:
+          return new SinglySplitTrieNode(level, splitPoints, lower);
+          
+      default:
+          return new LeafTrieNode(level, splitPoints, lower, upper);
+      }
+  }
 
   /**
    * A leaf trie node that scans for the key between lower..upper.
+   * 
+   * We don't generate many of these now, since we usually continue trie-ing 
+   * when more than one split point remains at this level, and we make different
+   * objects for nodes with 0 or 1 split point.
    */
-  class LeafTrieNode extends TrieNode {
+  private class LeafTrieNode extends TrieNode {
     final int lower;
     final int upper;
     final BinaryComparable[] splitPoints;
@@ -196,6 +234,34 @@
       return (pos < 0) ? -pos : pos;
     }
   }
+  
+  private class UnsplitTrieNode extends TrieNode {
+      final int result;
+      
+      UnsplitTrieNode(int level, int value) {
+          super(level);
+          this.result = value;
+      }
+      
+      public int findPartition(BinaryComparable key) {
+          return result;
+      }
+  }
+  
+  private class SinglySplitTrieNode extends TrieNode {
+      final int               lower;
+      final BinaryComparable  mySplitPoint;
+      
+      SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
+          super(level);
+          this.lower = lower;
+          this.mySplitPoint = splitPoints[lower];
+      }
+      
+      public int findPartition(BinaryComparable key) {
+          return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
+      }
+  }
 
 
   /**
@@ -221,7 +287,25 @@
     reader.close();
     return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
   }
+  
+  /**
+   * 
+   * This object carries a trie node that can be reused, if one exists.
+   * can be repeated.  Two adjacent trie node slots that contain no 
+   * split points can be filled with the same trie node, even if they
+   * are not on the same level.  See buildTrieRec, below.
+   *
+   */  
+  private class CarriedTrieNodeRef
+  {
+      TrieNode   content;
+      
+      CarriedTrieNodeRef() {
+          content = null;
+      }
+  }
 
+  
   /**
    * Given a sorted set of cut points, build a trie that will find the correct
    * partition quickly.
@@ -233,16 +317,51 @@
    * @return the trie node that will divide the splits correctly
    */
   private TrieNode buildTrie(BinaryComparable[] splits, int lower,
-      int upper, byte[] prefix, int maxDepth) {
+          int upper, byte[] prefix, int maxDepth) {
+      return buildTrieRec
+               (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
+  }
+  
+  /**
+   * This is the core of buildTrie.  The interface and stub above just add
+   * an empty CarriedTrieNodeRef.
+   * 
+   * We build trie nodes in depth first order, which is also in key space
+   * order.  Every leaf node is referenced as a slot in a parent internal
+   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
+   * no split point, then they are not separated by a split point either, 
+   * because there's no place in key space for that split point to exist.
+   * 
+   * When that happens, the leaf nodes are semantically identical and we
+   * reuse the object.  A single CarriedTrieNodeRef "ref" lives for the
+   * duration of the tree-walk; it carries a reusable unsplit leaf node
+   * until a leaf node with a split point arises, which breaks the chain
+   * until a new unsplit leaf node is needed.
+   * 
+   * Note that this use of CarriedTrieNodeRef means that if this code is
+   * modified in any way, we still need to make or fill in the subnodes of
+   * internal nodes in key space order.
+   */
+  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
+      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
     final int depth = prefix.length;
-    if (depth >= maxDepth || lower == upper) {
-      return new LeafTrieNode(depth, splits, lower, upper);
+    // We generate leaves for a single split point as well as for 
+    // no split points.
+    if (depth >= maxDepth || lower >= upper - 1) {
+        // If we have two consecutive requests for an unsplit trie node, we
+        // can deliver the same one the second time.
+        if (lower == upper && ref.content != null) {
+            return ref.content;
+        }
+        TrieNode  result = LeafTrieNodeFactory(depth, splits, lower, upper);
+        ref.content = lower == upper ? result : null;
+        return result;
     }
     InnerTrieNode result = new InnerTrieNode(depth);
     byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
     // append an extra byte on to the prefix
-    int currentBound = lower;
-    for(int ch = 0; ch < 255; ++ch) {
+    int         currentBound = lower;
+    for(int ch = 0; ch < 0xFF; ++ch) {
       trial[depth] = (byte) (ch + 1);
       lower = currentBound;
       while (currentBound < upper) {
@@ -252,13 +371,14 @@
         currentBound += 1;
       }
       trial[depth] = (byte) ch;
-      result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
-                                   maxDepth);
+      result.child[0xFF & ch]
+                   = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
     }
     // pick up the rest
-    trial[depth] = 127;
-    result.child[255] = buildTrie(splits, currentBound, upper, trial,
-                                  maxDepth);
+    trial[depth] = (byte)0xFF;
+    result.child[0xFF] 
+                 = buildTrieRec(splits, currentBound, upper, trial, maxDepth, ref);
+    
     return result;
   }
 }
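
To make the leaf kinds and the node-sharing comments above concrete, here is a
simplified, self-contained sketch. It is not the Hadoop implementation: the
class names (SharedLeafDemo, UnsplitLeaf, SinglySplitLeaf, MultiSplitLeaf), the
single trie level, and the partition-index convention are stand-ins, but the
mechanics mirror what buildTrieRec and CarriedTrieNodeRef do -- child slots are
filled in key space order, and an unsplit leaf is carried forward and reused
until a slot that contains a split point breaks the chain.

import java.util.IdentityHashMap;

/*
 * Illustrative sketch only -- not the Hadoop code.  It builds one level of a
 * 256-way byte trie over sorted split points and shows how adjacent child
 * slots that hold no split point can share one leaf object, which is the
 * idea CarriedTrieNodeRef implements in the patch above.
 */
public class SharedLeafDemo {

  interface Node { int findPartition(byte[] key); }

  /** No split point in this range: every key maps to the same partition. */
  static final class UnsplitLeaf implements Node {
    final int partition;
    UnsplitLeaf(int partition) { this.partition = partition; }
    public int findPartition(byte[] key) { return partition; }
  }

  /** Exactly one split point: a single unsigned comparison decides. */
  static final class SinglySplitLeaf implements Node {
    final int lower;
    final byte[] split;
    SinglySplitLeaf(int lower, byte[] split) { this.lower = lower; this.split = split; }
    public int findPartition(byte[] key) {
      return lower + (compareUnsigned(key, split) < 0 ? 0 : 1);
    }
  }

  /** More than one split point: scan them (the real code binary-searches). */
  static final class MultiSplitLeaf implements Node {
    final byte[][] splits;
    final int lower, upper;
    MultiSplitLeaf(byte[][] splits, int lower, int upper) {
      this.splits = splits; this.lower = lower; this.upper = upper;
    }
    public int findPartition(byte[] key) {
      int i = lower;
      while (i < upper && compareUnsigned(key, splits[i]) >= 0) { ++i; }
      return i;
    }
  }

  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; ++i) {
      int d = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (d != 0) { return d; }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    // Sorted split points; only first bytes 10 and 200 have splits under them.
    byte[][] splits = { {10}, {10, 5}, {(byte) 200} };

    Node[] child = new Node[256];
    Node carriedUnsplit = null;   // plays the role of CarriedTrieNodeRef
    int bound = 0;                // first split point not yet assigned to a slot

    for (int ch = 0; ch < 256; ++ch) {   // key space order, like buildTrieRec
      int lower = bound;
      while (bound < splits.length && (splits[bound][0] & 0xFF) == ch) {
        ++bound;
      }
      if (lower == bound) {
        // No split point under this byte: reuse the carried unsplit leaf.
        if (carriedUnsplit == null) {
          carriedUnsplit = new UnsplitLeaf(lower);
        }
        child[ch] = carriedUnsplit;
      } else if (bound - lower == 1) {
        child[ch] = new SinglySplitLeaf(lower, splits[lower]);
        carriedUnsplit = null;    // a split point breaks the sharing chain
      } else {
        child[ch] = new MultiSplitLeaf(splits, lower, bound);
        carriedUnsplit = null;
      }
    }

    // Count distinct leaf objects backing the 256 slots.
    IdentityHashMap<Node, Boolean> distinct = new IdentityHashMap<Node, Boolean>();
    for (Node n : child) { distinct.put(n, Boolean.TRUE); }
    System.out.println("slots=256 distinctLeaves=" + distinct.size());

    System.out.println(child[3].findPartition(new byte[] {3, 7}));    // 0
    System.out.println(child[10].findPartition(new byte[] {10, 9}));  // 2
    System.out.println(child[0xFF & (byte) 201].findPartition(new byte[] {(byte) 201})); // 3
  }
}

For the sample split points this prints 5 distinct leaf objects behind the 256
child slots, which is the kind of sharing that lets the patch raise the default
depth limit without bloating the trie.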