You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/11/18 10:57:35 UTC

svn commit: r718539 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/FileInputFormat.java

Author: ddas
Date: Tue Nov 18 01:57:35 2008
New Revision: 718539

URL: http://svn.apache.org/viewvc?rev=718539&view=rev
Log:
HADOOP-3293. Fixes FileInputFormat to provide locations for splits based on the rack/host that has the largest number of bytes. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718539&r1=718538&r2=718539&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 01:57:35 2008
@@ -117,6 +117,10 @@
 
   OPTIMIZATIONS
 
+    HADOOP-3293. Fixes FileInputFormat to provide locations for splits
+    based on the rack/host that has the largest number of bytes.
+    (Jothi Padmanabhan via ddas)
+
   BUG FIXES
 
     HADOOP-4204. Fix findbugs warnings related to unused variables, naive

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=718539&r1=718538&r2=718539&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Tue Nov 18 01:57:35 2008
@@ -20,8 +20,15 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +38,9 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -214,9 +224,10 @@
 
         long bytesRemaining = length;
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+          String[] splitHosts = getSplitHosts(blkLocations, 
+              length-bytesRemaining, splitSize);
           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
-                                   blkLocations[blkIndex].getHosts()));
+              splitHosts));
           bytesRemaining -= splitSize;
         }
         
@@ -225,7 +236,8 @@
                      blkLocations[blkLocations.length-1].getHosts()));
         }
       } else if (length != 0) {
-        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+        String[] splitHosts = getSplitHosts(blkLocations,0,length);
+        splits.add(new FileSplit(path, 0, length, splitHosts));
       } else { 
         //Create empty hosts array for zero length files
         splits.add(new FileSplit(path, 0, length, new String[0]));
@@ -371,5 +383,213 @@
     }
     return result;
   }
+  
+
+  private void sortInDescendingOrder(List<NodeInfo> mylist) {
+    Collections.sort(mylist, new Comparator<NodeInfo> () {
+      public int compare(NodeInfo obj1, NodeInfo obj2) {
+
+        if (obj1 == null || obj2 == null)
+          return -1;
+
+        if (obj1.getValue() == obj2.getValue()) {
+          return 0;
+        }
+        else {
+          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
+        }
+      }
+    }
+    );
+  }
 
+  /** 
+   * This function identifies and returns the hosts that contribute 
+   * most for a given split. For calculating the contribution, rack
+   * locality is treated on par with host locality, so hosts from racks
+   * that contribute the most are preferred over hosts on racks that 
+   * contribute less.
+   * @param blkLocations The list of block locations
+   * @param offset byte offset in the file at which the split starts
+   * @param splitSize number of bytes in the split
+   * @return array of hosts that contribute most to this split
+   * @throws IOException if thrown while reading the block locations
+   */
+  protected String[] getSplitHosts(BlockLocation[] blkLocations, 
+      long offset, long splitSize)   throws IOException {
+    // Block that getBlockIndex reports for the split's start offset
+    int startIndex = getBlockIndex(blkLocations, offset);
+    // Bytes between offset and the end of that block
+    long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
+                          blkLocations[startIndex].getLength() - offset;
+
+    //If the split lies entirely within this block, just return its hosts
+    if (bytesInThisBlock >= splitSize) {
+      return blkLocations[startIndex].getHosts();
+    }
+
+    long bytesInFirstBlock = bytesInThisBlock;
+    int index = startIndex + 1;
+    splitSize -= bytesInThisBlock;
+    // NOTE(review): index is not bounds-checked; assumes blocks cover the split
+    while (splitSize > 0) {
+      bytesInThisBlock =
+        Math.min(splitSize, blkLocations[index++].getLength());
+      splitSize -= bytesInThisBlock;
+    }
+    // The loop leaves index one past the last block it consumed
+    long bytesInLastBlock = bytesInThisBlock;
+    int endIndex = index - 1;
+    // Scratch topology for this call; both maps key on Node identity
+    NetworkTopology clusterMap = new NetworkTopology();
+    Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
+    Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
+    String [] allTopos = new String[0];
+
+    // Build the hierarchy and aggregate the contribution of 
+    // bytes at each level. See TestGetSplitHosts.java 
+
+    for (index = startIndex; index <= endIndex; index++) {
+
+      // Establish the bytes in this block
+      if (index == startIndex) {
+        bytesInThisBlock = bytesInFirstBlock;
+      }
+      else if (index == endIndex) {
+        bytesInThisBlock = bytesInLastBlock;
+      }
+      else {
+        bytesInThisBlock = blkLocations[index].getLength();
+      }
+      
+      allTopos = blkLocations[index].getTopologyPaths();
+
+      // If no topology information is available, just
+      // prefix a fakeRack
+      if (allTopos.length == 0) {
+        allTopos = fakeRacks(blkLocations, index);
+      }
+
+      // NOTE: This code currently works only for one level of
+      // hierarchy (rack/host). However, it is relatively easy
+      // to extend this to support aggregation at different
+      // levels 
+      
+      for (String topo: allTopos) {
+
+        Node node, parentNode;
+        NodeInfo nodeInfo, parentNodeInfo;
+
+        node = clusterMap.getNode(topo);
+        // Unknown host: register it and attach it to its rack's entry
+        if (node == null) {
+          node = new NodeBase(topo);
+          clusterMap.add(node);
+          nodeInfo = new NodeInfo(node);
+          hostsMap.put(node,nodeInfo);
+          parentNode = node.getParent();
+          parentNodeInfo = racksMap.get(parentNode);
+          if (parentNodeInfo == null) {
+            parentNodeInfo = new NodeInfo(parentNode);
+            racksMap.put(parentNode,parentNodeInfo);
+          }
+          parentNodeInfo.addLeaf(nodeInfo);
+        }
+        else {
+          nodeInfo = hostsMap.get(node);
+          parentNode = node.getParent();
+          parentNodeInfo = racksMap.get(parentNode);
+        }
+        // A block index counts once per node, even if a rack holds two replicas
+        nodeInfo.addValue(index, bytesInThisBlock);
+        parentNodeInfo.addValue(index, bytesInThisBlock);
+
+      } // for all topos
+    
+    } // for all indices
+    // NOTE(review): result size is the replica count of the last block visited
+    return identifyHosts(allTopos.length, racksMap);
+  }
+  
+  /**
+   * Returns up to replicationFactor host names, taking racks and then the
+   * hosts inside each rack in descending order of contributed bytes.
+   * @param replicationFactor maximum number of hosts to return
+   * @param racksMap rack nodes mapped to their aggregated NodeInfo
+   * @return host names with any ":port" suffix removed; holds no null
+   *         entries and may be shorter than replicationFactor
+   */
+  private String[] identifyHosts(int replicationFactor, 
+                                 Map<Node,NodeInfo> racksMap) {
+    // Nothing to pick; also avoids indexing into a zero-length array
+    if (replicationFactor <= 0) {
+      return new String[0];
+    }
+    String [] retVal = new String[replicationFactor];
+    List <NodeInfo> rackList = new LinkedList<NodeInfo>(racksMap.values());
+    // Sort the racks based on their contribution to this split
+    sortInDescendingOrder(rackList);
+    int index = 0;
+    for (NodeInfo ni: rackList) {
+      List<NodeInfo> hostList = new LinkedList<NodeInfo>(ni.getLeaves());
+      // Sort the hosts in this rack based on their contribution
+      sortInDescendingOrder(hostList);
+      for (NodeInfo host: hostList) {
+        if (index == replicationFactor) {
+          break;
+        }
+        // Strip out the port number from the host name
+        retVal[index++] = host.node.getName().split(":")[0];
+      }
+      if (index == replicationFactor) {
+        break;
+      }
+    }
+    if (index < replicationFactor) {
+      // Fewer hosts than requested: trim so callers never see nulls
+      String[] trimmed = new String[index];
+      System.arraycopy(retVal, 0, trimmed, 0, index);
+      retVal = trimmed;
+    }
+    return retVal;
+  }
+  
+  private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
+  throws IOException {
+    String[] allHosts = blkLocations[index].getHosts();
+    String[] allTopos = new String[allHosts.length];
+    for (int i = 0; i < allHosts.length; i++) {
+      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
+    }
+    return allTopos;
+  }
+
+
+  /** Bytes contributed to a split by one topology node (host or rack). */
+  private static class NodeInfo {
+    final Node node;
+    // Block indices already counted, so each block is added at most once
+    final Set<Integer> blockIds = new HashSet<Integer>();
+    // Child entries; getSplitHosts stores a rack's hosts here
+    final Set<NodeInfo> leaves = new HashSet<NodeInfo>();
+    private long value;
+
+    NodeInfo(Node node) {
+      this.node = node;
+    }
+
+    long getValue() { return value; }
+
+    /** Adds {@code value} bytes unless {@code blockIndex} was seen before. */
+    void addValue(int blockIndex, long value) {
+      if (!blockIds.add(blockIndex)) {
+        return;
+      }
+      this.value += value;
+    }
+
+    Set<NodeInfo> getLeaves() { return leaves; }
+
+    void addLeaf(NodeInfo nodeInfo) { leaves.add(nodeInfo); }
+  }
 }