You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/12/01 23:37:52 UTC
[35/46] hadoop git commit: MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter)
MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7fd00b3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7fd00b3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7fd00b3d
Branch: refs/heads/HDFS-1312
Commit: 7fd00b3db4b7d73afd41276ba9a06ec06a0e1762
Parents: e556c35
Author: Robert Kanter <rk...@apache.org>
Authored: Wed Nov 25 17:03:38 2015 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed Nov 25 17:03:38 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/util/LineReader.java | 9 +
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../lib/input/UncompressedSplitLineReader.java | 5 +
.../hadoop/mapred/TestLineRecordReader.java | 230 +++++++++++++-----
.../lib/input/TestLineRecordReader.java | 237 ++++++++++++++-----
5 files changed, 361 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 900215a..153953d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -333,6 +333,10 @@ public class LineReader implements Closeable {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
+ // since it is now certain that the split did not split a delimiter we
+ // should not read the next record: clear the flag otherwise duplicate
+ // records could be generated
+ unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
@@ -380,4 +384,9 @@ public class LineReader implements Closeable {
protected int getBufferSize() {
return bufferSize;
}
+
+ protected void unsetNeedAdditionalRecordAfterSplit() {
+ // Hook, a no-op here: invoked after buffered ambiguous delimiter bytes are
+ // appended as data. Needed for custom multi-byte delimiters (MAPREDUCE-6549).
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c6e80e7..503e687 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -650,6 +650,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6557. Tests in mapreduce-client-app are writing outside of
target. (Akira AJISAKA via junping_du)
+ MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
+ duplicate records (wilfreds via rkanter)
+
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
index 38491b0..6d495ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -97,4 +97,9 @@ public class UncompressedSplitLineReader extends SplitLineReader {
public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord;
}
+
+ @Override
+ protected void unsetNeedAdditionalRecordAfterSplit() {
+ needAdditionalRecord = false; // the split end did not cut a delimiter
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index d33a614..f0cf9f5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -334,12 +334,72 @@ public class TestLineRecordReader {
@Test
public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration();
- String inputData = "abc+++def+++ghi+++"
- + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+ // single char delimiter, best case: no empty records
+ String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
Path inputFile = createInputFile(conf, inputData);
- conf.set("textinputformat.record.delimiter", "+++");
- for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
- for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.set("textinputformat.record.delimiter", "+");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter, best case: delimiter chars never occur in the data
+ inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "|+|");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // single char delimiter with empty records
+ inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter with empty records
+ inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "|+|");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter whose first char ('+') also occurs alone in the data
+ inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+-");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter with newline as start of the delimiter
+ inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "\n+");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter ending in newline, bare newlines also occur in data
+ inputData = "abc\ndef+\nghi+\njkl\nmno";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+\n");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
@@ -363,80 +423,126 @@ public class TestLineRecordReader {
public void testUncompressedInputCustomDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
- String inputData = "1234567890ab12ab345";
- Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
- String delimiter = "ab";
+ String inputData = "abcdefghij++kl++mno";
+ Path inputFile = createInputFile(conf, inputData);
+ String delimiter = "++";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
- FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+ // the first split must contain two records to make sure that it also pulls
+ // in the record from the 2nd split
+ int splitLength = 15;
+ FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
LineRecordReader reader = new LineRecordReader(conf, split,
recordDelimiterBytes);
LongWritable key = new LongWritable();
Text value = new Text();
- reader.next(key, value);
- // Get first record:"1234567890"
- assertEquals(10, value.getLength());
- // Position should be 12 right after "1234567890ab"
- assertEquals(12, reader.getPos());
- reader.next(key, value);
- // Get second record:"12"
- assertEquals(2, value.getLength());
- // Position should be 16 right after "1234567890ab12ab"
- assertEquals(16, reader.getPos());
- reader.next(key, value);
- // Get third record:"345"
- assertEquals(3, value.getLength());
- // Position should be 19 right after "1234567890ab12ab345"
- assertEquals(19, reader.getPos());
+ // Get first record: "abcdefghij"
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong length for record value", 10, value.getLength());
+ // Position should be 12 right after "abcdefghij++"
+ assertEquals("Wrong position after record read", 12, reader.getPos());
+ // Get second record: "kl"
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong length for record value", 2, value.getLength());
+ // Position should be 16 right after "abcdefghij++kl++"
+ assertEquals("Wrong position after record read", 16, reader.getPos());
+ // Get third record: "mno"
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong length for record value", 3, value.getLength());
+ // Position should be 19 right after "abcdefghij++kl++mno"
+ assertEquals("Wrong position after record read", 19, reader.getPos());
assertFalse(reader.next(key, value));
- assertEquals(19, reader.getPos());
-
- split = new FileSplit(inputFile, 15, 4, (String[])null);
- reader = new LineRecordReader(conf, split, recordDelimiterBytes);
- // No record is in the second split because the second split dropped
+ assertEquals("Wrong position after record read", 19, reader.getPos());
+ reader.close();
+ // No record is in the second split because the second split will drop
// the first record, which was already reported by the first split.
- // The position should be 19 right after "1234567890ab12ab345"
- assertEquals(19, reader.getPos());
- assertFalse(reader.next(key, value));
- assertEquals(19, reader.getPos());
-
- inputData = "123456789aab";
- inputFile = createInputFile(conf, inputData);
- split = new FileSplit(inputFile, 0, 12, (String[])null);
+ split = new FileSplit(inputFile, splitLength,
+ inputData.length() - splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
- reader.next(key, value);
- // Get first record:"123456789a"
- assertEquals(10, value.getLength());
- // Position should be 12 right after "123456789aab"
- assertEquals(12, reader.getPos());
- assertFalse(reader.next(key, value));
- assertEquals(12, reader.getPos());
+ // The position should be 19 right after "abcdefghij++kl++mno" and should
+ // not change
+ assertEquals("Wrong position after record read", 19, reader.getPos());
+ assertFalse("Unexpected record returned", reader.next(key, value));
+ assertEquals("Wrong position after record read", 19, reader.getPos());
+ reader.close();
- inputData = "123456789a";
+ // multi char delimiter with starting part of the delimiter in the data
+ inputData = "abcd+efgh++ijk++mno";
inputFile = createInputFile(conf, inputData);
- split = new FileSplit(inputFile, 0, 10, (String[])null);
+ splitLength = 5;
+ split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
- reader.next(key, value);
- // Get first record:"123456789a"
- assertEquals(10, value.getLength());
- // Position should be 10 right after "123456789a"
- assertEquals(10, reader.getPos());
+ // Get first record: "abcd+efgh"
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong position after record read", 11, reader.getPos());
+ assertEquals("Wrong length for record value", 9, value.getLength());
+ // should have jumped over the delimiter, no record
+ assertFalse("Unexpected record returned", reader.next(key, value));
+ assertEquals("Wrong position after record read", 11, reader.getPos());
+ reader.close();
+ // next split: check for duplicate or dropped records
+ split = new FileSplit(inputFile, splitLength,
+ inputData.length() - splitLength, (String[]) null);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+ // Get second record: "ijk" first in this split
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong position after record read", 16, reader.getPos());
+ assertEquals("Wrong length for record value", 3, value.getLength());
+ // Get third record: "mno" second in this split
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong position after record read", 19, reader.getPos());
+ assertEquals("Wrong length for record value", 3, value.getLength());
+ // should be at the end of the input
assertFalse(reader.next(key, value));
- assertEquals(10, reader.getPos());
+ assertEquals("Wrong position after record read", 19, reader.getPos());
+ reader.close();
- inputData = "123456789ab";
+ inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
inputFile = createInputFile(conf, inputData);
- split = new FileSplit(inputFile, 0, 11, (String[])null);
- reader = new LineRecordReader(conf, split, recordDelimiterBytes);
- reader.next(key, value);
- // Get first record:"123456789"
- assertEquals(9, value.getLength());
- // Position should be 11 right after "123456789ab"
- assertEquals(11, reader.getPos());
- assertFalse(reader.next(key, value));
- assertEquals(11, reader.getPos());
+ delimiter = "|+|";
+ recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+ // every buffer size x every split length: exercises delimiter bytes that
+ // straddle a buffer refill and/or the split end (ambiguous bytes)
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ split = new FileSplit(inputFile, 0, splitSize, (String[]) null);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+ // Get first record: "abcd|efgh" always possible
+ assertTrue("Expected record got nothing", reader.next(key, value));
+ assertEquals("Wrong record value", "abcd|efgh", value.toString());
+ assertEquals("Wrong length for record value", 9, value.getLength());
+ // Position should be 12 right after "|+|"
+ int recordPos = 12;
+ assertEquals("Wrong position after record read", recordPos,
+ reader.getPos());
+ // get the next record: "ij|kl" if the split/buffer allows it
+ if (reader.next(key, value)) {
+ // check the record info: "ij|kl"
+ assertEquals("Wrong record value", "ij|kl", value.toString());
+ // Position should be 20 right after "|+|"
+ recordPos = 20;
+ assertEquals("Wrong position after record read", recordPos,
+ reader.getPos());
+ }
+ // get the third record: "mno|pqr" if the split/buffer allows it
+ if (reader.next(key, value)) {
+ // check the record info: "mno|pqr"
+ assertEquals("Wrong record value", "mno|pqr", value.toString());
+ // Position should be 27 at the end of the string now
+ recordPos = inputData.length();
+ assertEquals("Wrong position after record read", recordPos,
+ reader.getPos());
+ }
+ // no more records can be read; we should still be at the last position
+ assertFalse("Unexpected record returned", reader.next(key, value));
+ assertEquals("Wrong position after record read", recordPos,
+ reader.getPos());
+ reader.close();
+ }
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index dfe8b5d..6819af7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.input;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -320,16 +321,76 @@ public class TestLineRecordReader {
@Test
public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration();
- String inputData = "abc+++def+++ghi+++"
- + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+ // single char delimiter, best case: no empty records
+ String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
Path inputFile = createInputFile(conf, inputData);
- conf.set("textinputformat.record.delimiter", "+++");
+ conf.set("textinputformat.record.delimiter", "+");
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
+ // multi char delimiter, best case: delimiter chars never occur in the data
+ inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "|+|");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // single char delimiter with empty records
+ inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter with empty records
+ inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "|+|");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter whose first char ('+') also occurs alone in the data
+ inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+-");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter with newline as start of the delimiter
+ inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "\n+");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
+ // multi char delimiter ending in newline, bare newlines also occur in data
+ inputData = "abc\ndef+\nghi+\njkl\nmno";
+ inputFile = createInputFile(conf, inputData);
+ conf.set("textinputformat.record.delimiter", "+\n");
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ conf.setInt("io.file.buffer.size", bufferSize);
+ testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+ }
+ }
}
@Test
@@ -349,91 +410,145 @@ public class TestLineRecordReader {
public void testUncompressedInputCustomDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
- String inputData = "1234567890ab12ab345";
- Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
- String delimiter = "ab";
+ String inputData = "abcdefghij++kl++mno";
+ Path inputFile = createInputFile(conf, inputData);
+ String delimiter = "++";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
- FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+ int splitLength = 15;
+ FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null);
TaskAttemptContext context = new TaskAttemptContextImpl(conf,
new TaskAttemptID());
LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
- LongWritable key;
- Text value;
- reader.nextKeyValue();
- key = reader.getCurrentKey();
- value = reader.getCurrentValue();
- // Get first record:"1234567890"
- assertEquals(10, value.getLength());
- assertEquals(0, key.get());
- reader.nextKeyValue();
- // Get second record:"12"
- assertEquals(2, value.getLength());
- // Key should be 12 right after "1234567890ab"
- assertEquals(12, key.get());
- reader.nextKeyValue();
- // Get third record:"345"
- assertEquals(3, value.getLength());
- // Key should be 16 right after "1234567890ab12ab"
- assertEquals(16, key.get());
+ // Get first record: "abcdefghij"
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
+ LongWritable key = reader.getCurrentKey();
+ Text value = reader.getCurrentValue();
+ assertEquals("Wrong length for record value", 10, value.getLength());
+ assertEquals("Wrong position after record read", 0, key.get());
+ // Get second record: "kl"
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
+ assertEquals("Wrong length for record value", 2, value.getLength());
+ // Key should be 12 right after "abcdefghij++"
+ assertEquals("Wrong position after record read", 12, key.get());
+ // Get third record: "mno"
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
+ assertEquals("Wrong length for record value", 3, value.getLength());
+ // Key should be 16 right after "abcdefghij++kl++"
+ assertEquals("Wrong position after record read", 16, key.get());
assertFalse(reader.nextKeyValue());
- // Key should be 19 right after "1234567890ab12ab345"
- assertEquals(19, key.get());
-
- split = new FileSplit(inputFile, 15, 4, (String[])null);
+ // Key should be 19 right after "abcdefghij++kl++mno"
+ assertEquals("Wrong position after record read", 19, key.get());
+ // once exhausted the reader reports no current key
+ key = reader.getCurrentKey();
+ assertNull("Unexpected key returned", key);
+ reader.close();
+ split = new FileSplit(inputFile, splitLength,
+ inputData.length() - splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
// No record is in the second split because the second split dropped
// the first record, which was already reported by the first split.
- assertFalse(reader.nextKeyValue());
+ assertFalse("Unexpected record returned", reader.nextKeyValue());
+ key = reader.getCurrentKey();
+ assertNull("Unexpected key returned", key);
+ reader.close();
- inputData = "123456789aab";
+ // multi char delimiter with starting part of the delimiter in the data
+ inputData = "abcd+efgh++ijk++mno";
inputFile = createInputFile(conf, inputData);
- split = new FileSplit(inputFile, 0, 12, (String[])null);
+ splitLength = 5;
+ split = new FileSplit(inputFile, 0, splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
- reader.nextKeyValue();
+ // Get first record: "abcd+efgh"
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey();
value = reader.getCurrentValue();
- // Get first record:"123456789a"
- assertEquals(10, value.getLength());
- assertEquals(0, key.get());
+ assertEquals("Wrong position after record read", 0, key.get());
+ assertEquals("Wrong length for record value", 9, value.getLength());
+ // should have jumped over the delimiter, no record
assertFalse(reader.nextKeyValue());
- // Key should be 12 right after "123456789aab"
- assertEquals(12, key.get());
-
- inputData = "123456789a";
- inputFile = createInputFile(conf, inputData);
- split = new FileSplit(inputFile, 0, 10, (String[])null);
+ assertEquals("Wrong position after record read", 11, key.get());
+ // once exhausted the reader reports no current key
+ key = reader.getCurrentKey();
+ assertNull("Unexpected key returned", key);
+ reader.close();
+ // next split: check for duplicate or dropped records
+ split = new FileSplit(inputFile, splitLength,
+ inputData.length() - splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
- reader.nextKeyValue();
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey();
value = reader.getCurrentValue();
- // Get first record:"123456789a"
- assertEquals(10, value.getLength());
- assertEquals(0, key.get());
+ // Get second record: "ijk" first in this split
+ assertEquals("Wrong position after record read", 11, key.get());
+ assertEquals("Wrong length for record value", 3, value.getLength());
+ // Get third record: "mno" second in this split
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
+ assertEquals("Wrong position after record read", 16, key.get());
+ assertEquals("Wrong length for record value", 3, value.getLength());
+ // should be at the end of the input
assertFalse(reader.nextKeyValue());
- // Key should be 10 right after "123456789a"
- assertEquals(10, key.get());
+ assertEquals("Wrong position after record read", 19, key.get());
+ reader.close();
- inputData = "123456789ab";
+ inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
inputFile = createInputFile(conf, inputData);
- split = new FileSplit(inputFile, 0, 11, (String[])null);
- reader = new LineRecordReader(recordDelimiterBytes);
- reader.initialize(split, context);
- reader.nextKeyValue();
- key = reader.getCurrentKey();
- value = reader.getCurrentValue();
- // Get first record:"123456789"
- assertEquals(9, value.getLength());
- assertEquals(0, key.get());
- assertFalse(reader.nextKeyValue());
- // Key should be 11 right after "123456789ab"
- assertEquals(11, key.get());
+ delimiter = "|+|";
+ recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+ // every buffer size x every split length: exercises delimiter bytes that
+ // straddle a buffer refill and/or the split end (ambiguous bytes)
+ for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+ for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+ // track where we are in the input data
+ int keyPosition = 0;
+ conf.setInt("io.file.buffer.size", bufferSize);
+ split = new FileSplit(inputFile, 0, splitSize, (String[]) null);
+ reader = new LineRecordReader(recordDelimiterBytes);
+ reader.initialize(split, context);
+ // Get the first record: "abcd|efgh" always possible
+ assertTrue("Expected record got nothing", reader.nextKeyValue());
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ assertEquals("Wrong record value", "abcd|efgh", value.toString());
+ // Position should be 0 right at the start
+ assertEquals("Wrong position after record read", keyPosition,
+ key.get());
+ // Position should be 12 right after the first "|+|"
+ keyPosition = 12;
+ // get the next record: "ij|kl" if the split/buffer allows it
+ if (reader.nextKeyValue()) {
+ // check the record info: "ij|kl"
+ assertEquals("Wrong record value", "ij|kl", value.toString());
+ assertEquals("Wrong position after record read", keyPosition,
+ key.get());
+ // Position should be 20 after the second "|+|"
+ keyPosition = 20;
+ }
+ // get the third record: "mno|pqr" if the split/buffer allows it
+ if (reader.nextKeyValue()) {
+ // check the record info: "mno|pqr"
+ assertEquals("Wrong record value", "mno|pqr", value.toString());
+ assertEquals("Wrong position after record read", keyPosition,
+ key.get());
+ // Position should be the end of the input
+ keyPosition = inputData.length();
+ }
+ assertFalse("Unexpected record returned", reader.nextKeyValue());
+ // no more records can be read; we should be at the last position
+ assertEquals("Wrong position after record read", keyPosition,
+ key.get());
+ // once exhausted the reader reports no current key
+ key = reader.getCurrentKey();
+ assertNull("Unexpected key returned", key);
+ reader.close();
+ }
+ }
}
@Test