You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by Jesse Jaggars <jh...@gmail.com> on 2013/08/09 18:28:43 UTC
missing data when reading compressed files with my custom LineReader
I've built a custom LineReader-like class to back a new InputFormat. I
wrote the new LineReader class to handle escaped whitespace that the
util.LineReader doesn't handle. The data I'm reading might have lines that
look something like this:
foo\tbar\tbaz\\tmore\\ndata with escaped stuff\n
I'd like to eventually break the above line into the following:
['foo', 'bar', 'baz more data with escaped stuff']
My class 'EscapedLineReader' replaces escaped whitespace with a single
space to facilitate this, and it works on uncompressed files just fine. The
problem I'm having is with compressed files. It seems that for a given
buffer size in my EscapedLineReader class I end up with lines that have
bits in the middle missing in the final output. I noticed that if I
increase the buffer to 128K, GZip files seem to work, but BZip2 and LZO
files still have errors (albeit in different locations than with the
default 4K buffer size). I have the feeling I've just missed a step in
getting compression to work right, but I can't see it.
Here is the (hopefully) relevant bit from my RecordReader (notice it was
essentially cargo-culted from the LineRecordReader):
/**
 * Opens the split's file and positions an EscapedLineReader at the first record owned by
 * this split.
 *
 * <p>The codec is looked up for the split's path via CompressionCodecFactory. Three cases:
 * <ul>
 *   <li>Splittable codecs are opened in BYBLOCK mode, and start/end are replaced by the
 *       stream's adjusted bounds.</li>
 *   <li>Other compressed input is decompressed from the beginning of the file.</li>
 *   <li>Uncompressed input is seeked to the split start.</li>
 * </ul>
 * When start != 0, one line is read and discarded; start is advanced by that line's byte
 * count.
 *
 * @param genericSplit the split to read; cast to FileSplit
 * @param context supplies the job Configuration
 * @throws IOException propagated from opening the file, creating the decompressing stream,
 *     seeking, or reading the discarded first line
 */
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
// NOTE(review): maxLineLength is read here, but none of the EscapedLineReader constructors
// used below takes a length limit -- confirm it is enforced elsewhere (e.g. in next()).
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
// Byte range [start, end) of this split as given by the FileSplit.
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// Codec for this path; presumably null for uncompressed files (see isCompressedInput()).
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// Diagnostic only: logs io.file.buffer.size, or -1 when it is not set.
LOG.info("Buffer size = " + job.getInt("io.file.buffer.size", -1));
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
// Borrowed from CodecPool; presumably returned in close(), which is not visible here.
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
// The stream reports adjusted split bounds, and they must replace start/end
// only after the stream has been created.
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new EscapedLineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
// filePosition comes from the compressed stream here rather than from fileIn.
filePosition = cIn;
} else {
// Non-splittable codec: decompress from the beginning of the file; no seek is done.
// NOTE(review): if such a file is ever split (start != 0), the skip below discards
// the file's real first line -- confirm the InputFormat marks these files as
// non-splittable.
in = new EscapedLineReader(codec.createInputStream(fileIn,
decompressor), job);
filePosition = fileIn;
}
} else {
// Uncompressed: jump straight to the split start.
fileIn.seek(start);
in = new EscapedLineReader(fileIn, job);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
// NOTE(review): start is advanced by readLine()'s return value. That value must be the
// number of stream bytes consumed (terminator and dropped escape bytes included), not
// the number of bytes appended to the Text; otherwise pos drifts from the real offset.
if (start != 0) {
start += in.readLine(new Text());
}
this.pos = start;
}
And here is my entire EscapedLineReader (lots of copy/paste here too):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.io.InputStream;
public class EscapedLineReader {
private InputStream in;
private byte[] buffer;
private int bufferPosition = 0;
private int bufferLength = 0;
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private static final byte ESCAPE = '\\';
private static final byte CR = '\r';
private static final byte LF = '\n';
private static final byte TAB = '\t';
private static final byte [] SPACE = {' '};
private boolean inEscape = false;
public EscapedLineReader(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
public EscapedLineReader(InputStream in, int bufferSize) {
this.in = in;
buffer = new byte[bufferSize];
}
public EscapedLineReader(InputStream in, Configuration conf) {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
}
public void close() throws IOException {
in.close();
}
int writeToText(Text str, int startPosition) {
int appendLength = bufferPosition - startPosition;
str.append(buffer, startPosition, appendLength);
return appendLength;
}
int writeSpaceToText(Text str) {
str.append(SPACE, 0, 1);
return 1;
}
public int readLine(Text str) throws IOException {
str.clear();
int startPosition = 0;
long bytesConsumed = 0;
boolean foundEOL = false;
do {
if (bufferPosition >= bufferLength) {
if (startPosition > 0) {
bytesConsumed += writeToText(str, startPosition);
}
bufferLength = in.read(buffer);
bufferPosition = 0;
if (bufferLength <= 0) {
break; // EOF
}
}
startPosition = bufferPosition;
for (; bufferPosition < bufferLength; ++bufferPosition) {
if (foundEOL) {
break;
}
byte b = buffer[bufferPosition];
if (inEscape) {
inEscape = false;
if (b == LF || b == TAB || b == CR) {
startPosition = bufferPosition + 1;
bytesConsumed = writeSpaceToText(str);
}
} else {
switch (buffer[bufferPosition]) {
// Write out what we've scanned so far, skip the
byte
// and start over
case ESCAPE:
inEscape = true;
bytesConsumed += writeToText(str,
startPosition);
startPosition = bufferPosition + 1;
break;
// we are done
case LF:
bytesConsumed += writeToText(str,
startPosition);
foundEOL = true;
break;
case CR:
bytesConsumed += writeToText(str,
startPosition);
startPosition = bufferPosition + 1;
break;
default:
break;
}
}
}
} while (!foundEOL);
return (int) bytesConsumed;
}
}