You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/05/03 17:11:36 UTC

svn commit: r1099088 - in /hadoop/common/branches/branch-0.20-security-203: CHANGES.txt src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java

Author: acmurthy
Date: Tue May  3 15:11:35 2011
New Revision: 1099088

URL: http://svn.apache.org/viewvc?rev=1099088&view=rev
Log:
HADOOP-5759. Fix for IllegalArgumentException when CombineFileInputFormat is used as job InputFormat. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/common/branches/branch-0.20-security-203/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-0.20-security-203/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/CHANGES.txt?rev=1099088&r1=1099087&r2=1099088&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-203/CHANGES.txt Tue May  3 15:11:35 2011
@@ -1717,6 +1717,9 @@ Release 0.20.2 - Unreleased
     HADOOP-6269. Fix threading issue with defaultResource in Configuration.
     (Sreekanth Ramakrishnan via cdouglas)
 
+    HADOOP-5759. Fix for  IllegalArgumentException when CombineFileInputFormat
+    is used as job InputFormat. (Amareshwari Sriramadasu via zshao)
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1099088&r1=1099087&r2=1099088&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Tue May  3 15:11:35 2011
@@ -20,12 +20,12 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -73,6 +73,9 @@ public abstract class CombineFileInputFo
   // across multiple pools.
   private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
 
+  // mapping from a rack name to the set of Nodes in the rack 
+  private static HashMap<String, Set<String>> rackToNodes = 
+                            new HashMap<String, Set<String>>();
   /**
    * Specify the maximum size (in bytes) of each split. Each split is
    * approximately equal to the specified size.
@@ -214,6 +217,8 @@ public abstract class CombineFileInputFo
     getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
                   maxSize, minSizeNode, minSizeRack, splits);
 
+    // free up rackToNodes map
+    rackToNodes.clear();
     return splits.toArray(new CombineFileSplit[splits.size()]);    
   }
 
@@ -341,7 +346,7 @@ public abstract class CombineFileInputFo
             // create this split.
             if (maxSize != 0 && curSplitSize >= maxSize) {
               // create an input split and add it to the splits array
-              addCreatedSplit(job, splits, racks, validBlocks);
+              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
               createdSplit = true;
               break;
             }
@@ -360,7 +365,7 @@ public abstract class CombineFileInputFo
           if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
             // if there is a mimimum size specified, then create a single split
             // otherwise, store these blocks into overflow data structure
-            addCreatedSplit(job, splits, racks, validBlocks);
+            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
           } else {
             // There were a few blocks in this rack that remained to be processed.
             // Keep them in 'overflow' block list. These will be combined later.
@@ -393,7 +398,7 @@ public abstract class CombineFileInputFo
       // create this split.
       if (maxSize != 0 && curSplitSize >= maxSize) {
         // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, racks, validBlocks);
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
         curSplitSize = 0;
         validBlocks.clear();
         racks.clear();
@@ -402,7 +407,7 @@ public abstract class CombineFileInputFo
 
     // Process any remaining blocks, if any.
     if (!validBlocks.isEmpty()) {
-      addCreatedSplit(job, splits, racks, validBlocks);
+      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
     }
   }
 
@@ -412,13 +417,12 @@ public abstract class CombineFileInputFo
    */
   private void addCreatedSplit(JobConf job,
                                List<CombineFileSplit> splitList, 
-                               List<String> racks, 
+                               List<String> locations, 
                                ArrayList<OneBlockInfo> validBlocks) {
     // create an input split
     Path[] fl = new Path[validBlocks.size()];
     long[] offset = new long[validBlocks.size()];
     long[] length = new long[validBlocks.size()];
-    String[] rackLocations = racks.toArray(new String[racks.size()]);
     for (int i = 0; i < validBlocks.size(); i++) {
       fl[i] = validBlocks.get(i).onepath; 
       offset[i] = validBlocks.get(i).offset;
@@ -427,7 +431,7 @@ public abstract class CombineFileInputFo
 
      // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, 
-                                                      length, rackLocations);
+                                   length, locations.toArray(new String[0]));
     splitList.add(thissplit); 
   }
 
@@ -484,7 +488,9 @@ public abstract class CombineFileInputFo
               rackToBlocks.put(rack, blklist);
             }
             blklist.add(oneblock);
-          }
+            // Add this host to rackToNodes map
+            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+         }
 
           // add this block to the node --> block map
           for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -547,6 +553,23 @@ public abstract class CombineFileInputFo
     }
   }
 
+  private static void addHostToRack(String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+  
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+  
   /**
    * Accept a path only if any one of filters given in the
    * constructor do. 

Modified: hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1099088&r1=1099087&r2=1099088&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Tue May  3 15:11:35 2011
@@ -18,11 +18,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.io.DataOutputStream;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
 
 import junit.framework.TestCase;
 
@@ -30,17 +25,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -151,14 +141,14 @@ public class TestCombineFileInputFormat 
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // create another file on 3 datanodes and 3 racks.
       dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -186,7 +176,7 @@ public class TestCombineFileInputFormat 
       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
@@ -196,14 +186,14 @@ public class TestCombineFileInputFormat 
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // create file4 on all three racks
       Path file4 = new Path(dir4 + "/file4");
@@ -229,7 +219,7 @@ public class TestCombineFileInputFormat 
       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
@@ -239,14 +229,14 @@ public class TestCombineFileInputFormat 
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // maximum split size is 2 blocks 
       inFormat = new DummyInputFormat();
@@ -385,7 +375,7 @@ public class TestCombineFileInputFormat 
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // maximum split size is 7 blocks and min is 3 blocks
       inFormat = new DummyInputFormat();
@@ -431,15 +421,15 @@ public class TestCombineFileInputFormat 
       fileSplit = (CombineFileSplit) splits[0];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 6);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
     } finally {
       if (dfs != null) {
         dfs.shutdown();