You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/24 19:50:44 UTC
[06/21] hadoop git commit: MAPREDUCE-5948.
org.apache.hadoop.mapred.LineRecordReader does not handle multibyte record
delimiters well. Contributed by Vinayakumar B, Rushabh Shah, and Akira AJISAKA
MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle multibyte record delimiters well. Contributed by Vinayakumar B, Rushabh Shah, and Akira AJISAKA
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/077250d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/077250d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/077250d8
Branch: refs/heads/HDFS-7240
Commit: 077250d8d7b4b757543a39a6ce8bb6e3be356c6f
Parents: 11ac848
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Jun 22 21:59:20 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Jun 22 21:59:20 2015 +0000
----------------------------------------------------------------------
.../java/org/apache/hadoop/util/LineReader.java | 8 ++
hadoop-mapreduce-project/CHANGES.txt | 4 +
.../apache/hadoop/mapred/LineRecordReader.java | 4 +-
.../mapreduce/lib/input/LineRecordReader.java | 3 +-
.../lib/input/UncompressedSplitLineReader.java | 125 +++++++++++++++++++
.../hadoop/mapred/TestLineRecordReader.java | 77 +++++++++++-
.../lib/input/TestLineRecordReader.java | 79 ++++++++++--
7 files changed, 286 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 3188cb5..1d1b569 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -369,4 +369,12 @@ public class LineReader implements Closeable {
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
+
+ protected int getBufferPosn() {
+ return bufferPosn;
+ }
+
+ protected int getBufferSize() {
+ return bufferSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index bae2674..da1a2f3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -498,6 +498,10 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6403. Fix typo in the usage of NNBench.
(Jagadesh Kiran N via aajisaka)
+ MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle
+ multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
+ AJISAKA via jlowe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 45263c4..9802697 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -131,7 +132,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
}
} else {
fileIn.seek(start);
- in = new SplitLineReader(fileIn, job, recordDelimiter);
+ in = new UncompressedSplitLineReader(
+ fileIn, job, recordDelimiter, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 5af8f43..9e1ca2a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -112,7 +112,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
}
} else {
fileIn.seek(start);
- in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
+ in = new UncompressedSplitLineReader(
+ fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
new file mode 100644
index 0000000..52fb7b0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * SplitLineReader for uncompressed files.
+ * This class can split the file correctly even if the delimiter is multi-bytes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UncompressedSplitLineReader extends SplitLineReader {
+ private boolean needAdditionalRecord = false;
+ private long splitLength;
+ /** Total bytes read from the input stream. */
+ private long totalBytesRead = 0;
+ private boolean finished = false;
+ private boolean usingCRLF;
+ private int unusedBytes = 0;
+ private int lastBytesRead = 0;
+
+ public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
+ byte[] recordDelimiterBytes, long splitLength) throws IOException {
+ super(in, conf, recordDelimiterBytes);
+ this.splitLength = splitLength;
+ usingCRLF = (recordDelimiterBytes == null);
+ }
+
+ @Override
+ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+ throws IOException {
+ int maxBytesToRead = buffer.length;
+ if (totalBytesRead < splitLength) {
+ maxBytesToRead = Math.min(maxBytesToRead,
+ (int)(splitLength - totalBytesRead));
+ }
+ int bytesRead = in.read(buffer, 0, maxBytesToRead);
+ lastBytesRead = bytesRead;
+
+ // If the split ended in the middle of a record delimiter then we need
+ // to read one additional record, as the consumer of the next split will
+ // not recognize the partial delimiter as a record.
+ // However if using the default delimiter and the next character is a
+ // linefeed then next split will treat it as a delimiter all by itself
+ // and the additional record read should not be performed.
+ if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
+ if (usingCRLF) {
+ needAdditionalRecord = (buffer[0] != '\n');
+ } else {
+ needAdditionalRecord = true;
+ }
+ }
+ if (bytesRead > 0) {
+ totalBytesRead += bytesRead;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ long bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (totalBytesRead > splitLength) {
+ finished = true;
+ }
+ bytesRead = totalBytesRead;
+ int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
+ bytesRead = totalBytesRead - bytesRead;
+
+ // No records left.
+ if (bytesConsumed == 0 && bytesRead == 0) {
+ return 0;
+ }
+
+ int bufferSize = getBufferSize();
+
+ // Add the remaining buffer size not used for the last call
+ // of fillBuffer method.
+ if (lastBytesRead <= 0) {
+ bytesRead += bufferSize;
+ } else if (bytesRead > 0) {
+ bytesRead += bufferSize - lastBytesRead;
+ }
+
+ // Adjust the size of the buffer not used for this record.
+ // The size is carried over for the next calculation.
+ bytesRead += unusedBytes;
+ unusedBytes = bufferSize - getBufferPosn();
+ bytesRead -= unusedBytes;
+ }
+ return (int) bytesRead;
+ }
+
+ @Override
+ public boolean needAdditionalRecordAfterSplit() {
+ return !finished && needAdditionalRecord;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index cbbbeaa..a5c9933 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -41,6 +45,9 @@ import org.apache.hadoop.io.compress.Decompressor;
import org.junit.Test;
public class TestLineRecordReader {
+ private static Path workDir = new Path(new Path(System.getProperty(
+ "test.build.data", "target"), "data"), "TestTextInputFormat");
+ private static Path inputDir = new Path(workDir, "input");
private void testSplitRecords(String testFileName, long firstSplitLength)
throws IOException {
@@ -50,15 +57,27 @@ public class TestLineRecordReader {
long testFileSize = testFile.length();
Path testFilePath = new Path(testFile.getAbsolutePath());
Configuration conf = new Configuration();
+ testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+ }
+
+ private void testSplitRecordsForFile(Configuration conf,
+ long firstSplitLength, long testFileSize, Path testFilePath)
+ throws IOException {
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
- assertTrue("unexpected test data at " + testFile,
+ assertTrue("unexpected test data at " + testFilePath,
testFileSize > firstSplitLength);
+ String delimiter = conf.get("textinputformat.record.delimiter");
+ byte[] recordDelimiterBytes = null;
+ if (null != delimiter) {
+ recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+ }
// read the data without splitting to count the records
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
(String[])null);
- LineRecordReader reader = new LineRecordReader(conf, split);
+ LineRecordReader reader = new LineRecordReader(conf, split,
+ recordDelimiterBytes);
LongWritable key = new LongWritable();
Text value = new Text();
int numRecordsNoSplits = 0;
@@ -69,7 +88,7 @@ public class TestLineRecordReader {
// count the records in the first split
split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
- reader = new LineRecordReader(conf, split);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
int numRecordsFirstSplit = 0;
while (reader.next(key, value)) {
++numRecordsFirstSplit;
@@ -79,14 +98,14 @@ public class TestLineRecordReader {
// count the records in the second split
split = new FileSplit(testFilePath, firstSplitLength,
testFileSize - firstSplitLength, (String[])null);
- reader = new LineRecordReader(conf, split);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
int numRecordsRemainingSplits = 0;
while (reader.next(key, value)) {
++numRecordsRemainingSplits;
}
reader.close();
- assertEquals("Unexpected number of records in bzip2 compressed split",
+ assertEquals("Unexpected number of records in split",
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
}
@@ -290,4 +309,52 @@ public class TestLineRecordReader {
}
assertEquals(10, decompressors.size());
}
+
+ /**
+ * Writes the input test file
+ *
+ * @param conf
+ * @return Path of the file created
+ * @throws IOException
+ */
+ private Path createInputFile(Configuration conf, String data)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path file = new Path(inputDir, "test.txt");
+ Writer writer = new OutputStreamWriter(localFs.create(file));
+ try {
+ writer.write(data);
+ } finally {
+ writer.close();
+ }
+ return file;
+ }
+
+ @Test
+ public void testUncompressedInput() throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "abc+++def+++ghi+++"
+ + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+ Path inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+++");
+ for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedInputContainingCRLF() throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "a\r\nb\rc\nd\r\n";
+ Path inputFile = createInputFile(conf, inputData);
+ for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/077250d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 8b385a0..3c1f28f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
@@ -42,6 +46,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.Test;
public class TestLineRecordReader {
+ private static Path workDir = new Path(new Path(System.getProperty(
+ "test.build.data", "target"), "data"), "TestTextInputFormat");
+ private static Path inputDir = new Path(workDir, "input");
private void testSplitRecords(String testFileName, long firstSplitLength)
throws IOException {
@@ -51,17 +58,28 @@ public class TestLineRecordReader {
long testFileSize = testFile.length();
Path testFilePath = new Path(testFile.getAbsolutePath());
Configuration conf = new Configuration();
+ testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+ }
+
+ private void testSplitRecordsForFile(Configuration conf,
+ long firstSplitLength, long testFileSize, Path testFilePath)
+ throws IOException {
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
- assertTrue("unexpected test data at " + testFile,
+ assertTrue("unexpected test data at " + testFilePath,
testFileSize > firstSplitLength);
+ String delimiter = conf.get("textinputformat.record.delimiter");
+ byte[] recordDelimiterBytes = null;
+ if (null != delimiter) {
+ recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+ }
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
// read the data without splitting to count the records
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
(String[])null);
- LineRecordReader reader = new LineRecordReader();
+ LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
int numRecordsNoSplits = 0;
while (reader.nextKeyValue()) {
@@ -71,7 +89,7 @@ public class TestLineRecordReader {
// count the records in the first split
split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
- reader = new LineRecordReader();
+ reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
int numRecordsFirstSplit = 0;
while (reader.nextKeyValue()) {
@@ -82,16 +100,15 @@ public class TestLineRecordReader {
// count the records in the second split
split = new FileSplit(testFilePath, firstSplitLength,
testFileSize - firstSplitLength, (String[])null);
- reader = new LineRecordReader();
+ reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
int numRecordsRemainingSplits = 0;
while (reader.nextKeyValue()) {
++numRecordsRemainingSplits;
}
reader.close();
-
- assertEquals("Unexpected number of records in bzip2 compressed split",
- numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+ assertEquals("Unexpected number of records in split ", numRecordsNoSplits,
+ numRecordsFirstSplit + numRecordsRemainingSplits);
}
@Test
@@ -276,4 +293,52 @@ public class TestLineRecordReader {
}
assertEquals(10, decompressors.size());
}
+
+ /**
+ * Writes the input test file
+ *
+ * @param conf
+ * @return Path of the file created
+ * @throws IOException
+ */
+ private Path createInputFile(Configuration conf, String data)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path file = new Path(inputDir, "test.txt");
+ Writer writer = new OutputStreamWriter(localFs.create(file));
+ try {
+ writer.write(data);
+ } finally {
+ writer.close();
+ }
+ return file;
+ }
+
+ @Test
+ public void testUncompressedInput() throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "abc+++def+++ghi+++"
+ + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+ Path inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+++");
+ for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedInputContainingCRLF() throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "a\r\nb\rc\nd\r\n";
+ Path inputFile = createInputFile(conf, inputData);
+ for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ }
}