You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/12/10 00:42:37 UTC
svn commit: r1549710 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...
Author: jlowe
Date: Mon Dec 9 23:42:36 2013
New Revision: 1549710
URL: http://svn.apache.org/r1549710
Log:
svn merge -c 1549705 FIXES: MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits. Contributed by Jason Lowe.
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
- copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java
- copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
- copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
- copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2
- copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2
- copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1549710&r1=1549709&r2=1549710&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Dec 9 23:42:36 2013
@@ -94,6 +94,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5632. TestRMContainerAllocator#testUpdatedNodes fails (jeagles)
+ MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
+ (jlowe)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1549710&r1=1549709&r2=1549710&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Mon Dec 9 23:42:36 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -52,7 +54,7 @@ public class LineRecordReader implements
private long start;
private long pos;
private long end;
- private LineReader in;
+ private SplitLineReader in;
private FSDataInputStream fileIn;
private final Seekable filePosition;
int maxLineLength;
@@ -111,17 +113,18 @@ public class LineRecordReader implements
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
- in = new LineReader(cIn, job, recordDelimiter);
+ in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
- in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter);
+ in = new SplitLineReader(codec.createInputStream(fileIn,
+ decompressor), job, recordDelimiter);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
- in = new LineReader(fileIn, job, recordDelimiter);
+ in = new SplitLineReader(fileIn, job, recordDelimiter);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
@@ -141,7 +144,7 @@ public class LineRecordReader implements
public LineRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength, byte[] recordDelimiter) {
this.maxLineLength = maxLineLength;
- this.in = new LineReader(in, recordDelimiter);
+ this.in = new SplitLineReader(in, recordDelimiter);
this.start = offset;
this.pos = offset;
this.end = endOffset;
@@ -159,7 +162,7 @@ public class LineRecordReader implements
throws IOException{
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
- this.in = new LineReader(in, job, recordDelimiter);
+ this.in = new SplitLineReader(in, job, recordDelimiter);
this.start = offset;
this.pos = offset;
this.end = endOffset;
@@ -200,7 +203,7 @@ public class LineRecordReader implements
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
- while (getFilePosition() <= end) {
+ while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1549710&r1=1549709&r2=1549710&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Mon Dec 9 23:42:36 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.compress.Dec
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -55,7 +54,7 @@ public class LineRecordReader extends Re
private long start;
private long pos;
private long end;
- private LineReader in;
+ private SplitLineReader in;
private FSDataInputStream fileIn;
private Seekable filePosition;
private int maxLineLength;
@@ -94,33 +93,19 @@ public class LineRecordReader extends Re
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
- if (null == this.recordDelimiterBytes){
- in = new LineReader(cIn, job);
- } else {
- in = new LineReader(cIn, job, this.recordDelimiterBytes);
- }
-
+ in = new CompressedSplitLineReader(cIn, job,
+ this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
- if (null == this.recordDelimiterBytes) {
- in = new LineReader(codec.createInputStream(fileIn, decompressor),
- job);
- } else {
- in = new LineReader(codec.createInputStream(fileIn,
- decompressor), job, this.recordDelimiterBytes);
- }
+ in = new SplitLineReader(codec.createInputStream(fileIn,
+ decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
- if (null == this.recordDelimiterBytes){
- in = new LineReader(fileIn, job);
- } else {
- in = new LineReader(fileIn, job, this.recordDelimiterBytes);
- }
-
+ in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
@@ -160,7 +145,7 @@ public class LineRecordReader extends Re
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
- while (getFilePosition() <= end) {
+ while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
pos += newSize;