You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pr...@apache.org on 2022/07/06 04:30:34 UTC
[hadoop] branch trunk updated: HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)
This is an automated email from the ASF dual-hosted git repository.
prabhujoseph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a432925f74b HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)
a432925f74b is described below
commit a432925f74b93d05b4dfdd1831bfbabbf4466a80
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Wed Jul 6 05:30:14 2022 +0100
HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)
* HADOOP-18321.Fix when to read an additional record from a BZip2 text file split
Co-authored-by: Ashutosh Gupta <as...@amazon.com> and Reviewed by Akira Ajisaka.
---
.../org/apache/hadoop/io/compress/BZip2Codec.java | 13 +-
.../io/compress/bzip2/CBZip2InputStream.java | 16 +-
.../io/compress/bzip2/CBZip2OutputStream.java | 9 +-
.../io/compress/bzip2/BZip2TextFileWriter.java | 114 +++++++
.../hadoop/io/compress/bzip2/BZip2Utils.java | 67 ++++
.../io/compress/bzip2/TestBZip2TextFileWriter.java | 91 +++++
.../lib/input/CompressedSplitLineReader.java | 12 +-
.../hadoop/mapred/LineRecordReaderHelper.java | 54 +++
.../hadoop/mapred/TestLineRecordReaderBZip2.java | 31 ++
.../lib/input/BaseLineRecordReaderHelper.java | 58 ++++
.../lib/input/BaseTestLineRecordReaderBZip2.java | 375 +++++++++++++++++++++
.../lib/input/LineRecordReaderHelper.java | 62 ++++
.../lib/input/TestLineRecordReaderBZip2.java | 29 ++
13 files changed, 921 insertions(+), 10 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 7640f7ed7a6..1564ae90855 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -255,10 +256,7 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
private void writeStreamHeader() throws IOException {
if (super.out != null) {
- // The compressed bzip2 stream should start with the
- // identifying characters BZ. Caller of CBZip2OutputStream
- // i.e. this class must write these characters.
- out.write(HEADER.getBytes(StandardCharsets.UTF_8));
+ writeHeader(out);
}
}
@@ -547,4 +545,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
}// end of BZip2CompressionInputStream
+ @VisibleForTesting
+ public static void writeHeader(OutputStream out) throws IOException {
+ // The compressed bzip2 stream should start with the
+ // identifying characters BZ. Caller of CBZip2OutputStream
+ // i.e. this class must write these characters.
+ out.write(HEADER.getBytes(StandardCharsets.UTF_8));
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
index 187fe481588..61e88d80d8c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
@@ -27,6 +27,7 @@ import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.IOException;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
@@ -312,13 +313,24 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
}
} else if (readMode == READ_MODE.BYBLOCK) {
this.currentState = STATE.NO_PROCESS_STATE;
- skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
+ skipResult = skipToNextBlockMarker();
if(!skipDecompression){
changeStateToProcessABlock();
}
}
}
+ /**
+ * Skips bytes in the stream until the start marker of a block is reached
+ * or end of stream is reached. Used for testing purposes to identify the
+ * start offsets of blocks.
+ */
+ @VisibleForTesting
+ boolean skipToNextBlockMarker() throws IOException {
+ return skipToNextMarker(
+ CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
+ }
+
/**
* Returns the number of bytes between the current stream position
* and the immediate next BZip2 block marker.
@@ -428,7 +440,7 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
//report 'end of block' or 'end of stream'
result = b;
- skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
+ skipResult = skipToNextBlockMarker();
changeStateToProcessABlock();
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
index 39c3638b0f4..50bdddb8136 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
@@ -27,6 +27,7 @@ package org.apache.hadoop.io.compress.bzip2;
import java.io.OutputStream;
import java.io.IOException;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.io.IOUtils;
/**
@@ -781,8 +782,7 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
inUse[i] = false;
}
- /* 20 is just a paranoia constant */
- this.allowableBlockSize = (this.blockSize100k * BZip2Constants.baseBlockSize) - 20;
+ this.allowableBlockSize = getAllowableBlockSize(this.blockSize100k);
}
private void endBlock() throws IOException {
@@ -2093,4 +2093,9 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
}
+ @VisibleForTesting
+ static int getAllowableBlockSize(int blockSize100k) {
+ /* 20 is just a paranoia constant */
+ return (blockSize100k * BZip2Constants.baseBlockSize) - 20;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
new file mode 100644
index 00000000000..e40f631b603
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream.MIN_BLOCKSIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+/**
+ * A writer that simplifies creating BZip2 compressed text data for testing
+ * purposes.
+ */
+public final class BZip2TextFileWriter implements Closeable {
+
+ // Use the minimum block size to reduce the amount of data that must be
+ // written to CBZip2OutputStream before a new block is created.
+ private static final int BLOCK_SIZE_100K = MIN_BLOCKSIZE;
+
+ /**
+ * The number of bytes of run-length encoded data that needs to be written to
+ * this writer in order for the next byte written to start a new BZip2 block.
+ */
+ public static final int BLOCK_SIZE =
+ // The + 1 is needed because of how CBZip2OutputStream checks whether the
+ // last offset written is less than allowable block size. Because the last
+ // offset is one less than the number of bytes written to the block, we need
+ // to write an extra byte to trigger writing a new block.
+ CBZip2OutputStream.getAllowableBlockSize(BLOCK_SIZE_100K) + 1;
+
+ private final CBZip2OutputStream out;
+
+ public BZip2TextFileWriter(Path path, Configuration conf) throws IOException {
+ this(path.getFileSystem(conf).create(path));
+ }
+
+ public BZip2TextFileWriter(OutputStream rawOut) throws IOException {
+ try {
+ BZip2Codec.writeHeader(rawOut);
+ out = new CBZip2OutputStream(rawOut, BLOCK_SIZE_100K);
+ } catch (Throwable e) {
+ rawOut.close();
+ throw e;
+ }
+ }
+
+ public void writeManyRecords(int totalSize, int numRecords, byte[] delimiter)
+ throws IOException {
+ checkArgument(numRecords > 0);
+ checkArgument(delimiter.length > 0);
+
+ int minRecordSize = totalSize / numRecords;
+ checkArgument(minRecordSize >= delimiter.length);
+
+ int lastRecordExtraSize = totalSize % numRecords;
+
+ for (int i = 0; i < numRecords - 1; i++) {
+ writeRecord(minRecordSize, delimiter);
+ }
+ writeRecord(minRecordSize + lastRecordExtraSize, delimiter);
+ }
+
+ public void writeRecord(int totalSize, byte[] delimiter) throws IOException {
+ checkArgument(delimiter.length > 0);
+ checkArgument(totalSize >= delimiter.length);
+
+ int contentSize = totalSize - delimiter.length;
+ for (int i = 0; i < contentSize; i++) {
+ // Alternate between characters so that internals of CBZip2OutputStream
+ // cannot condense the written bytes using run-length encoding. This
+ // allows the caller to use #BLOCK_SIZE in order to know whether the next
+ // write will end just before the end of the current block, or exceed it,
+ // and by how much.
+ out.write(i % 2 == 0 ? 'a' : 'b');
+ }
+ write(delimiter);
+ }
+
+ public void write(String bytes) throws IOException {
+ write(bytes.getBytes(StandardCharsets.UTF_8));
+ }
+
+ public void write(byte[] bytes) throws IOException {
+ out.write(bytes);
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
new file mode 100644
index 00000000000..717b282797e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public final class BZip2Utils {
+
+ private BZip2Utils() {
+ }
+
+ /**
+ * Returns the start offsets of blocks that follow the first block in the
+ * BZip2 compressed file at the given path. The first offset corresponds to
+ * the first byte containing the BZip2 block marker of the second block. The
+ * i-th offset corresponds to the block marker of the (i + 1)-th block.
+ */
+ public static List<Long> getNextBlockMarkerOffsets(
+ Path path, Configuration conf) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ try (InputStream fileIn = fs.open(path)) {
+ return getNextBlockMarkerOffsets(fileIn);
+ }
+ }
+
+ /**
+ * Returns the start offsets of blocks that follow the first block in the
+ * BZip2 compressed input stream. The first offset corresponds to
+ * the first byte containing the BZip2 block marker of the second block. The
+ * i-th offset corresponds to the block marker of the (i + 1)-th block.
+ */
+ public static List<Long> getNextBlockMarkerOffsets(InputStream rawIn)
+ throws IOException {
+ try (CBZip2InputStream in = new CBZip2InputStream(rawIn, BYBLOCK)) {
+ ArrayList<Long> offsets = new ArrayList<>();
+ while (in.skipToNextBlockMarker()) {
+ offsets.add(in.getProcessedByteCount());
+ }
+ return offsets;
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
new file mode 100644
index 00000000000..9b36f899b6d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class TestBZip2TextFileWriter {
+
+ private static final byte[] DELIMITER = new byte[] {'\0'};
+
+ private ByteArrayOutputStream rawOut;
+ private BZip2TextFileWriter writer;
+
+ @Before
+ public void setUp() throws Exception {
+ rawOut = new ByteArrayOutputStream();
+ writer = new BZip2TextFileWriter(rawOut);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ rawOut = null;
+ writer.close();
+ }
+
+ @Test
+ public void writingSingleBlockSizeOfData() throws Exception {
+ writer.writeRecord(BLOCK_SIZE, DELIMITER);
+ writer.close();
+
+ List<Long> nextBlocks = getNextBlockMarkerOffsets();
+ assertEquals(0, nextBlocks.size());
+ }
+
+ @Test
+ public void justExceedingBeyondBlockSize() throws Exception {
+ writer.writeRecord(BLOCK_SIZE + 1, DELIMITER);
+ writer.close();
+
+ List<Long> nextBlocks = getNextBlockMarkerOffsets();
+ assertEquals(1, nextBlocks.size());
+ }
+
+ @Test
+ public void writingTwoBlockSizesOfData() throws Exception {
+ writer.writeRecord(2 * BLOCK_SIZE, DELIMITER);
+ writer.close();
+
+ List<Long> nextBlocks = getNextBlockMarkerOffsets();
+ assertEquals(1, nextBlocks.size());
+ }
+
+ @Test
+ public void justExceedingBeyondTwoBlocks() throws Exception {
+ writer.writeRecord(2 * BLOCK_SIZE + 1, DELIMITER);
+ writer.close();
+
+ List<Long> nextBlocks = getNextBlockMarkerOffsets();
+ assertEquals(2, nextBlocks.size());
+ }
+
+ private List<Long> getNextBlockMarkerOffsets() throws IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(rawOut.toByteArray());
+ return BZip2Utils.getNextBlockMarkerOffsets(in);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
index 9d0e949a10b..74b959cf47c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
@@ -127,6 +127,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
@Override
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
throws IOException {
+ boolean alreadyReadAfterSplit = didReadAfterSplit();
+
int bytesRead = in.read(buffer);
// If the split ended in the middle of a record delimiter then we need
@@ -135,7 +137,9 @@ public class CompressedSplitLineReader extends SplitLineReader {
// However if using the default delimiter and the next character is a
// linefeed then next split will treat it as a delimiter all by itself
// and the additional record read should not be performed.
- if (inDelimiter && bytesRead > 0) {
+ boolean justReadAfterSplit = !alreadyReadAfterSplit && didReadAfterSplit();
+
+ if (justReadAfterSplit && inDelimiter && bytesRead > 0) {
if (usingCRLF) {
needAdditionalRecord = (buffer[0] != '\n');
} else {
@@ -152,7 +156,7 @@ public class CompressedSplitLineReader extends SplitLineReader {
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
- if (scin.getPos() > scin.getAdjustedEnd()) {
+ if (didReadAfterSplit()) {
finished = true;
}
@@ -170,4 +174,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
protected void unsetNeedAdditionalRecordAfterSplit() {
needAdditionalRecord = false;
}
+
+ private boolean didReadAfterSplit() throws IOException {
+ return scin.getPos() > scin.getAdjustedEnd();
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
new file mode 100644
index 00000000000..fccc01ad7e6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
+
+public final class LineRecordReaderHelper extends
+ BaseLineRecordReaderHelper {
+
+ public LineRecordReaderHelper(Path filePath, Configuration conf) {
+ super(filePath, conf);
+ }
+
+ @Override
+ public long countRecords(long start, long length) throws IOException {
+ try (LineRecordReader reader = newReader(start, length)) {
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+
+ long numRecords = 0L;
+ while (reader.next(key, value)) {
+ numRecords++;
+ }
+ return numRecords;
+ }
+ }
+
+ private LineRecordReader newReader(long start, long length)
+ throws IOException {
+ FileSplit split = new FileSplit(getFilePath(), start, length, (String[]) null);
+ return new LineRecordReader(getConf(), split, getRecordDelimiterBytes());
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..1588e629c8c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
+import org.apache.hadoop.mapreduce.lib.input.BaseTestLineRecordReaderBZip2;
+
+public final class TestLineRecordReaderBZip2 extends
+ BaseTestLineRecordReaderBZip2 {
+
+ @Override
+ protected BaseLineRecordReaderHelper newReader(Path file) {
+ return new LineRecordReaderHelper(file, getConf());
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
new file mode 100644
index 00000000000..d1e21e6e258
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public abstract class BaseLineRecordReaderHelper {
+
+ private final Configuration conf;
+ private final Path filePath;
+ private final byte[] recordDelimiterBytes;
+
+
+
+ public BaseLineRecordReaderHelper(Path filePath, Configuration conf) {
+ this.conf = conf;
+ this.filePath = filePath;
+
+ conf.setInt(LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+ String delimiter = conf.get("textinputformat.record.delimiter");
+ this.recordDelimiterBytes =
+ null != delimiter ? delimiter.getBytes(StandardCharsets.UTF_8) : null;
+ }
+
+ public abstract long countRecords(long start, long length) throws IOException;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public Path getFilePath() {
+ return filePath;
+ }
+
+ public byte[] getRecordDelimiterBytes() {
+ return recordDelimiterBytes;
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..cd400365efa
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
+import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class BaseTestLineRecordReaderBZip2 {
+
+ // LF stands for line feed
+ private static final byte[] LF = new byte[] {'\n'};
+ // CR stands for carriage return
+ private static final byte[] CR = new byte[] {'\r'};
+ private static final byte[] CR_LF = new byte[] {'\r', '\n'};
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path tempFile;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public FileSystem getFs() {
+ return fs;
+ }
+
+ public Path getTempFile() {
+ return tempFile;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+
+ Path workDir = new Path(
+ System.getProperty("test.build.data", "target"),
+ "data/" + getClass().getSimpleName());
+
+ fs = workDir.getFileSystem(conf);
+
+ Path inputDir = new Path(workDir, "input");
+ tempFile = new Path(inputDir, "test.txt.bz2");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.delete(tempFile, /* recursive */ false);
+ }
+
+ @Test
+ public void firstBlockEndsWithLF() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+ }
+
+ @Test
+ public void firstBlockEndsWithLFSecondBlockStartsWithLF() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+ // Write 254 empty rows terminating at LF, as those records will get
+ // rolled into the first block record due to run-length encoding, the
+ // 255th LF character will trigger a run to be written to the block. We
+ // only need 254 LF characters since the last byte written by prior
+ // writeManyRecords call is already a LF.
+ writer.writeManyRecords(254, 254, LF);
+
+ // This LF character should be the first byte of the second block, but
+ // if splitting at blocks, the first split will read this record as the
+ // additional record.
+ writer.writeRecord(1, LF);
+
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1255, 2});
+ }
+
+ @Test
+ public void firstBlockEndsWithLFSecondBlockStartsWithCR() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+ writer.writeRecord(1, CR);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+ }
+
+ @Test
+ public void firstBlockEndsWithCRLF() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE, 1000, CR_LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+ }
+
+ @Test
+ public void lastRecordContentSpanAcrossBlocks()
+ throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+ writer.writeRecord(100, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ @Test
+ public void lastRecordOfBlockHasItsLFInNextBlock() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+ // The LF character is the first byte of the second block
+ writer.writeRecord(51, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ @Test
+ public void lastRecordOfFirstBlockHasItsCRLFInSecondBlock() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+ // Both CR + LF characters are the first two bytes of second block
+ writer.writeRecord(52, CR_LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ @Test
+ public void lastRecordOfFirstBlockHasItsCRLFPartlyInSecondBlock()
+ throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+ // The CR character is the last byte of the first block and the LF is
+ // the first byte of the second block
+ writer.writeRecord(51, CR_LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ @Test
+ public void lastByteInFirstBlockIsCRFirstByteInSecondBlockIsNotLF()
+ throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE, 1000, CR);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ writer.writeRecord(10, LF);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+ }
+
+ @Test
+ public void usingCRDelimiterWithSmallestBufferSize() throws Exception {
+ // Forces calling LineReader#fillBuffer for every byte read
+ conf.set(IO_FILE_BUFFER_SIZE_KEY, "1");
+
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, CR);
+ writer.writeRecord(100, CR);
+ writer.writeRecord(10, CR);
+ writer.writeRecord(10, CR);
+ writer.writeRecord(10, CR);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ @Test
+ public void delimitedByCRSpanningThreeBlocks() throws Exception {
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeRecord(3 * BLOCK_SIZE, CR);
+ writer.writeRecord(3 * BLOCK_SIZE, CR);
+ writer.writeRecord(3 * BLOCK_SIZE, CR);
+ }
+ assertRecordCountsPerSplit(tempFile,
+ new long[] {1, 0, 1, 0, 0, 1, 0, 0, 0});
+ }
+
+ @Test
+ public void customDelimiterLastThreeBytesInBlockAreDelimiter()
+ throws Exception {
+ byte[] delimiter = new byte[] {'e', 'n', 'd'};
+ setDelimiter(delimiter);
+
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE, 1000, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+ }
+
+ @Test
+ public void customDelimiterDelimiterSpansAcrossBlocks()
+ throws Exception {
+ byte[] delimiter = new byte[] {'e', 'n', 'd'};
+ setDelimiter(delimiter);
+
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
+ writer.writeRecord(52, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+ }
+
+ @Test
+ public void customDelimiterLastRecordDelimiterStartsAtNextBlockStart()
+ throws Exception {
+ byte[] delimiter = new byte[] {'e', 'n', 'd'};
+ setDelimiter(delimiter);
+
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
+ writer.writeRecord(53, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ @Test
+ public void customDelimiterLastBlockBytesShareCommonPrefixWithDelimiter()
+ throws Exception {
+ byte[] delimiter = new byte[] {'e', 'n', 'd'};
+ setDelimiter(delimiter);
+
+ try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+ writer.writeManyRecords(BLOCK_SIZE - 4, 999, delimiter);
+ // The first 4 bytes, "an e", will be the last 4 bytes of the first block,
+ // the last byte being 'e' which matches the first character of the
+ // delimiter "end". The first byte of the next block also matches the
+ // second byte of the delimiter "n"; however the next character "c" does
+ // not match the last character of the delimiter. Thus an additional
+ // record should not be read for the split that reads the first block.
+ // The split that reads the second block will just discard
+ // "nchanting tale coming to an end".
+ writer.write("an enchanting tale coming to an end");
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ writer.writeRecord(10, delimiter);
+ }
+ assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+ }
+
+ protected abstract BaseLineRecordReaderHelper newReader(Path file);
+
+ private void assertRecordCountsPerSplit(
+ Path path, long[] countsIfSplitAtBlocks) throws IOException {
+ RecordCountAssert countAssert =
+ new RecordCountAssert(path, countsIfSplitAtBlocks);
+ countAssert.assertSingleSplit();
+ countAssert.assertSplittingAtBlocks();
+ countAssert.assertSplittingJustAfterSecondBlockStarts();
+ }
+
+ private class RecordCountAssert {
+
+ private final BaseLineRecordReaderHelper reader;
+ private final long numBlocks;
+ private final long[] countsIfSplitAtBlocks;
+ private final long fileSize;
+ private final long totalRecords;
+ private final List<Long> nextBlockOffsets;
+
+ RecordCountAssert(
+ Path path, long[] countsIfSplitAtBlocks) throws IOException {
+ this.reader = newReader(path);
+ this.countsIfSplitAtBlocks = countsIfSplitAtBlocks;
+ this.fileSize = getFileSize(path);
+ this.totalRecords = Arrays.stream(countsIfSplitAtBlocks).sum();
+ this.numBlocks = countsIfSplitAtBlocks.length;
+ this.nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(path, conf);
+ // Sanity check: one more expected count than block marker offsets found.
+ assertEquals(numBlocks, nextBlockOffsets.size() + 1);
+ }
+
+ private void assertSingleSplit() throws IOException {
+ assertEquals(totalRecords, reader.countRecords(0, fileSize));
+ }
+
+ private void assertSplittingAtBlocks() throws IOException {
+ for (int i = 0; i < numBlocks; i++) {
+ long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1);
+ long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i);
+ long length = end - start;
+
+ String message = "At i=" + i;
+ long expectedCount = countsIfSplitAtBlocks[i];
+ assertEquals(
+ message, expectedCount, reader.countRecords(start, length));
+ }
+ }
+
+ private void assertSplittingJustAfterSecondBlockStarts()
+ throws IOException {
+ if (numBlocks <= 1) {
+ return;
+ }
+ long recordsInFirstTwoBlocks =
+ countsIfSplitAtBlocks[0] + countsIfSplitAtBlocks[1];
+ long remainingRecords = totalRecords - recordsInFirstTwoBlocks;
+ // The first split ends one byte past the first block marker offset.
+ long firstSplitSize = nextBlockOffsets.get(0) + 1;
+ assertEquals(
+ recordsInFirstTwoBlocks,
+ reader.countRecords(0, firstSplitSize));
+ assertEquals(
+ remainingRecords,
+ reader.countRecords(firstSplitSize, fileSize - firstSplitSize));
+ }
+ }
+
+ private long getFileSize(Path path) throws IOException {
+ return fs.getFileStatus(path).getLen();
+ }
+
+ private void setDelimiter(byte[] delimiter) {
+ conf.set("textinputformat.record.delimiter",
+ new String(delimiter, StandardCharsets.UTF_8));
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
new file mode 100644
index 00000000000..56fc7eb4e86
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * Test helper counting the records LineRecordReader returns for a file split.
+ */
+public final class LineRecordReaderHelper extends BaseLineRecordReaderHelper {
+
+  public LineRecordReaderHelper(Path filePath, Configuration conf) {
+    super(filePath, conf);
+  }
+
+  /**
+   * Iterates a reader over the split until {@code nextKeyValue()} is false.
+   * @param start offset handed to the {@link FileSplit} as its start
+   * @param length length handed to the {@link FileSplit}
+   * @return how many times {@code nextKeyValue()} returned {@code true}
+   * @throws IOException if initializing, reading or closing the reader fails
+   */
+  @Override
+  public long countRecords(long start, long length) throws IOException {
+    FileSplit split = new FileSplit(getFilePath(), start, length, null);
+    TaskAttemptContext context =
+        new TaskAttemptContextImpl(getConf(), new TaskAttemptID());
+    // The reader becomes a managed resource before initialize() runs, so a
+    // failed initialize() still closes it and an exception from that close()
+    // is attached as suppressed instead of replacing the original failure.
+    try (LineRecordReader reader =
+        new LineRecordReader(getRecordDelimiterBytes())) {
+      reader.initialize(split, context);
+      long numRecords = 0L;
+      while (reader.nextKeyValue()) {
+        numRecords++;
+      }
+      return numRecords;
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..ec01ef332fa
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import org.apache.hadoop.fs.Path;
+
+public final class TestLineRecordReaderBZip2
+    extends BaseTestLineRecordReaderBZip2 {
+  @Override
+  protected BaseLineRecordReaderHelper newReader(Path file) {
+    // Each call builds a new helper on the configuration held by this test.
+    return new LineRecordReaderHelper(file, this.getConf());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org