You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/09 16:53:52 UTC

svn commit: r1407506 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/...

Author: bobby
Date: Fri Nov  9 15:53:52 2012
New Revision: 1407506

URL: http://svn.apache.org/viewvc?rev=1407506&view=rev
Log:
svn merge -c 1407505 FIXES: MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit (Mark Fuhs via bobby)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1407506&r1=1407505&r2=1407506&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Nov  9 15:53:52 2012
@@ -493,6 +493,9 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4772. Fetch failures can take way too long for a map to be 
     restarted (bobby)
+
+    MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit 
+    (Mark Fuhs via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=1407506&r1=1407505&r2=1407506&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Fri Nov  9 15:53:52 2012
@@ -107,25 +107,14 @@ public class NLineInputFormat extends Fi
         numLines++;
         length += num;
         if (numLines == numLinesPerSplit) {
-          // NLineInputFormat uses LineRecordReader, which always reads
-          // (and consumes) at least one character out of its upper split
-          // boundary. So to make sure that each mapper gets N lines, we
-          // move back the upper split limits of each split 
-          // by one character here.
-          if (begin == 0) {
-            splits.add(new FileSplit(fileName, begin, length - 1,
-              new String[] {}));
-          } else {
-            splits.add(new FileSplit(fileName, begin - 1, length,
-              new String[] {}));
-          }
+          splits.add(createFileSplit(fileName, begin, length));
           begin += length;
           length = 0;
           numLines = 0;
         }
       }
       if (numLines != 0) {
-        splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        splits.add(createFileSplit(fileName, begin, length));
       }
     } finally {
       if (lr != null) {
@@ -134,6 +123,23 @@ public class NLineInputFormat extends Fi
     }
     return splits; 
   }
+
+  /**
+   * Creates the FileSplit for one group of lines, moving its upper
+   * boundary back by one character.
+   * <p>
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limit of each split
+   * by one character here. When begin is 0 the split is built from
+   * (begin, length - 1); otherwise it is built from (begin - 1, length).
+   * In both cases an empty String array is passed as the last
+   * constructor argument.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  a new FileSplit for fileName with the adjusted boundaries
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0) 
+    ? new FileSplit(fileName, begin, length - 1, new String[] {})
+    : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
   
   /**
    * Set the number of lines per split

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1407506&r1=1407505&r2=1407506&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Nov  9 15:53:52 2012
@@ -50,37 +50,40 @@ public class TestNLineInputFormat extend
     Job job = Job.getInstance(conf);
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
     NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
-    // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+         length += 1) {
+ 
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
         for (int i = 0; i < length; i++) {
-          writer.write(Integer.toString(i));
+          writer.write(Integer.toString(i)+" some more text");
           writer.write("\n");
         }
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % 5;
+        if (lastN == 0) {
+          lastN = 5;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
-  void checkFormat(Job job, int expectedN) 
+  void checkFormat(Job job, int expectedN, int lastN) 
       throws IOException, InterruptedException {
     NLineInputFormat format = new NLineInputFormat();
     List<InputSplit> splits = format.getSplits(job);
-    // check all splits except last one
     int count = 0;
-    for (int i = 0; i < splits.size() -1; i++) {
+    for (int i = 0; i < splits.size(); i++) {
       assertEquals("There are no split locations", 0,
                    splits.get(i).getLocations().length);
       TaskAttemptContext context = MapReduceTestUtil.
@@ -104,8 +107,13 @@ public class TestNLineInputFormat extend
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( i == splits.size() - 1) {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }