You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/11/18 10:57:35 UTC
svn commit: r718539 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
Author: ddas
Date: Tue Nov 18 01:57:35 2008
New Revision: 718539
URL: http://svn.apache.org/viewvc?rev=718539&view=rev
Log:
HADOOP-3293. Fixes FileInputFormat to provide locations for splits based on the rack/host that has the most number of bytes. Contributed by Jothi Padmanabhan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718539&r1=718538&r2=718539&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 01:57:35 2008
@@ -117,6 +117,10 @@
OPTIMIZATIONS
+ HADOOP-3293. Fixes FileInputFormat to provide locations for splits
+ based on the rack/host that has the most number of bytes.
+ (Jothi Padmanabhan via ddas)
+
BUG FIXES
HADOOP-4204. Fix findbugs warnings related to unused variables, naive
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=718539&r1=718538&r2=718539&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Tue Nov 18 01:57:35 2008
@@ -20,8 +20,15 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +38,9 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -214,9 +224,10 @@
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+ String[] splitHosts = getSplitHosts(blkLocations,
+ length-bytesRemaining, splitSize);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
+ splitHosts));
bytesRemaining -= splitSize;
}
@@ -225,7 +236,8 @@
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+ String[] splitHosts = getSplitHosts(blkLocations,0,length);
+ splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
@@ -371,5 +383,213 @@
}
return result;
}
+
+
+ private void sortInDescendingOrder(List<NodeInfo> mylist) {
+ Collections.sort(mylist, new Comparator<NodeInfo> () {
+ public int compare(NodeInfo obj1, NodeInfo obj2) {
+
+ if (obj1 == null || obj2 == null)
+ return -1;
+
+ if (obj1.getValue() == obj2.getValue()) {
+ return 0;
+ }
+ else {
+ return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
+ }
+ }
+ }
+ );
+ }
+ /**
+ * This function identifies and returns the hosts that contribute
+ * most for a given split. For calculating the contribution, rack
+ * locality is treated on par with host locality, so hosts from racks
+ * that contribute the most are preferred over hosts on racks that
+ * contribute less
+ * @param blkLocations The list of block locations
+ * @param offset
+ * @param splitSize
+ * @return array of hosts that contribute most to this split
+ * @throws IOException
+ */
+ protected String[] getSplitHosts(BlockLocation[] blkLocations,
+ long offset, long splitSize) throws IOException {
+
+ int startIndex = getBlockIndex(blkLocations, offset);
+
+ long bytesInThisBlock = blkLocations[startIndex].getOffset() +
+ blkLocations[startIndex].getLength() - offset;
+
+ //If this is the only block, just return
+ if (bytesInThisBlock >= splitSize) {
+ return blkLocations[startIndex].getHosts();
+ }
+
+ long bytesInFirstBlock = bytesInThisBlock;
+ int index = startIndex + 1;
+ splitSize -= bytesInThisBlock;
+
+ while (splitSize > 0) {
+ bytesInThisBlock =
+ Math.min(splitSize, blkLocations[index++].getLength());
+ splitSize -= bytesInThisBlock;
+ }
+
+ long bytesInLastBlock = bytesInThisBlock;
+ int endIndex = index - 1;
+
+ NetworkTopology clusterMap = new NetworkTopology();
+ Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
+ Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
+ String [] allTopos = new String[0];
+
+ // Build the hierarchy and aggregate the contribution of
+ // bytes at each level. See TestGetSplitHosts.java
+
+ for (index = startIndex; index <= endIndex; index++) {
+
+ // Establish the bytes in this block
+ if (index == startIndex) {
+ bytesInThisBlock = bytesInFirstBlock;
+ }
+ else if (index == endIndex) {
+ bytesInThisBlock = bytesInLastBlock;
+ }
+ else {
+ bytesInThisBlock = blkLocations[index].getLength();
+ }
+
+ allTopos = blkLocations[index].getTopologyPaths();
+
+ // If no topology information is available, just
+ // prefix a fakeRack
+ if (allTopos.length == 0) {
+ allTopos = fakeRacks(blkLocations, index);
+ }
+
+ // NOTE: This code currently works only for one level of
+ // hierarchy (rack/host). However, it is relatively easy
+ // to extend this to support aggregation at different
+ // levels
+
+ for (String topo: allTopos) {
+
+ Node node, parentNode;
+ NodeInfo nodeInfo, parentNodeInfo;
+
+ node = clusterMap.getNode(topo);
+
+ if (node == null) {
+ node = new NodeBase(topo);
+ clusterMap.add(node);
+ nodeInfo = new NodeInfo(node);
+ hostsMap.put(node,nodeInfo);
+ parentNode = node.getParent();
+ parentNodeInfo = racksMap.get(parentNode);
+ if (parentNodeInfo == null) {
+ parentNodeInfo = new NodeInfo(parentNode);
+ racksMap.put(parentNode,parentNodeInfo);
+ }
+ parentNodeInfo.addLeaf(nodeInfo);
+ }
+ else {
+ nodeInfo = hostsMap.get(node);
+ parentNode = node.getParent();
+ parentNodeInfo = racksMap.get(parentNode);
+ }
+
+ nodeInfo.addValue(index, bytesInThisBlock);
+ parentNodeInfo.addValue(index, bytesInThisBlock);
+
+ } // for all topos
+
+ } // for all indices
+
+ return identifyHosts(allTopos.length, racksMap);
+ }
+
+ private String[] identifyHosts(int replicationFactor,
+ Map<Node,NodeInfo> racksMap) {
+
+ String [] retVal = new String[replicationFactor];
+
+ List <NodeInfo> rackList = new LinkedList<NodeInfo>();
+
+ rackList.addAll(racksMap.values());
+
+ // Sort the racks based on their contribution to this split
+ sortInDescendingOrder(rackList);
+
+ boolean done = false;
+ int index = 0;
+
+ // Get the host list for all our aggregated items, sort
+ // them and return the top entries
+ for (NodeInfo ni: rackList) {
+
+ Set<NodeInfo> hostSet = ni.getLeaves();
+
+ List<NodeInfo>hostList = new LinkedList<NodeInfo>();
+ hostList.addAll(hostSet);
+
+ // Sort the hosts in this rack based on their contribution
+ sortInDescendingOrder(hostList);
+
+ for (NodeInfo host: hostList) {
+ // Strip out the port number from the host name
+ retVal[index++] = host.node.getName().split(":")[0];
+ if (index == replicationFactor) {
+ done = true;
+ break;
+ }
+ }
+
+ if (done == true) {
+ break;
+ }
+ }
+ return retVal;
+ }
+
+ private String[] fakeRacks(BlockLocation[] blkLocations, int index)
+ throws IOException {
+ String[] allHosts = blkLocations[index].getHosts();
+ String[] allTopos = new String[allHosts.length];
+ for (int i = 0; i < allHosts.length; i++) {
+ allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
+ }
+ return allTopos;
+ }
+
+
+ private static class NodeInfo {
+ final Node node;
+ final Set<Integer> blockIds;
+ final Set<NodeInfo> leaves;
+
+ private long value;
+
+ NodeInfo(Node node) {
+ this.node = node;
+ blockIds = new HashSet<Integer>();
+ leaves = new HashSet<NodeInfo>();
+ }
+
+ long getValue() {return value;}
+
+ void addValue(int blockIndex, long value) {
+ if (blockIds.add(blockIndex) == true) {
+ this.value += value;
+ }
+ }
+
+ Set<NodeInfo> getLeaves() { return leaves;}
+
+ void addLeaf(NodeInfo nodeInfo) {
+ leaves.add(nodeInfo);
+ }
+ }
}