You are viewing a plain-text version of this content. The link to the canonical version ("here") was lost in the plain-text conversion.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/12/21 08:41:29 UTC

svn commit: r1424817 - in /hadoop/common/branches/branch-1.1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java

Author: acmurthy
Date: Fri Dec 21 07:41:29 2012
New Revision: 1424817

URL: http://svn.apache.org/viewvc?rev=1424817&view=rev
Log:
Merge -c 1424815 from branch-1 to branch-1.1 to fix MAPREDUCE-4888. Fixed an off-by-one error in NLineInputFormat that dropped data. Contributed by Vinod K V.

Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1424817&r1=1424816&r2=1424817&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Fri Dec 21 07:41:29 2012
@@ -65,6 +65,9 @@ Release 1.1.2 - 2012.12.07
 
     MAPREDUCE-4859. Fixed TestRecoveryManager. (acmurthy) 
 
+    MAPREDUCE-4888. Fixed NLineInputFormat one-off error which dropped data.
+    (vinodkv via acmurthy) 
+
 Release 1.1.1 - 2012.11.18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=1424817&r1=1424816&r2=1424817&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java Fri Dec 21 07:41:29 2012
@@ -97,25 +97,14 @@ public class NLineInputFormat extends Fi
           numLines++;
           length += num;
           if (numLines == N) {
-            // NLineInputFormat uses LineRecordReader, which always reads (and
-            // consumes) at least one character out of its upper split
-            // boundary. So to make sure that each mapper gets N lines, we
-            // move back the upper split limits of each split by one character 
-            // here.
-            if (begin == 0) {
-              splits.add(new FileSplit(fileName, begin, length - 1,
-                new String[] {}));
-            } else {
-              splits.add(new FileSplit(fileName, begin - 1, length,
-                new String[] {}));
-            }
+            splits.add(createFileSplit(fileName, begin, length));
             begin += length;
             length = 0;
             numLines = 0;
           }
         }
         if (numLines != 0) {
-          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+          splits.add(createFileSplit(fileName, begin, length));
         }
    
       } finally {
@@ -127,6 +116,23 @@ public class NLineInputFormat extends Fi
     return splits.toArray(new FileSplit[splits.size()]);
   }
 
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split 
+   * by one character here.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0) 
+    ? new FileSplit(fileName, begin, length - 1, new String[] {})
+    : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
+
   public void configure(JobConf conf) {
     N = conf.getInt("mapred.line.input.format.linespermap", 1);
   }

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java?rev=1424817&r1=1424816&r2=1424817&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java Fri Dec 21 07:41:29 2012
@@ -48,9 +48,6 @@ public class TestLineInputFormat extends
     JobConf job = new JobConf();
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
@@ -58,7 +55,8 @@ public class TestLineInputFormat extends
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH/10) + 1) {
+         length += 1) {
+      System.out.println("Processing file of length "+length);
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
@@ -69,14 +67,21 @@ public class TestLineInputFormat extends
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % numLinesPerMap;
+        if (lastN == 0) {
+          lastN = numLinesPerMap;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
   // A reporter that does nothing
   private static final Reporter voidReporter = Reporter.NULL;
   
-  void checkFormat(JobConf job, int expectedN) throws IOException{
+  void checkFormat(JobConf job, int expectedN, int lastN) throws IOException{
     NLineInputFormat format = new NLineInputFormat();
     format.configure(job);
     int ignoredNumSplits = 1;
@@ -84,7 +89,8 @@ public class TestLineInputFormat extends
 
     // check all splits except last one
     int count = 0;
-    for (int j = 0; j < splits.length -1; j++) {
+    for (int j = 0; j < splits.length; j++) {
+      System.out.println("Processing split "+splits[j]);
       assertEquals("There are no split locations", 0,
                    splits[j].getLocations().length);
       RecordReader<LongWritable, Text> reader =
@@ -102,16 +108,22 @@ public class TestLineInputFormat extends
       try {
         count = 0;
         while (reader.next(key, value)) {
+          System.out.println("Got "+key+" "+value+" at count "+count+" of split "+j);
           count++;
         }
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( j == splits.length - 1) {
+        assertEquals("number of lines in split(" + j + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + j + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }
-  
+
   public static void main(String[] args) throws Exception {
     new TestLineInputFormat().testFormat();
   }