Posted to common-commits@hadoop.apache.org by en...@apache.org on 2007/09/11 12:50:34 UTC

svn commit: r574546 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java

Author: enis
Date: Tue Sep 11 03:50:34 2007
New Revision: 574546

URL: http://svn.apache.org/viewvc?rev=574546&view=rev
Log:
HADOOP-1818. Fix MultiFileInputFormat so that it does not return 
empty splits. Contributed by Thomas Friol. 
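
For context: with fewer input paths than requested splits, the old
getSplits() still filled all numSplits array slots, so the trailing
entries were MultiFileSplits over zero-length Path arrays. A minimal
sketch of that failure mode, condensed from the "-" lines in the diff
below (not a compilable excerpt of the old file on its own):

    MultiFileSplit[] splits = new MultiFileSplit[numSplits]; // one slot per requested split
    for (int i = 0; i < numSplits; i++) {
      int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength,
          startIndex, lengths);                  // returns 0 once all paths are consumed
      Path[] splitPaths = new Path[splitSize];   // zero-length for the trailing splits
      long[] splitLengths = new long[splitSize];
      System.arraycopy(paths, startIndex, splitPaths, 0, splitSize);
      System.arraycopy(lengths, startIndex, splitLengths, 0, splitSize);
      splits[i] = new MultiFileSplit(job, splitPaths, splitLengths); // may be an empty split
      startIndex += splitSize;
    }
    return splits; // e.g. 2 paths, 4 requested splits -> 2 real + 2 empty splits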

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=574546&r1=574545&r2=574546&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Sep 11 03:50:34 2007
@@ -108,6 +108,9 @@
     HADOOP-1853.  Fix contrib/streaming to accept multiple -cacheFile
     options.  (Prachi Gupta via cutting)
 
+    HADOOP-1818. Fix MultiFileInputFormat so that it does not return 
+    empty splits when numPaths < numSplits.  (Thomas Friol via enis)
+
   IMPROVEMENTS
 
     HADOOP-1266. Remove dependency of package org.apache.hadoop.net on 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java?rev=574546&r1=574545&r2=574546&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java Tue Sep 11 03:50:34 2007
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,35 +44,40 @@
   public InputSplit[] getSplits(JobConf job, int numSplits) 
     throws IOException {
     
-    MultiFileSplit[] splits = new MultiFileSplit[numSplits];
     Path[] paths = listPaths(job);
-    long[] lengths = new long[paths.length];
-    long totLength = 0;
-    for(int i=0; i<paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(job);
-      lengths[i] = fs.getContentLength(paths[i]);
-      totLength += lengths[i];
-    }
-    float avgLengthPerSplit = ((float)totLength) / numSplits;
-    long cumulativeLength = 0;
+    List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(Math.min(numSplits, paths.length));
+    if (paths.length != 0) {
+      // HADOOP-1818: Compute splits only if there are input paths
+      long[] lengths = new long[paths.length];
+      long totLength = 0;
+      for(int i=0; i<paths.length; i++) {
+        FileSystem fs = paths[i].getFileSystem(job);
+        lengths[i] = fs.getContentLength(paths[i]);
+        totLength += lengths[i];
+      }
+      float avgLengthPerSplit = ((float)totLength) / numSplits;
+      long cumulativeLength = 0;
 
-    int startIndex = 0;
+      int startIndex = 0;
 
-    for(int i=0; i<numSplits; i++) {
-      int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
-          , startIndex, lengths);
-      Path[] splitPaths = new Path[splitSize];
-      long[] splitLengths = new long[splitSize];
-      System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
-      System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
-      splits[i] = new MultiFileSplit(job, splitPaths, splitLengths);
-      startIndex += splitSize;
-      for(long l: splitLengths) {
-        cumulativeLength += l;
+      for(int i=0; i<numSplits; i++) {
+        int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
+            , startIndex, lengths);
+        if (splitSize != 0) {
+          // HADOOP-1818: Add a split only if its size is non-zero
+          Path[] splitPaths = new Path[splitSize];
+          long[] splitLengths = new long[splitSize];
+          System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
+          System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
+          splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
+          startIndex += splitSize;
+          for(long l: splitLengths) {
+            cumulativeLength += l;
+          }
+        }
       }
     }
-    return splits;
-    
+    return splits.toArray(new MultiFileSplit[splits.size()]);    
   }
 
   private int findSize(int splitIndex, float avgLengthPerSplit

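findSize() itself is unchanged and its body is not shown in this diff.
A plausible sketch of its contract, inferred from the call sites above
(an assumption, not the committed implementation):

    // Hypothetical sketch: choose how many files, starting at startIndex,
    // this split should take so that the cumulative length reaches roughly
    // (splitIndex + 1) * avgLengthPerSplit.
    private int findSize(int splitIndex, float avgLengthPerSplit,
        long cumulativeLength, int startIndex, long[] lengths) {
      long goal = (long) ((splitIndex + 1) * avgLengthPerSplit);
      long acc = cumulativeLength;
      for (int i = startIndex; i < lengths.length; i++) {
        acc += lengths[i];
        if (acc >= goal) {
          return i - startIndex + 1; // enough files for this split
        }
      }
      // All files already consumed by earlier splits: with
      // numPaths < numSplits this returns 0, which is exactly the case
      // the new "splitSize != 0" guard above handles.
      return lengths.length - startIndex;
    }
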
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java?rev=574546&r1=574545&r2=574546&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java Tue Sep 11 03:50:34 2007
@@ -33,6 +33,7 @@
 public class TestMultiFileInputFormat extends TestCase{
 
   private static JobConf job = new JobConf();
+
   private static final Log LOG = LogFactory.getLog(TestMultiFileInputFormat.class);
   
   private static final int MAX_SPLIT_COUNT  = 10000;
@@ -53,7 +54,7 @@
     }
   }
   
-  private Path initFiles(FileSystem fs, int numFiles) throws IOException{
+  private Path initFiles(FileSystem fs, int numFiles, int numBytes) throws IOException{
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path multiFileDir = new Path(dir, "test.multifile");
     fs.delete(multiFileDir);
@@ -62,7 +63,9 @@
     for(int i=0; i<numFiles ;i++) {
       Path path = new Path(multiFileDir, "file_" + i);
        FSDataOutputStream out = fs.create(path);
-       int numBytes = rand.nextInt(MAX_BYTES);
+       if (numBytes == -1) {
+         numBytes = rand.nextInt(MAX_BYTES);
+       }
        for(int j=0; j< numBytes; j++) {
          out.write(rand.nextInt());
        }
@@ -92,7 +95,7 @@
     for(int numFiles = 1; numFiles< MAX_NUM_FILES ; 
       numFiles+= (NUM_FILES_INCR / 2) + rand.nextInt(NUM_FILES_INCR / 2)) {
       
-      Path dir = initFiles(fs, numFiles);
+      Path dir = initFiles(fs, numFiles, -1);
       BitSet bits = new BitSet(numFiles);
       for(int i=1;i< MAX_SPLIT_COUNT ;i+= rand.nextInt(SPLIT_COUNT_INCR) + 1) {
         LOG.info("Running for Num Files=" + numFiles + ", split count=" + i);
@@ -119,6 +122,19 @@
       fs.delete(dir);
     }
     LOG.info("Test Finished");
+  }
+  
+  public void testFormatWithLessPathsThanSplits() throws Exception {
+    MultiFileInputFormat format = new DummyMultiFileInputFormat();
+    FileSystem fs = FileSystem.getLocal(job);     
+    
+    // Test with no path
+    initFiles(fs, 0, -1);    
+    assertEquals(0, format.getSplits(job, 2).length);
+    
+    // Test with 2 paths and 4 splits
+    initFiles(fs, 2, 500);
+    assertEquals(2, format.getSplits(job, 4).length);
   }
   
   public static void main(String[] args) throws Exception{
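
The new test pins down the fixed contract: requesting more splits than
there are input paths now yields only non-empty splits. A hedged usage
sketch mirroring the test above (DummyMultiFileInputFormat is the test
helper defined earlier in this file; the checks are illustrative, not
part of the commit):

    JobConf job = new JobConf();
    MultiFileInputFormat format = new DummyMultiFileInputFormat();
    // 2 files of 500 bytes each, 4 requested splits: the patched
    // getSplits() returns 2 splits rather than 4 with two of them empty.
    InputSplit[] splits = format.getSplits(job, 4);
    for (InputSplit split : splits) {
      // every returned split now covers at least one file
      assert ((MultiFileSplit) split).getPaths().length > 0;
    }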