You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pr...@apache.org on 2022/07/06 04:30:34 UTC

[hadoop] branch trunk updated: HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)

This is an automated email from the ASF dual-hosted git repository.

prabhujoseph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a432925f74b HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)
a432925f74b is described below

commit a432925f74b93d05b4dfdd1831bfbabbf4466a80
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Wed Jul 6 05:30:14 2022 +0100

    HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)
    
    * HADOOP-18321.Fix when to read an additional record from a BZip2 text file split
    
    Co-authored-by: Ashutosh Gupta <as...@amazon.com> and Reviewed by Akira Ajisaka.
---
 .../org/apache/hadoop/io/compress/BZip2Codec.java  |  13 +-
 .../io/compress/bzip2/CBZip2InputStream.java       |  16 +-
 .../io/compress/bzip2/CBZip2OutputStream.java      |   9 +-
 .../io/compress/bzip2/BZip2TextFileWriter.java     | 114 +++++++
 .../hadoop/io/compress/bzip2/BZip2Utils.java       |  67 ++++
 .../io/compress/bzip2/TestBZip2TextFileWriter.java |  91 +++++
 .../lib/input/CompressedSplitLineReader.java       |  12 +-
 .../hadoop/mapred/LineRecordReaderHelper.java      |  54 +++
 .../hadoop/mapred/TestLineRecordReaderBZip2.java   |  31 ++
 .../lib/input/BaseLineRecordReaderHelper.java      |  58 ++++
 .../lib/input/BaseTestLineRecordReaderBZip2.java   | 375 +++++++++++++++++++++
 .../lib/input/LineRecordReaderHelper.java          |  62 ++++
 .../lib/input/TestLineRecordReaderBZip2.java       |  29 ++
 13 files changed, 921 insertions(+), 10 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 7640f7ed7a6..1564ae90855 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 
@@ -255,10 +256,7 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
     private void writeStreamHeader() throws IOException {
       if (super.out != null) {
-        // The compressed bzip2 stream should start with the
-        // identifying characters BZ. Caller of CBZip2OutputStream
-        // i.e. this class must write these characters.
-        out.write(HEADER.getBytes(StandardCharsets.UTF_8));
+        writeHeader(out);
       }
     }
 
@@ -547,4 +545,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
   }// end of BZip2CompressionInputStream
 
+  @VisibleForTesting
+  public static void writeHeader(OutputStream out) throws IOException {
+    // The compressed bzip2 stream should start with the
+    // identifying characters BZ. Caller of CBZip2OutputStream
+    // i.e. this class must write these characters.
+    out.write(HEADER.getBytes(StandardCharsets.UTF_8));
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
index 187fe481588..61e88d80d8c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
@@ -27,6 +27,7 @@ import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
 
 
@@ -312,13 +313,24 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
   }
     } else if (readMode == READ_MODE.BYBLOCK) {
       this.currentState = STATE.NO_PROCESS_STATE;
-      skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
+      skipResult = skipToNextBlockMarker();
       if(!skipDecompression){
         changeStateToProcessABlock();
       }
     }
   }
 
+  /**
+   * Skips bytes in the stream until the start marker of a block is reached
+   * or end of stream is reached. Used for testing purposes to identify the
+   * start offsets of blocks.
+   */
+  @VisibleForTesting
+  boolean skipToNextBlockMarker() throws IOException {
+    return skipToNextMarker(
+        CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
+  }
+
   /**
    * Returns the number of bytes between the current stream position
    * and the immediate next BZip2 block marker.
@@ -428,7 +440,7 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
       //report 'end of block' or 'end of stream'
       result = b;
 
-      skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
+      skipResult = skipToNextBlockMarker();
 
       changeStateToProcessABlock();
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
index 39c3638b0f4..50bdddb8136 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
@@ -27,6 +27,7 @@ package org.apache.hadoop.io.compress.bzip2;
 import java.io.OutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.io.IOUtils;
 
 /**
@@ -781,8 +782,7 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
       inUse[i] = false;
     }
 
-    /* 20 is just a paranoia constant */
-    this.allowableBlockSize = (this.blockSize100k * BZip2Constants.baseBlockSize) - 20;
+    this.allowableBlockSize = getAllowableBlockSize(this.blockSize100k);
   }
 
   private void endBlock() throws IOException {
@@ -2093,4 +2093,9 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
 
   }
 
+  @VisibleForTesting
+  static int getAllowableBlockSize(int blockSize100k) {
+    /* 20 is just a paranoia constant */
+    return (blockSize100k * BZip2Constants.baseBlockSize) - 20;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
new file mode 100644
index 00000000000..e40f631b603
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
@@ -0,0 +1,114 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream.MIN_BLOCKSIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+/**
+ * A writer that simplifies creating BZip2 compressed text data for testing
+ * purposes.
+ */
+public final class BZip2TextFileWriter implements Closeable {
+
+  // Use minimum block size to reduce amount of data to require to be written
+  // to CBZip2OutputStream before a new block is created.
+  private static final int BLOCK_SIZE_100K = MIN_BLOCKSIZE;
+
+  /**
+   * The amount of bytes of run-length encoded data that needs to be written
+   * to this writer in order for the next byte written starts a new BZip2 block.
+   */
+  public static final int BLOCK_SIZE =
+      // The + 1 is needed because of how CBZip2OutputStream checks whether the
+      // last offset written is less than allowable block size. Because the last
+      // offset is one less of the amount of bytes written to the block, we need
+      // to write an extra byte to trigger writing a new block.
+      CBZip2OutputStream.getAllowableBlockSize(BLOCK_SIZE_100K) + 1;
+
+  private final CBZip2OutputStream out;
+
+  public BZip2TextFileWriter(Path path, Configuration conf) throws IOException {
+    this(path.getFileSystem(conf).create(path));
+  }
+
+  public BZip2TextFileWriter(OutputStream rawOut) throws IOException {
+    try {
+      BZip2Codec.writeHeader(rawOut);
+      out = new CBZip2OutputStream(rawOut, BLOCK_SIZE_100K);
+    } catch (Throwable e) {
+      rawOut.close();
+      throw e;
+    }
+  }
+
+  public void writeManyRecords(int totalSize, int numRecords, byte[] delimiter)
+      throws IOException {
+    checkArgument(numRecords > 0);
+    checkArgument(delimiter.length > 0);
+
+    int minRecordSize = totalSize / numRecords;
+    checkArgument(minRecordSize >= delimiter.length);
+
+    int lastRecordExtraSize = totalSize % numRecords;
+
+    for (int i = 0; i < numRecords - 1; i++) {
+      writeRecord(minRecordSize, delimiter);
+    }
+    writeRecord(minRecordSize + lastRecordExtraSize, delimiter);
+  }
+
+  public void writeRecord(int totalSize, byte[] delimiter) throws IOException {
+    checkArgument(delimiter.length > 0);
+    checkArgument(totalSize >= delimiter.length);
+
+    int contentSize = totalSize - delimiter.length;
+    for (int i = 0; i < contentSize; i++) {
+      // Alternate between characters so that internals of CBZip2OutputStream
+      // cannot condensed the written bytes using run-length encoding. This
+      // allows the caller to use #BLOCK_SIZE in order to know whether the next
+      // write will end just before the end of the current block, or exceed it,
+      // and by how much.
+      out.write(i % 2 == 0 ? 'a' : 'b');
+    }
+    write(delimiter);
+  }
+
+  public void write(String bytes) throws IOException {
+    write(bytes.getBytes(StandardCharsets.UTF_8));
+  }
+
+  public void write(byte[] bytes) throws IOException {
+    out.write(bytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    out.close();
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
new file mode 100644
index 00000000000..717b282797e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public final class BZip2Utils {
+
+  private BZip2Utils() {
+  }
+
+  /**
+   * Returns the start offsets of blocks that follow the first block in the
+   * BZip2 compressed file at the given path. The first offset corresponds to
+   * the first byte containing the BZip2 block marker of the second block. The
+   * i-th offset corresponds to the block marker of the (i + 1)-th block.
+   */
+  public static List<Long> getNextBlockMarkerOffsets(
+      Path path, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    try (InputStream fileIn = fs.open(path)) {
+      return getNextBlockMarkerOffsets(fileIn);
+    }
+  }
+
+  /**
+   * Returns the start offsets of blocks that follow the first block in the
+   * BZip2 compressed input stream. The first offset corresponds to
+   * the first byte containing the BZip2 block marker of the second block. The
+   * i-th offset corresponds to the block marker of the (i + 1)-th block.
+   */
+  public static List<Long> getNextBlockMarkerOffsets(InputStream rawIn)
+      throws IOException {
+    try (CBZip2InputStream in = new CBZip2InputStream(rawIn, BYBLOCK)) {
+      ArrayList<Long> offsets = new ArrayList<>();
+      while (in.skipToNextBlockMarker()) {
+        offsets.add(in.getProcessedByteCount());
+      }
+      return offsets;
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
new file mode 100644
index 00000000000..9b36f899b6d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class TestBZip2TextFileWriter {
+
+  private static final byte[] DELIMITER = new byte[] {'\0'};
+
+  private ByteArrayOutputStream rawOut;
+  private BZip2TextFileWriter writer;
+
+  @Before
+  public void setUp() throws Exception {
+    rawOut = new ByteArrayOutputStream();
+    writer = new BZip2TextFileWriter(rawOut);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    rawOut = null;
+    writer.close();
+  }
+
+  @Test
+  public void writingSingleBlockSizeOfData() throws Exception {
+    writer.writeRecord(BLOCK_SIZE, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(0, nextBlocks.size());
+  }
+
+  @Test
+  public void justExceedingBeyondBlockSize() throws Exception {
+    writer.writeRecord(BLOCK_SIZE + 1, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(1, nextBlocks.size());
+  }
+
+  @Test
+  public void writingTwoBlockSizesOfData() throws Exception {
+    writer.writeRecord(2 * BLOCK_SIZE, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(1, nextBlocks.size());
+  }
+
+  @Test
+  public void justExceedingBeyondTwoBlocks() throws Exception {
+    writer.writeRecord(2 * BLOCK_SIZE + 1, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(2, nextBlocks.size());
+  }
+
+  private List<Long> getNextBlockMarkerOffsets() throws IOException {
+    ByteArrayInputStream in = new ByteArrayInputStream(rawOut.toByteArray());
+    return BZip2Utils.getNextBlockMarkerOffsets(in);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
index 9d0e949a10b..74b959cf47c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
@@ -127,6 +127,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
   @Override
   protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
       throws IOException {
+    boolean alreadyReadAfterSplit = didReadAfterSplit();
+
     int bytesRead = in.read(buffer);
 
     // If the split ended in the middle of a record delimiter then we need
@@ -135,7 +137,9 @@ public class CompressedSplitLineReader extends SplitLineReader {
     // However if using the default delimiter and the next character is a
     // linefeed then next split will treat it as a delimiter all by itself
     // and the additional record read should not be performed.
-    if (inDelimiter && bytesRead > 0) {
+    boolean justReadAfterSplit = !alreadyReadAfterSplit && didReadAfterSplit();
+
+    if (justReadAfterSplit && inDelimiter && bytesRead > 0) {
       if (usingCRLF) {
         needAdditionalRecord = (buffer[0] != '\n');
       } else {
@@ -152,7 +156,7 @@ public class CompressedSplitLineReader extends SplitLineReader {
     if (!finished) {
       // only allow at most one more record to be read after the stream
       // reports the split ended
-      if (scin.getPos() > scin.getAdjustedEnd()) {
+      if (didReadAfterSplit()) {
         finished = true;
       }
 
@@ -170,4 +174,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
   protected void unsetNeedAdditionalRecordAfterSplit() {
     needAdditionalRecord = false;
   }
+
+  private boolean didReadAfterSplit() throws IOException {
+    return scin.getPos() > scin.getAdjustedEnd();
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
new file mode 100644
index 00000000000..fccc01ad7e6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
+
+public final class LineRecordReaderHelper extends
+    BaseLineRecordReaderHelper {
+
+  public LineRecordReaderHelper(Path filePath, Configuration conf) {
+    super(filePath, conf);
+  }
+
+  @Override
+  public long countRecords(long start, long length) throws IOException {
+    try (LineRecordReader reader = newReader(start, length)) {
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+
+      long numRecords = 0L;
+      while (reader.next(key, value)) {
+        numRecords++;
+      }
+      return numRecords;
+    }
+  }
+
+  private LineRecordReader newReader(long start, long length)
+      throws IOException {
+    FileSplit split = new FileSplit(getFilePath(), start, length, (String[]) null);
+    return new LineRecordReader(getConf(), split, getRecordDelimiterBytes());
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..1588e629c8c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
+import org.apache.hadoop.mapreduce.lib.input.BaseTestLineRecordReaderBZip2;
+
+public final class TestLineRecordReaderBZip2 extends
+    BaseTestLineRecordReaderBZip2 {
+
+  @Override
+  protected BaseLineRecordReaderHelper newReader(Path file) {
+    return new LineRecordReaderHelper(file, getConf());
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
new file mode 100644
index 00000000000..d1e21e6e258
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
@@ -0,0 +1,58 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public abstract class BaseLineRecordReaderHelper {
+
+  private final Configuration conf;
+  private final Path filePath;
+  private final byte[] recordDelimiterBytes;
+
+
+
+  public BaseLineRecordReaderHelper(Path filePath, Configuration conf) {
+    this.conf = conf;
+    this.filePath = filePath;
+
+    conf.setInt(LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    this.recordDelimiterBytes =
+        null != delimiter ? delimiter.getBytes(StandardCharsets.UTF_8) : null;
+  }
+
+  public abstract long countRecords(long start, long length) throws IOException;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Path getFilePath() {
+    return filePath;
+  }
+
+  public byte[] getRecordDelimiterBytes() {
+    return recordDelimiterBytes;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..cd400365efa
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
@@ -0,0 +1,375 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
+import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class BaseTestLineRecordReaderBZip2 {
+
+  // LF stands for line feed
+  private static final byte[] LF = new byte[] {'\n'};
+  // CR stands for cartridge return
+  private static final byte[] CR = new byte[] {'\r'};
+  private static final byte[] CR_LF = new byte[] {'\r', '\n'};
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path tempFile;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public Path getTempFile() {
+    return tempFile;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+
+    Path workDir = new Path(
+        System.getProperty("test.build.data", "target"),
+        "data/" + getClass().getSimpleName());
+
+    fs = workDir.getFileSystem(conf);
+
+    Path inputDir = new Path(workDir, "input");
+    tempFile = new Path(inputDir, "test.txt.bz2");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(tempFile, /* recursive */ false);
+  }
+
+  @Test
+  public void firstBlockEndsWithLF() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void firstBlockEndsWithLFSecondBlockStartsWithLF() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+      // Write 254 empty rows terminating at LF, as those records will get
+      // rolled into the first block record due to run-length encoding, the
+      // 255th LF character will trigger a run to be written to the block. We
+      // only need 254 LF characters since the last byte written by prior
+      // writeManyRecords call is already a LF.
+      writer.writeManyRecords(254, 254, LF);
+
+      // This LF character should be the first byte of the second block, but
+      // if splitting at blocks, the first split will read this record as the
+      // additional record.
+      writer.writeRecord(1, LF);
+
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1255, 2});
+  }
+
+  @Test
+  public void firstBlockEndsWithLFSecondBlockStartsWithCR() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+      writer.writeRecord(1, CR);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void firstBlockEndsWithCRLF() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, CR_LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void lastRecordContentSpanAcrossBlocks()
+      throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      writer.writeRecord(100, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastRecordOfBlockHasItsLFInNextBlock() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      // The LF character is the first byte of the second block
+      writer.writeRecord(51, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastRecordOfFirstBlockHasItsCRLFInSecondBlock() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      // Both CR + LF characters are the first two bytes of second block
+      writer.writeRecord(52, CR_LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastRecordOfFirstBlockHasItsCRLFPartlyInSecondBlock()
+      throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      // The CR character is the last byte of the first block and the LF is
+      // the firs byte of the second block
+      writer.writeRecord(51, CR_LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastByteInFirstBlockIsCRFirstByteInSecondBlockIsNotLF()
+      throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, CR);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void usingCRDelimiterWithSmallestBufferSize() throws Exception {
+    // Forces calling LineReader#fillBuffer for ever byte read
+    conf.set(IO_FILE_BUFFER_SIZE_KEY, "1");
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, CR);
+      writer.writeRecord(100, CR);
+      writer.writeRecord(10, CR);
+      writer.writeRecord(10, CR);
+      writer.writeRecord(10, CR);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void delimitedByCRSpanningThreeBlocks() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeRecord(3 * BLOCK_SIZE, CR);
+      writer.writeRecord(3 * BLOCK_SIZE, CR);
+      writer.writeRecord(3 * BLOCK_SIZE, CR);
+    }
+    assertRecordCountsPerSplit(tempFile,
+        new long[] {1, 0, 1, 0, 0, 1, 0, 0, 0});
+  }
+
+  @Test
+  public void customDelimiterLastThreeBytesInBlockAreDelimiter()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void customDelimiterDelimiterSpansAcrossBlocks()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
+      writer.writeRecord(52, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void customDelimiterLastRecordDelimiterStartsAtNextBlockStart()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
+      writer.writeRecord(53, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void customDelimiterLastBlockBytesShareCommonPrefixWithDelimiter()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 4, 999, delimiter);
+      // The first 4 bytes, "an e", will be the last 4 bytes of the first block,
+      // the last byte being 'e' which matches the first character of the
+      // delimiter "end". The first byte of the next block also matches the
+      // second byte of the delimiter "n"; however the next character "c" does
+      // not match the last character of the delimiter. Thus an additional
+      // record should not be read for the split that reads the first block.
+      // The split that reads the second block will just discard
+      // "nchanting tale coming to an end".
+      writer.write("an enchanting tale coming to an end");
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  protected abstract BaseLineRecordReaderHelper newReader(Path file);
+
+  private void assertRecordCountsPerSplit(
+      Path path, long[] countsIfSplitAtBlocks) throws IOException {
+    RecordCountAssert countAssert =
+        new RecordCountAssert(path, countsIfSplitAtBlocks);
+    countAssert.assertSingleSplit();
+    countAssert.assertSplittingAtBlocks();
+    countAssert.assertSplittingJustAfterSecondBlockStarts();
+  }
+
+  private class RecordCountAssert {
+
+    private final BaseLineRecordReaderHelper reader;
+    private final long numBlocks;
+    private final long[] countsIfSplitAtBlocks;
+    private final long fileSize;
+    private final long totalRecords;
+    private final List<Long> nextBlockOffsets;
+
+    RecordCountAssert(
+        Path path, long[] countsIfSplitAtBlocks) throws IOException {
+      this.reader = newReader(path);
+      this.countsIfSplitAtBlocks = countsIfSplitAtBlocks;
+      this.fileSize = getFileSize(path);
+      this.totalRecords = Arrays.stream(countsIfSplitAtBlocks).sum();
+      this.numBlocks = countsIfSplitAtBlocks.length;
+      this.nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(path, conf);
+
+      assertEquals(numBlocks, nextBlockOffsets.size() + 1);
+    }
+
+    private void assertSingleSplit() throws IOException {
+      assertEquals(totalRecords, reader.countRecords(0, fileSize));
+    }
+
+    private void assertSplittingAtBlocks() throws IOException {
+      for (int i = 0; i < numBlocks; i++) {
+        long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1);
+        long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i);
+        long length = end - start;
+
+        String message = "At i=" + i;
+        long expectedCount = countsIfSplitAtBlocks[i];
+        assertEquals(
+            message, expectedCount, reader.countRecords(start, length));
+      }
+    }
+
+    private void assertSplittingJustAfterSecondBlockStarts()
+        throws IOException {
+      if (numBlocks <= 1) {
+        return;
+      }
+      long recordsInFirstTwoBlocks =
+          countsIfSplitAtBlocks[0] + countsIfSplitAtBlocks[1];
+      long remainingRecords = totalRecords - recordsInFirstTwoBlocks;
+
+      long firstSplitSize = nextBlockOffsets.get(0) + 1;
+      assertEquals(
+          recordsInFirstTwoBlocks,
+          reader.countRecords(0, firstSplitSize));
+      assertEquals(
+          remainingRecords,
+          reader.countRecords(firstSplitSize, fileSize - firstSplitSize));
+    }
+  }
+
+  private long getFileSize(Path path) throws IOException {
+    return fs.getFileStatus(path).getLen();
+  }
+
+  private void setDelimiter(byte[] delimiter) {
+    conf.set("textinputformat.record.delimiter",
+        new String(delimiter, StandardCharsets.UTF_8));
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
new file mode 100644
index 00000000000..56fc7eb4e86
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+public final class LineRecordReaderHelper extends
+    BaseLineRecordReaderHelper {
+
+  public LineRecordReaderHelper(Path filePath, Configuration conf) {
+    super(filePath, conf);
+  }
+
+  @Override
+  public long countRecords(long start, long length) throws IOException {
+    try (LineRecordReader reader = newReader(start, length)) {
+      long numRecords = 0L;
+      while (reader.nextKeyValue()) {
+        numRecords++;
+      }
+      return numRecords;
+    }
+  }
+
+  private LineRecordReader newReader(long start, long length)
+      throws IOException {
+    FileSplit split = new FileSplit(getFilePath(), start, length, null);
+
+    TaskAttemptContext context =
+        new TaskAttemptContextImpl(getConf(), new TaskAttemptID());
+
+    LineRecordReader reader = new LineRecordReader(getRecordDelimiterBytes());
+    try {
+      reader.initialize(split, context);
+      return reader;
+    } catch (Throwable e) {
+      reader.close();
+      throw e;
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..ec01ef332fa
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import org.apache.hadoop.fs.Path;
+
+public final class TestLineRecordReaderBZip2
+    extends BaseTestLineRecordReaderBZip2 {
+
+  @Override
+  protected BaseLineRecordReaderHelper newReader(Path file) {
+    return new LineRecordReaderHelper(file, getConf());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org