You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/08/01 19:48:14 UTC

svn commit: r1509349 - in /hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/ hadoop-mapreduce-client/hadoop-mapreduce-client-job...

Author: sseth
Date: Thu Aug  1 17:48:14 2013
New Revision: 1509349

URL: http://svn.apache.org/r1509349
Log:
merge MAPREDUCE-5352. Optimize node local splits generated by CombineFileInputFormat. (sseth)

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1509349&r1=1509348&r2=1509349&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Thu Aug  1 17:48:14 2013
@@ -10,6 +10,9 @@ Release 2.1.1-beta - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-5352. Optimize node local splits generated by
+    CombineFileInputFormat. (sseth)
+
   BUG FIXES
 
     MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1509349&r1=1509348&r2=1509349&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Thu Aug  1 17:48:14 2013
@@ -22,13 +22,18 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +55,8 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
@@ -79,6 +86,8 @@ import com.google.common.annotations.Vis
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
   
+  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
+  
   public static final String SPLIT_MINSIZE_PERNODE = 
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK = 
@@ -186,6 +195,8 @@ public abstract class CombineFileInputFo
       maxSize = maxSplitSize;
     } else {
       maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+      // If maxSize is not configured, a single split will be generated per
+      // node.
     }
     if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
       throw new IOException("Minimum split size pernode " + minSizeNode +
@@ -271,8 +282,8 @@ public abstract class CombineFileInputFo
                               new HashMap<OneBlockInfo, String[]>();
 
     // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
+    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, Set<OneBlockInfo>>();
     
     files = new OneFileInfo[paths.length];
     if (paths.length == 0) {
@@ -292,9 +303,9 @@ public abstract class CombineFileInputFo
   }
 
   @VisibleForTesting
-  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                     HashMap<OneBlockInfo, String[]> blockToNodes,
-                     HashMap<String, List<OneBlockInfo>> rackToBlocks,
+  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
+                     Map<OneBlockInfo, String[]> blockToNodes,
+                     Map<String, List<OneBlockInfo>> rackToBlocks,
                      long totLength,
                      long maxSize,
                      long minSizeNode,
@@ -302,83 +313,118 @@ public abstract class CombineFileInputFo
                      List<InputSplit> splits                     
                     ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
     
-    int numNodes = nodeToBlocks.size();
+    int totalNodes = nodeToBlocks.size();
     long totalLength = totLength;
 
+    Multiset<String> splitsPerNode = HashMultiset.create();
+    Set<String> completedNodes = new HashSet<String>();
+    
     while(true) {
       // it is allowed for maxSize to be 0. Disable smoothing load for such cases
-      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
-                                        ((int) (totalLength/maxSize))/numNodes
-                                        : Integer.MAX_VALUE;
-      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
-      numNodes = 0;
 
-      // process all nodes and create splits that are local to a node.
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+      // process all nodes and create splits that are local to a node. Generate
+      // one split per node iteration, and walk over nodes multiple times to
+      // distribute the splits across nodes. 
+      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
           .entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        nodes.add(one.getKey());
-        List<OneBlockInfo> blocksInNode = one.getValue();
+        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
+        
+        String node = one.getKey();
+        
+        // Skip the node if it has previously been marked as completed.
+        if (completedNodes.contains(node)) {
+          continue;
+        }
+
+        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
 
         // for each block, copy it into validBlocks. Delete it from
         // blockToNodes so that the same block does not appear in
         // two different splits.
-        int splitsInNode = 0;
-        for (OneBlockInfo oneblock : blocksInNode) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-
-            // if the accumulated split size exceeds the maximum, then
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(splits, nodes, validBlocks);
-              totalLength -= curSplitSize;
-              curSplitSize = 0;
-              validBlocks.clear();
-              splitsInNode++;
-              if (splitsInNode == maxSplitsByNodeOnly) {
-                // stop grouping on a node so as not to create
-                // disproportionately more splits on a node because it happens
-                // to have many blocks
-                // consider only these nodes in next round of grouping because
-                // they have leftover blocks that may need to be grouped
-                numNodes++;
-                break;
-              }
-            }
+        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
+        while (oneBlockIter.hasNext()) {
+          OneBlockInfo oneblock = oneBlockIter.next();
+          
+          // Remove all blocks which may already have been assigned to other
+          // splits.
+          if(!blockToNodes.containsKey(oneblock)) {
+            oneBlockIter.remove();
+            continue;
+          }
+        
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+            totalLength -= curSplitSize;
+            curSplitSize = 0;
+
+            splitsPerNode.add(node);
+
+            // Remove entries from blocksInNode so that we don't walk these
+            // again.
+            blocksInCurrentNode.removeAll(validBlocks);
+            validBlocks.clear();
+
+            // Done creating a single split for this node. Move on to the next
+            // node so that splits are distributed across nodes.
+            break;
           }
+
         }
-        // if there were any blocks left over and their combined size is
-        // larger than minSplitNode, then combine them into one split.
-        // Otherwise add them back to the unprocessed pool. It is likely
-        // that they will be combined with other blocks from the
-        // same rack later on.
-        if (minSizeNode != 0 && curSplitSize >= minSizeNode
-            && splitsInNode == 0) {
-          // haven't created any split on this machine. so its ok to add a
-          // smaller
-          // one for parallelism. Otherwise group it in the rack for balanced
-          // size
-          // create an input split and add it to the splits array
-          addCreatedSplit(splits, nodes, validBlocks);
-          totalLength -= curSplitSize;
-        } else {
-          for (OneBlockInfo oneblock : validBlocks) {
-            blockToNodes.put(oneblock, oneblock.hosts);
+        if (validBlocks.size() != 0) {
+          // This implies that the last few blocks (or all in case maxSize=0)
+          // were not part of a split. The node is complete.
+          
+          // if there were any blocks left over and their combined size is
+          // larger than minSplitNode, then combine them into one split.
+          // Otherwise add them back to the unprocessed pool. It is likely
+          // that they will be combined with other blocks from the
+          // same rack later on.
+          // This condition also kicks in when max split size is not set. All
+          // blocks on a node will be grouped together into a single split.
+          if (minSizeNode != 0 && curSplitSize >= minSizeNode
+              && splitsPerNode.count(node) == 0) {
+            // haven't created any split on this machine. so its ok to add a
+            // smaller one for parallelism. Otherwise group it in the rack for
+            // balanced size create an input split and add it to the splits
+            // array
+            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+            totalLength -= curSplitSize;
+            splitsPerNode.add(node);
+            // Remove entries from blocksInNode so that we don't walk this again.
+            blocksInCurrentNode.removeAll(validBlocks);
+            // The node is done. This was the last set of blocks for this node.
+          } else {
+            // Put the unplaced blocks back into the pool for later rack-allocation.
+            for (OneBlockInfo oneblock : validBlocks) {
+              blockToNodes.put(oneblock, oneblock.hosts);
+            }
           }
+          validBlocks.clear();
+          curSplitSize = 0;
+          completedNodes.add(node);
+        } else { // No in-flight blocks.
+          if (blocksInCurrentNode.size() == 0) {
+            // Node is done. All blocks were fit into node-local splits.
+            completedNodes.add(node);
+          } // else Run through the node again.
         }
-        validBlocks.clear();
-        nodes.clear();
-        curSplitSize = 0;
       }
-      
-      if(!(numNodes>0 && totalLength>0)) {
+
+      // Check if node-local assignments are complete.
+      if (completedNodes.size() == totalNodes || totalLength == 0) {
+        // All nodes have been walked over and marked as completed or all blocks
+        // have been assigned. The rest should be handled via rackLock assignment.
+        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+            + completedNodes.size() + ", size left: " + totalLength);
         break;
       }
     }
@@ -527,7 +573,7 @@ public abstract class CombineFileInputFo
                 boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                 HashMap<String, Set<String>> rackToNodes,
                 long maxSize)
                 throws IOException {
@@ -598,10 +644,10 @@ public abstract class CombineFileInputFo
     
     @VisibleForTesting
     static void populateBlockInfo(OneBlockInfo[] blocks,
-                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
-                          HashMap<OneBlockInfo, String[]> blockToNodes,
-                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                          HashMap<String, Set<String>> rackToNodes) {
+                          Map<String, List<OneBlockInfo>> rackToBlocks,
+                          Map<OneBlockInfo, String[]> blockToNodes,
+                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
+                          Map<String, Set<String>> rackToNodes) {
       for (OneBlockInfo oneblock : blocks) {
         // add this block to the block --> node locations map
         blockToNodes.put(oneblock, oneblock.hosts);
@@ -633,9 +679,9 @@ public abstract class CombineFileInputFo
         // add this block to the node --> block map
         for (int j = 0; j < oneblock.hosts.length; j++) {
           String node = oneblock.hosts[j];
-          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
           if (blklist == null) {
-            blklist = new ArrayList<OneBlockInfo>();
+            blklist = new LinkedHashSet<OneBlockInfo>();
             nodeToBlocks.put(node, blklist);
           }
           blklist.add(oneblock);
@@ -696,7 +742,7 @@ public abstract class CombineFileInputFo
     return fs.getFileBlockLocations(stat, 0, stat.getLen());
   }
 
-  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                     String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
     if (hosts == null) {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1509349&r1=1509348&r2=1509349&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Thu Aug  1 17:48:14 2013
@@ -20,23 +20,31 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
 import java.util.Set;
-import java.util.zip.GZIPOutputStream;
+import java.util.TreeMap;
 import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -715,6 +723,69 @@ public class TestCombineFileInputFormat 
     out.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
+
+  public void testNodeDistribution() throws IOException, InterruptedException {
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 60;
+    long totLength = 0;
+    long blockSize = 100;
+    int numNodes = 10;
+
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    int maxSplitSize = 200; // 4 blocks per split.
+
+    String[] locations = new String[numNodes];
+    for (int i = 0; i < numNodes; i++) {
+      locations[i] = "h" + i;
+    }
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+
+    int hostCountBase = 0;
+    // Generate block list. Replication 3 per block.
+    for (int i = 0; i < numBlocks; i++) {
+      int localHostCount = hostCountBase;
+      String[] blockHosts = new String[3];
+      for (int j = 0; j < 3; j++) {
+        int hostNum = localHostCount % numNodes;
+        blockHosts[j] = "h" + hostNum;
+        localHostCount++;
+      }
+      hostCountBase++;
+      blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
+          racks);
+      totLength += blockSize;
+    }
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+    Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
+
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+        nodeToBlocks, rackToNodes);
+    
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+        maxSplitSize, minSizeNode, minSizeRack, splits);
+
+    int expectedSplitCount = (int) (totLength / maxSplitSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+
+    // Ensure 90+% of the splits have node local blocks.
+    // 100% locality may not always be achieved.
+    int numLocalSplits = 0;
+    for (InputSplit inputSplit : splits) {
+      Assert.assertEquals(maxSplitSize, inputSplit.getLength());
+      if (inputSplit.getLocations().length == 1) {
+        numLocalSplits++;
+      }
+    }
+    Assert.assertTrue(numLocalSplits >= 0.9 * splits.size());
+  }
   
   public void testNodeInputSplit() throws IOException, InterruptedException {
     // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on 
@@ -744,8 +815,8 @@ public class TestCombineFileInputFormat 
                               new HashMap<String, List<OneBlockInfo>>();
     HashMap<OneBlockInfo, String[]> blockToNodes = 
                               new HashMap<OneBlockInfo, String[]>();
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
+    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, Set<OneBlockInfo>>();
     
     OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
                              nodeToBlocks, rackToNodes);