Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/08/01 19:48:14 UTC
svn commit: r1509349 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/
hadoop-mapreduce-client/hadoop-mapreduce-client-job...
Author: sseth
Date: Thu Aug 1 17:48:14 2013
New Revision: 1509349
URL: http://svn.apache.org/r1509349
Log:
merge MAPREDUCE-5352. Optimize node local splits generated by CombineFileInputFormat. (sseth)
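
For context, a simplified, self-contained sketch of the grouping strategy this patch adopts: walk the nodes round-robin and emit at most one node-local split per node per pass, instead of draining one node completely before moving on. Block lengths stand in for block metadata; every class and variable name below is illustrative, not the committed code.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: one node-local split per node per pass, so a
// node that happens to host many blocks no longer soaks up a
// disproportionate share of the splits.
public class NodeLocalGroupingSketch {

  static List<List<Long>> group(Map<String, Deque<Long>> nodeToBlocks, long maxSize) {
    List<List<Long>> splits = new ArrayList<List<Long>>();
    Set<String> completedNodes = new HashSet<String>();
    while (completedNodes.size() < nodeToBlocks.size()) {
      for (Map.Entry<String, Deque<Long>> entry : nodeToBlocks.entrySet()) {
        String node = entry.getKey();
        Deque<Long> blocks = entry.getValue();
        if (completedNodes.contains(node)) {
          continue;
        }
        List<Long> current = new ArrayList<Long>();
        long currentSize = 0;
        // Take blocks from this node only until one split reaches maxSize,
        // then move to the next node so splits spread across nodes.
        while (!blocks.isEmpty() && currentSize < maxSize) {
          long len = blocks.poll();
          current.add(len);
          currentSize += len;
        }
        if (!current.isEmpty()) {
          splits.add(current);
        }
        if (blocks.isEmpty()) {
          completedNodes.add(node);
        }
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    Map<String, Deque<Long>> nodeToBlocks = new LinkedHashMap<String, Deque<Long>>();
    // h0 hosts three times as many blocks as h1, yet both receive a split
    // in the first pass.
    nodeToBlocks.put("h0", new ArrayDeque<Long>(
        Arrays.asList(100L, 100L, 100L, 100L, 100L, 100L)));
    nodeToBlocks.put("h1", new ArrayDeque<Long>(Arrays.asList(100L, 100L)));
    System.out.println(group(nodeToBlocks, 200));
    // [[100, 100], [100, 100], [100, 100], [100, 100]]
  }
}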
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1509349&r1=1509348&r2=1509349&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Thu Aug 1 17:48:14 2013
@@ -10,6 +10,9 @@ Release 2.1.1-beta - UNRELEASED
OPTIMIZATIONS
+ MAPREDUCE-5352. Optimize node local splits generated by
+ CombineFileInputFormat. (sseth)
+
BUG FIXES
MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1509349&r1=1509348&r2=1509349&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Thu Aug 1 17:48:14 2013
@@ -22,13 +22,18 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -50,6 +55,8 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
/**
* An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
@@ -79,6 +86,8 @@ import com.google.common.annotations.Vis
public abstract class CombineFileInputFormat<K, V>
extends FileInputFormat<K, V> {
+ private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
+
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
@@ -186,6 +195,8 @@ public abstract class CombineFileInputFo
maxSize = maxSplitSize;
} else {
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+ // If maxSize is not configured, a single split will be generated per
+ // node.
}
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode +
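
The size knobs read here come straight from the job configuration. Below is a minimal sketch of setting them; the maxsize and per-node keys appear verbatim in this hunk, while the per-rack key is assumed to follow the same naming pattern.

import org.apache.hadoop.conf.Configuration;

public class CombineSplitSizeConfigSketch {
  // Sketch only: the per-rack key name is an assumption inferred from the
  // per-node key; the other two keys are taken from this patch.
  static Configuration configure() {
    Configuration conf = new Configuration();
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64L * 1024 * 1024);
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 64L * 1024 * 1024);
    // If maxsize is left unset (0), a single split is generated per node,
    // as the comment added in this hunk notes.
    return conf;
  }
}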
@@ -271,8 +282,8 @@ public abstract class CombineFileInputFo
new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
- HashMap<String, List<OneBlockInfo>> nodeToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
+ HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
+ new HashMap<String, Set<OneBlockInfo>>();
files = new OneFileInfo[paths.length];
if (paths.length == 0) {
@@ -292,9 +303,9 @@ public abstract class CombineFileInputFo
}
@VisibleForTesting
- void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
- HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> rackToBlocks,
+ void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
+ Map<OneBlockInfo, String[]> blockToNodes,
+ Map<String, List<OneBlockInfo>> rackToBlocks,
long totLength,
long maxSize,
long minSizeNode,
@@ -302,83 +313,118 @@ public abstract class CombineFileInputFo
List<InputSplit> splits
) {
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
- int numNodes = nodeToBlocks.size();
+ int totalNodes = nodeToBlocks.size();
long totalLength = totLength;
+ Multiset<String> splitsPerNode = HashMultiset.create();
+ Set<String> completedNodes = new HashSet<String>();
+
while(true) {
// it is allowed for maxSize to be 0. Disable smoothing load for such cases
- int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
- ((int) (totalLength/maxSize))/numNodes
- : Integer.MAX_VALUE;
- int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
- numNodes = 0;
- // process all nodes and create splits that are local to a node.
- for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+ // process all nodes and create splits that are local to a node. Generate
+ // one split per node iteration, and walk over nodes multiple times to
+ // distribute the splits across nodes.
+ for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
+ Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
+
+ String node = one.getKey();
+
+ // Skip the node if it has previously been marked as completed.
+ if (completedNodes.contains(node)) {
+ continue;
+ }
+
+ Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
- int splitsInNode = 0;
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(splits, nodes, validBlocks);
- totalLength -= curSplitSize;
- curSplitSize = 0;
- validBlocks.clear();
- splitsInNode++;
- if (splitsInNode == maxSplitsByNodeOnly) {
- // stop grouping on a node so as not to create
- // disproportionately more splits on a node because it happens
- // to have many blocks
- // consider only these nodes in next round of grouping because
- // they have leftover blocks that may need to be grouped
- numNodes++;
- break;
- }
- }
+ Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
+ while (oneBlockIter.hasNext()) {
+ OneBlockInfo oneblock = oneBlockIter.next();
+
+ // Remove all blocks which may already have been assigned to other
+ // splits.
+ if(!blockToNodes.containsKey(oneblock)) {
+ oneBlockIter.remove();
+ continue;
+ }
+
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+ totalLength -= curSplitSize;
+ curSplitSize = 0;
+
+ splitsPerNode.add(node);
+
+ // Remove entries from blocksInNode so that we don't walk these
+ // again.
+ blocksInCurrentNode.removeAll(validBlocks);
+ validBlocks.clear();
+
+ // Done creating a single split for this node. Move on to the next
+ // node so that splits are distributed across nodes.
+ break;
}
+
}
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the
- // same rack later on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode
- && splitsInNode == 0) {
- // haven't created any split on this machine. so its ok to add a
- // smaller
- // one for parallelism. Otherwise group it in the rack for balanced
- // size
- // create an input split and add it to the splits array
- addCreatedSplit(splits, nodes, validBlocks);
- totalLength -= curSplitSize;
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
+ if (validBlocks.size() != 0) {
+ // This implies that the last few blocks (or all in case maxSize=0)
+ // were not part of a split. The node is complete.
+
+ // if there were any blocks left over and their combined size is
+ // larger than minSplitNode, then combine them into one split.
+ // Otherwise add them back to the unprocessed pool. It is likely
+ // that they will be combined with other blocks from the
+ // same rack later on.
+ // This condition also kicks in when max split size is not set. All
+ // blocks on a node will be grouped together into a single split.
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode
+ && splitsPerNode.count(node) == 0) {
+ // haven't created any split on this machine. so its ok to add a
+ // smaller one for parallelism. Otherwise group it in the rack for
+ // balanced size create an input split and add it to the splits
+ // array
+ addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+ totalLength -= curSplitSize;
+ splitsPerNode.add(node);
+ // Remove entries from blocksInNode so that we don't walk this again.
+ blocksInCurrentNode.removeAll(validBlocks);
+ // The node is done. This was the last set of blocks for this node.
+ } else {
+ // Put the unplaced blocks back into the pool for later rack-allocation.
+ for (OneBlockInfo oneblock : validBlocks) {
+ blockToNodes.put(oneblock, oneblock.hosts);
+ }
}
+ validBlocks.clear();
+ curSplitSize = 0;
+ completedNodes.add(node);
+ } else { // No in-flight blocks.
+ if (blocksInCurrentNode.size() == 0) {
+ // Node is done. All blocks were fit into node-local splits.
+ completedNodes.add(node);
+ } // else Run through the node again.
}
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
}
-
- if(!(numNodes>0 && totalLength>0)) {
+
+ // Check if node-local assignments are complete.
+ if (completedNodes.size() == totalNodes || totalLength == 0) {
+ // All nodes have been walked over and marked as completed or all blocks
+ // have been assigned. The rest should be handled via rack-local assignment.
+ LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+ + completedNodes.size() + ", size left: " + totalLength);
break;
}
}
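
The splitsPerNode counter introduced above is a Guava Multiset, replacing the per-iteration splitsInNode int. A small sketch of the two calls the patch relies on, add() and count():

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

// Sketch of the Guava Multiset usage: add() bumps a per-node counter that
// survives across passes over the nodes, and count() reads it back (0 for
// nodes that never produced a split).
public class SplitsPerNodeSketch {
  public static void main(String[] args) {
    Multiset<String> splitsPerNode = HashMultiset.create();
    splitsPerNode.add("h0");   // a node-local split was created on h0
    splitsPerNode.add("h0");
    splitsPerNode.add("h1");
    System.out.println(splitsPerNode.count("h0")); // 2
    System.out.println(splitsPerNode.count("h2")); // 0, safe for unseen nodes
  }
}

Because count(node) stays at zero until a node has produced a full-sized split, the minSizeNode branch above only hands the smaller "parallelism" split to nodes that have not yet produced any split.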
@@ -527,7 +573,7 @@ public abstract class CombineFileInputFo
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes,
long maxSize)
throws IOException {
@@ -598,10 +644,10 @@ public abstract class CombineFileInputFo
@VisibleForTesting
static void populateBlockInfo(OneBlockInfo[] blocks,
- HashMap<String, List<OneBlockInfo>> rackToBlocks,
- HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> nodeToBlocks,
- HashMap<String, Set<String>> rackToNodes) {
+ Map<String, List<OneBlockInfo>> rackToBlocks,
+ Map<OneBlockInfo, String[]> blockToNodes,
+ Map<String, Set<OneBlockInfo>> nodeToBlocks,
+ Map<String, Set<String>> rackToNodes) {
for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
@@ -633,9 +679,9 @@ public abstract class CombineFileInputFo
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+ Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
+ blklist = new LinkedHashSet<OneBlockInfo>();
nodeToBlocks.put(node, blklist);
}
blklist.add(oneblock);
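
nodeToBlocks now maps each host to a LinkedHashSet rather than an ArrayList. A tiny sketch of the two properties the new createSplits loop depends on (the block names are illustrative):

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Iteration keeps insertion order, like the old ArrayList, while
// contains()/removeAll() are hash-based; createSplits uses removeAll to
// drop blocks that were already placed into a split.
public class LinkedHashSetSketch {
  public static void main(String[] args) {
    Set<String> blocksOnNode = new LinkedHashSet<String>(
        Arrays.asList("blk_1", "blk_2", "blk_3", "blk_4"));
    blocksOnNode.removeAll(Arrays.asList("blk_1", "blk_2")); // placed in a split
    System.out.println(blocksOnNode); // [blk_3, blk_4] -- order preserved
  }
}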
@@ -696,7 +742,7 @@ public abstract class CombineFileInputFo
return fs.getFileBlockLocations(stat, 0, stat.getLen());
}
- private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+ private static void addHostToRack(Map<String, Set<String>> rackToNodes,
String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
if (hosts == null) {
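
For reference, a hedged sketch of how a job might wire this input format up. CombineFileInputFormat is abstract, so the subclass and its placeholder record reader below are purely illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CombineJobSketch {

  // Hypothetical subclass; a real one would return a record reader that
  // iterates over the files in each CombineFileSplit.
  public static class MyCombineFormat extends CombineFileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
      throw new UnsupportedOperationException("placeholder record reader");
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    job.setInputFormatClass(MyCombineFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Cap combined splits so the node-local grouping in this patch kicks in.
    job.getConfiguration().setLong(
        "mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
  }
}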
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1509349&r1=1509348&r2=1509349&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Thu Aug 1 17:48:14 2013
@@ -20,23 +20,31 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
import java.util.Set;
-import java.util.zip.GZIPOutputStream;
+import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPOutputStream;
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -715,6 +723,69 @@ public class TestCombineFileInputFormat
out.close();
DFSTestUtil.waitReplication(fileSys, name, replication);
}
+
+ public void testNodeDistribution() throws IOException, InterruptedException {
+ DummyInputFormat inFormat = new DummyInputFormat();
+ int numBlocks = 60;
+ long totLength = 0;
+ long blockSize = 100;
+ int numNodes = 10;
+
+ long minSizeNode = 50;
+ long minSizeRack = 50;
+ int maxSplitSize = 200; // 2 blocks per split.
+
+ String[] locations = new String[numNodes];
+ for (int i = 0; i < numNodes; i++) {
+ locations[i] = "h" + i;
+ }
+ String[] racks = new String[0];
+ Path path = new Path("hdfs://file");
+
+ OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+
+ int hostCountBase = 0;
+ // Generate block list. Replication 3 per block.
+ for (int i = 0; i < numBlocks; i++) {
+ int localHostCount = hostCountBase;
+ String[] blockHosts = new String[3];
+ for (int j = 0; j < 3; j++) {
+ int hostNum = localHostCount % numNodes;
+ blockHosts[j] = "h" + hostNum;
+ localHostCount++;
+ }
+ hostCountBase++;
+ blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
+ racks);
+ totLength += blockSize;
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+ HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+ HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+ Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
+
+ OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+ nodeToBlocks, rackToNodes);
+
+ inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+ maxSplitSize, minSizeNode, minSizeRack, splits);
+
+ int expectedSplitCount = (int) (totLength / maxSplitSize);
+ Assert.assertEquals(expectedSplitCount, splits.size());
+
+ // Ensure 90+% of the splits have node local blocks.
+ // 100% locality may not always be achieved.
+ int numLocalSplits = 0;
+ for (InputSplit inputSplit : splits) {
+ Assert.assertEquals(maxSplitSize, inputSplit.getLength());
+ if (inputSplit.getLocations().length == 1) {
+ numLocalSplits++;
+ }
+ }
+ Assert.assertTrue(numLocalSplits >= 0.9 * splits.size());
+ }
public void testNodeInputSplit() throws IOException, InterruptedException {
// Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
@@ -744,8 +815,8 @@ public class TestCombineFileInputFormat
new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
- HashMap<String, List<OneBlockInfo>> nodeToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
+ HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
+ new HashMap<String, Set<OneBlockInfo>>();
OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
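
A quick check of the expectations in testNodeDistribution, using the values the test sets: 60 blocks of 100 bytes give totLength = 6000; with maxSplitSize = 200 a split closes after every two blocks, so expectedSplitCount = 6000 / 200 = 30 splits of exactly 200 bytes, which is what the getLength() assertion verifies. The final assertion then requires at least 0.9 * 30 = 27 of those splits to report a single, node-local location.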