Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/09/18 17:04:36 UTC

hadoop git commit: MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle multibyte record delimiters well. Contributed by Vinayakumar B, Rushabh Shah, and Akira AJISAKA (cherry picked from commit 077250d8d7b4b757543a39a6ce8bb6e3be356c6f)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 f3fd5e30c -> 850d6a3fd


MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle multibyte record delimiters well. Contributed by Vinayakumar B, Rushabh Shah, and Akira AJISAKA
(cherry picked from commit 077250d8d7b4b757543a39a6ce8bb6e3be356c6f)

Conflicts:

	hadoop-mapreduce-project/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/850d6a3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/850d6a3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/850d6a3f

Branch: refs/heads/branch-2.7
Commit: 850d6a3fd32364e520de8e7b1f929b27ea3f3705
Parents: f3fd5e3
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Sep 18 15:04:02 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Sep 18 15:04:02 2015 +0000

----------------------------------------------------------------------
 .../java/org/apache/hadoop/util/LineReader.java |   8 ++
 hadoop-mapreduce-project/CHANGES.txt            |   4 +
 .../apache/hadoop/mapred/LineRecordReader.java  |   4 +-
 .../mapreduce/lib/input/LineRecordReader.java   |   3 +-
 .../lib/input/UncompressedSplitLineReader.java  | 125 +++++++++++++++++++
 .../hadoop/mapred/TestLineRecordReader.java     |  77 +++++++++++-
 .../lib/input/TestLineRecordReader.java         |  79 ++++++++++--
 7 files changed, 286 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
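
For background on the bug being fixed: with a custom multi-byte record
delimiter such as "+++", a split boundary can fall in the middle of the
delimiter bytes. Before this patch, the readers of the two adjacent splits
could disagree about which of them owns the record that straddles the
boundary, so that record could be dropped or read twice. A minimal sketch of
the failing setup, mirroring the new tests below (the driver class, the
local file path, and its contents are illustrative assumptions, not part of
the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.LineRecordReader;

    public class MultibyteDelimiterRepro {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        byte[] delim = "+++".getBytes("UTF-8");
        // Assumed to exist with contents "abc+++def+++ghi" (3 records).
        Path file = new Path("file:///tmp/repro.txt");
        long len = FileSystem.getLocal(conf).getFileStatus(file).getLen();
        long firstSplitLength = 5; // falls inside the first "+++"
        LongWritable key = new LongWritable();
        Text value = new Text();
        int total = 0;
        for (FileSplit split : new FileSplit[] {
            new FileSplit(file, 0, firstSplitLength, (String[]) null),
            new FileSplit(file, firstSplitLength, len - firstSplitLength,
                (String[]) null) }) {
          LineRecordReader reader = new LineRecordReader(conf, split, delim);
          while (reader.next(key, value)) {
            ++total;
          }
          reader.close();
        }
        // With the fix, the two splits together yield exactly 3 records.
        System.out.println("records = " + total);
      }
    }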


http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 3188cb5..1d1b569 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -369,4 +369,12 @@ public class LineReader implements Closeable {
   public int readLine(Text str) throws IOException {
     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
+
+  protected int getBufferPosn() {
+    return bufferPosn;
+  }
+
+  protected int getBufferSize() {
+    return bufferSize;
+  }
 }
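
Note on the two protected getters added above: they expose the reader's
internal buffer position and size so that the UncompressedSplitLineReader
subclass introduced below can reconcile, in its readLine() override, the
bytes pulled from the underlying stream with the bytes actually consumed by
each record.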

http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fcc8ba9..55f4f0a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -30,6 +30,10 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6472. MapReduce AM should have java.io.tmpdir=./tmp to be
     consistent with tasks (Naganarasimha G R via jlowe)
 
+    MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle
+    multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
+    AJISAKA via jlowe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index ba075e5..824c075 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -124,7 +125,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, recordDelimiter);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, recordDelimiter, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record

http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 42e94ad..74dda6e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -105,7 +105,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, this.recordDelimiterBytes, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record

http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
new file mode 100644
index 0000000..52fb7b0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * SplitLineReader for uncompressed files.
+ * This class can split the file correctly even if the delimiter is multi-byte.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UncompressedSplitLineReader extends SplitLineReader {
+  private boolean needAdditionalRecord = false;
+  private long splitLength;
+  /** Total bytes read from the input stream. */
+  private long totalBytesRead = 0;
+  private boolean finished = false;
+  private boolean usingCRLF;
+  private int unusedBytes = 0;
+  private int lastBytesRead = 0;
+
+  public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
+      byte[] recordDelimiterBytes, long splitLength) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    this.splitLength = splitLength;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int maxBytesToRead = buffer.length;
+    if (totalBytesRead < splitLength) {
+      maxBytesToRead = Math.min(maxBytesToRead,
+                                (int)(splitLength - totalBytesRead));
+    }
+    int bytesRead = in.read(buffer, 0, maxBytesToRead);
+    lastBytesRead = bytesRead;
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then the next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    if (bytesRead > 0) {
+      totalBytesRead += bytesRead;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    long bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (totalBytesRead > splitLength) {
+        finished = true;
+      }
+      bytesRead = totalBytesRead;
+      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
+      bytesRead = totalBytesRead - bytesRead;
+
+      // No records left.
+      if (bytesConsumed == 0 && bytesRead == 0) {
+        return 0;
+      }
+
+      int bufferSize = getBufferSize();
+
+      // Add the remaining buffer size not used for the last call
+      // of the fillBuffer() method.
+      if (lastBytesRead <= 0) {
+        bytesRead += bufferSize;
+      } else if (bytesRead > 0) {
+        bytesRead += bufferSize - lastBytesRead;
+      }
+
+      // Adjust the size of the buffer not used for this record.
+      // The size is carried over for the next calculation.
+      bytesRead += unusedBytes;
+      unusedBytes = bufferSize - getBufferPosn();
+      bytesRead -= unusedBytes;
+    }
+    return (int) bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}
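
Two details of UncompressedSplitLineReader are worth spelling out. First,
fillBuffer() caps each read at (splitLength - totalBytesRead), so a single
read never crosses the split boundary; the reader hits
totalBytesRead == splitLength exactly and can then check whether it stopped
in the middle of a delimiter (inDelimiter). Second, if it did stop
mid-delimiter, needAdditionalRecord is set so that this reader also consumes
the record straddling the boundary, while the reader of the next split
discards its partial first record; the CRLF check avoids an extra read when
the next split starts with the '\n' half of a "\r\n" pair, which that split
already treats as a delimiter on its own. The capping rule, restated as a
standalone helper (a sketch; the method name is illustrative, not from the
patch):

    // A single read never crosses the split boundary, so the boundary is
    // hit exactly, never overshot.
    static int readCap(long splitLength, long totalBytesRead, int bufferLength) {
      if (totalBytesRead < splitLength) {
        // Still inside the split: stop exactly at the boundary.
        return (int) Math.min(bufferLength, splitLength - totalBytesRead);
      }
      // At or past the boundary: reading ahead is fine, because at most one
      // extra record may be consumed for a straddling delimiter.
      return bufferLength;
    }

    // Worked example: input "abc+++def", delimiter "+++", split length 5.
    //   Split 0 reads "abc++" and stalls mid-delimiter, so
    //   needAdditionalRecord is set and split 0 emits "abc" and then "def".
    //   Split 1 starts at "+def", discards its partial first record, and
    //   emits nothing -- each record is produced exactly once.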

http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index 4c94e59..ffba2d9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -41,6 +45,9 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -50,15 +57,27 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader(conf, split);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
     LongWritable key = new LongWritable();
     Text value = new Text();
     int numRecordsNoSplits = 0;
@@ -69,7 +88,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsFirstSplit = 0;
     while (reader.next(key,  value)) {
       ++numRecordsFirstSplit;
@@ -79,14 +98,14 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsRemainingSplits = 0;
     while (reader.next(key,  value)) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
 
-    assertEquals("Unexpected number of records in bzip2 compressed split",
+    assertEquals("Unexpected number of records in split",
         numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
@@ -283,4 +302,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf configuration used to obtain the local file system
+   * @return Path of the file created
+   * @throws IOException if the file cannot be written
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++"
+        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }
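
The two new tests sweep every (io.file.buffer.size, split size) pair up to
the input length, which forces the buffer refill and the split boundary onto
every possible byte of the delimiter, including the position between '\r'
and '\n'. For completeness, a minimal sketch of how a job opts into the
custom delimiter these tests exercise (standard configuration keys; the
class and job names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class DelimiterJobSetup {
      public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // Records are terminated by "+++" instead of newlines.
        conf.set("textinputformat.record.delimiter", "+++");
        Job job = Job.getInstance(conf, "multibyte-delimiter-job");
        job.setInputFormatClass(TextInputFormat.class);
        return job;
      }
    }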

http://git-wip-us.apache.org/repos/asf/hadoop/blob/850d6a3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 52fdc09..6c86739 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -42,6 +46,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -51,17 +58,28 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
     TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
 
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader();
+    LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsNoSplits = 0;
     while (reader.nextKeyValue()) {
@@ -71,7 +89,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsFirstSplit = 0;
     while (reader.nextKeyValue()) {
@@ -82,16 +100,15 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsRemainingSplits = 0;
     while (reader.nextKeyValue()) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
-
-    assertEquals("Unexpected number of records in bzip2 compressed split",
-        numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+    assertEquals("Unexpected number of records in split ", numRecordsNoSplits,
+        numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
   @Test
@@ -269,4 +286,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf configuration used to obtain the local file system
+   * @return Path of the file created
+   * @throws IOException if the file cannot be written
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++"
+        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }