You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/09 16:54:48 UTC
svn commit: r1407507 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/
Author: bobby
Date: Fri Nov 9 15:54:47 2012
New Revision: 1407507
URL: http://svn.apache.org/viewvc?rev=1407507&view=rev
Log:
svn merge -c 1407505 FIXES: MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit (Mark Fuhs via bobby)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1407507&r1=1407506&r2=1407507&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Nov 9 15:54:47 2012
@@ -76,6 +76,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4772. Fetch failures can take way too long for a map to be
restarted (bobby)
+
+ MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
+ (Mark Fuhs via bobby)
Release 0.23.4 - UNRELEASED
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=1407507&r1=1407506&r2=1407507&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Fri Nov 9 15:54:47 2012
@@ -107,25 +107,14 @@ public class NLineInputFormat extends Fi
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
- // NLineInputFormat uses LineRecordReader, which always reads
- // (and consumes) at least one character out of its upper split
- // boundary. So to make sure that each mapper gets N lines, we
- // move back the upper split limits of each split
- // by one character here.
- if (begin == 0) {
- splits.add(new FileSplit(fileName, begin, length - 1,
- new String[] {}));
- } else {
- splits.add(new FileSplit(fileName, begin - 1, length,
- new String[] {}));
- }
+ splits.add(createFileSplit(fileName, begin, length));
begin += length;
length = 0;
numLines = 0;
}
}
if (numLines != 0) {
- splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+ splits.add(createFileSplit(fileName, begin, length));
}
} finally {
if (lr != null) {
@@ -134,6 +123,23 @@ public class NLineInputFormat extends Fi
}
return splits;
}
+
+ /**
+ * NLineInputFormat uses LineRecordReader, which always reads
+ * (and consumes) at least one character out of its upper split
+ * boundary. So to make sure that each mapper gets N lines, we
+ * move back the upper split limits of each split
+ * by one character here.
+ * @param fileName Path of file
+ * @param begin the position of the first byte in the file to process
+ * @param length number of bytes in InputSplit
+ * @return FileSplit
+ */
+ protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+ return (begin == 0)
+ ? new FileSplit(fileName, begin, length - 1, new String[] {})
+ : new FileSplit(fileName, begin - 1, length, new String[] {});
+ }
/**
* Set the number of lines per split
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1407507&r1=1407506&r2=1407507&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Nov 9 15:54:47 2012
@@ -50,37 +50,40 @@ public class TestNLineInputFormat extend
Job job = Job.getInstance(conf);
Path file = new Path(workDir, "test.txt");
- int seed = new Random().nextInt();
- Random random = new Random(seed);
-
localFs.delete(workDir, true);
FileInputFormat.setInputPaths(job, workDir);
int numLinesPerMap = 5;
NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
- // for a variety of lengths
for (int length = 0; length < MAX_LENGTH;
- length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ length += 1) {
+
// create a file with length entries
Writer writer = new OutputStreamWriter(localFs.create(file));
try {
for (int i = 0; i < length; i++) {
- writer.write(Integer.toString(i));
+ writer.write(Integer.toString(i)+" some more text");
writer.write("\n");
}
} finally {
writer.close();
}
- checkFormat(job, numLinesPerMap);
+ int lastN = 0;
+ if (length != 0) {
+ lastN = length % 5;
+ if (lastN == 0) {
+ lastN = 5;
+ }
+ }
+ checkFormat(job, numLinesPerMap, lastN);
}
}
- void checkFormat(Job job, int expectedN)
+ void checkFormat(Job job, int expectedN, int lastN)
throws IOException, InterruptedException {
NLineInputFormat format = new NLineInputFormat();
List<InputSplit> splits = format.getSplits(job);
- // check all splits except last one
int count = 0;
- for (int i = 0; i < splits.size() -1; i++) {
+ for (int i = 0; i < splits.size(); i++) {
assertEquals("There are no split locations", 0,
splits.get(i).getLocations().length);
TaskAttemptContext context = MapReduceTestUtil.
@@ -104,8 +107,13 @@ public class TestNLineInputFormat extend
} finally {
reader.close();
}
- assertEquals("number of lines in split is " + expectedN ,
- expectedN, count);
+ if ( i == splits.size() - 1) {
+ assertEquals("number of lines in split(" + i + ") is wrong" ,
+ lastN, count);
+ } else {
+ assertEquals("number of lines in split(" + i + ") is wrong" ,
+ expectedN, count);
+ }
}
}