You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by Jesse Jaggars <jh...@gmail.com> on 2013/08/09 18:28:43 UTC

missing data when reading compressed files with my custom LineReader

I've built a custom LineReader-like class to back a new InputFormat. I
wrote the new LineReader class to handle escaped whitespace that the
util.LineReader doesn't handle. The data I'm reading might have lines that
look something like this:

foo\tbar\tbaz\\tmore\\ndata with escaped stuff\n

I'd like to eventually break the above line into the following:

['foo', 'bar', 'baz more data with escaped stuff']

My class 'EscapedLineReader' replaces escaped whitespace with a single
space to facilitate this, and it works on uncompressed files just fine. The
problem I'm having is with compressed files. It seems that for a given
buffer size in my EscapedLineReader class I end up with lines that have
bits in the middle missing in the final output. I noticed that if I
increase the buffer to 128k GZip files seem to work, but BZip2 and LZO
files still have errors (albeit in different locations than with the
default 4K buffer size). I have the feeling I've just missed a step in
getting compression to work right, but I can't see it.

Here is the (hopefully) relevant big from my RecordReader (notice it was
essentially cargo culted from the LineRecordReader):

public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    codec = compressionCodecs.getCodec(file);

    LOG.info("Buffer size = " + job.getInt("io.file.buffer.size", -1));

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

    if (isCompressedInput()) {
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new EscapedLineReader(cIn, job);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new EscapedLineReader(codec.createInputStream(fileIn,
decompressor), job);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new EscapedLineReader(fileIn, job);
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text());
    }
    this.pos = start;
  }

And here is my entire EscapedRecordReader (lots of copy/paste here too):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.io.InputStream;

public class EscapedLineReader {

    private InputStream in;
    private byte[] buffer;
    private int bufferPosition = 0;
    private int bufferLength = 0;
    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
    private static final byte ESCAPE = '\\';
    private static final byte CR = '\r';
    private static final byte LF = '\n';
    private static final byte TAB = '\t';
    private static final byte [] SPACE = {' '};
    private boolean inEscape = false;

    public EscapedLineReader(InputStream in) {
       this(in, DEFAULT_BUFFER_SIZE);
    }

    public EscapedLineReader(InputStream in, int bufferSize) {
        this.in = in;
        buffer = new byte[bufferSize];
    }

    public EscapedLineReader(InputStream in, Configuration conf) {
        this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
    }

    public void close() throws IOException {
        in.close();
    }

    int writeToText(Text str, int startPosition) {
        int appendLength = bufferPosition - startPosition;
        str.append(buffer, startPosition, appendLength);
        return appendLength;
    }

    int writeSpaceToText(Text str) {
        str.append(SPACE, 0, 1);
        return 1;
    }

    public int readLine(Text str) throws IOException {
        str.clear();
        int startPosition = 0;
        long bytesConsumed = 0;
        boolean foundEOL = false;

        do {

            if (bufferPosition >= bufferLength) {

                if (startPosition > 0) {
                    bytesConsumed += writeToText(str, startPosition);
                }

                bufferLength = in.read(buffer);
                bufferPosition = 0;
                if (bufferLength <= 0) {
                    break; // EOF
                }
            }

            startPosition = bufferPosition;

            for (; bufferPosition < bufferLength; ++bufferPosition) {

                if (foundEOL) {
                    break;
                }

                byte b = buffer[bufferPosition];

                if (inEscape) {
                    inEscape = false;
                    if (b == LF || b == TAB || b == CR) {
                        startPosition = bufferPosition + 1;
                        bytesConsumed = writeSpaceToText(str);
                    }
                } else {
                    switch (buffer[bufferPosition]) {
                        // Write out what we've scanned so far, skip the
byte
                        // and start over
                        case ESCAPE:
                            inEscape = true;
                            bytesConsumed += writeToText(str,
startPosition);
                            startPosition = bufferPosition + 1;
                            break;
                        // we are done
                        case LF:
                            bytesConsumed += writeToText(str,
startPosition);
                            foundEOL = true;
                            break;
                        case CR:
                            bytesConsumed += writeToText(str,
startPosition);
                            startPosition = bufferPosition + 1;
                            break;
                        default:
                            break;
                    }
                }
            }
        } while (!foundEOL);
        return (int) bytesConsumed;
    }
}