You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 05:40:48 UTC
[14/50] [abbrv] hadoop git commit: MAPREDUCE-6481. LineRecordReader
may give incomplete record and wrong position/key information for
uncompressed input sometimes. Contributed by Zhihai Xu
MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong position/key information for uncompressed input sometimes. Contributed by Zhihai Xu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58d1a02b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58d1a02b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58d1a02b
Branch: refs/heads/HDFS-7285
Commit: 58d1a02b8d66b1d2a6ac2158be32bd35ad2e69bd
Parents: 6c6e734
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 17 14:30:18 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 17 14:30:18 2015 +0000
----------------------------------------------------------------------
.../java/org/apache/hadoop/util/LineReader.java | 17 +-
hadoop-mapreduce-project/CHANGES.txt | 4 +
.../lib/input/UncompressedSplitLineReader.java | 31 +---
.../hadoop/mapred/TestLineRecordReader.java | 138 ++++++++++++++++
.../lib/input/TestLineRecordReader.java | 161 +++++++++++++++++++
5 files changed, 316 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 1d1b569..900215a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -303,7 +303,10 @@ public class LineReader implements Closeable {
startPosn = bufferPosn = 0;
bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
- str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+ if (ambiguousByteCount > 0) {
+ str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+ bytesConsumed += ambiguousByteCount;
+ }
break; // EOF
}
}
@@ -325,13 +328,13 @@ public class LineReader implements Closeable {
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
+ bytesConsumed += ambiguousByteCount;
+ if (appendLength >= 0 && ambiguousByteCount > 0) {
+ //appending the ambiguous characters (refer case 2.2)
+ str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+ ambiguousByteCount = 0;
+ }
if (appendLength > 0) {
- if (ambiguousByteCount > 0) {
- str.append(recordDelimiterBytes, 0, ambiguousByteCount);
- //appending the ambiguous characters (refer case 2.2)
- bytesConsumed += ambiguousByteCount;
- ambiguousByteCount=0;
- }
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 669fee5..cde6d92 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -566,6 +566,10 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner.
(Zhihai Xu)
+ MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong
+ position/key information for uncompressed input sometimes. (Zhihai Xu via
+ jlowe)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
index 52fb7b0..38491b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -40,8 +40,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
private long totalBytesRead = 0;
private boolean finished = false;
private boolean usingCRLF;
- private int unusedBytes = 0;
- private int lastBytesRead = 0;
public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
byte[] recordDelimiterBytes, long splitLength) throws IOException {
@@ -59,7 +57,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
(int)(splitLength - totalBytesRead));
}
int bytesRead = in.read(buffer, 0, maxBytesToRead);
- lastBytesRead = bytesRead;
// If the split ended in the middle of a record delimiter then we need
// to read one additional record, as the consumer of the next split will
@@ -83,39 +80,17 @@ public class UncompressedSplitLineReader extends SplitLineReader {
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
- long bytesRead = 0;
+ int bytesRead = 0;
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
if (totalBytesRead > splitLength) {
finished = true;
}
- bytesRead = totalBytesRead;
- int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
- bytesRead = totalBytesRead - bytesRead;
- // No records left.
- if (bytesConsumed == 0 && bytesRead == 0) {
- return 0;
- }
-
- int bufferSize = getBufferSize();
-
- // Add the remaining buffer size not used for the last call
- // of fillBuffer method.
- if (lastBytesRead <= 0) {
- bytesRead += bufferSize;
- } else if (bytesRead > 0) {
- bytesRead += bufferSize - lastBytesRead;
- }
-
- // Adjust the size of the buffer not used for this record.
- // The size is carried over for the next calculation.
- bytesRead += unusedBytes;
- unusedBytes = bufferSize - getBufferPosn();
- bytesRead -= unusedBytes;
+ bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
}
- return (int) bytesRead;
+ return bytesRead;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index a5c9933..d33a614 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -357,4 +358,141 @@ public class TestLineRecordReader {
}
}
}
+
+ @Test
+ public void testUncompressedInputCustomDelimiterPosValue()
+ throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "1234567890ab12ab345";
+ Path inputFile = createInputFile(conf, inputData);
+ conf.setInt("io.file.buffer.size", 10);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ String delimiter = "ab";
+ byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+ FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+ LineRecordReader reader = new LineRecordReader(conf, split,
+ recordDelimiterBytes);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ reader.next(key, value);
+ // Get first record:"1234567890"
+ assertEquals(10, value.getLength());
+ // Position should be 12 right after "1234567890ab"
+ assertEquals(12, reader.getPos());
+ reader.next(key, value);
+ // Get second record:"12"
+ assertEquals(2, value.getLength());
+ // Position should be 16 right after "1234567890ab12ab"
+ assertEquals(16, reader.getPos());
+ reader.next(key, value);
+ // Get third record:"345"
+ assertEquals(3, value.getLength());
+ // Position should be 19 right after "1234567890ab12ab345"
+ assertEquals(19, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(19, reader.getPos());
+
+ split = new FileSplit(inputFile, 15, 4, (String[])null);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+ // No record is in the second split because the second split dropped
+ // the first record, which was already reported by the first split.
+ // The position should be 19 right after "1234567890ab12ab345"
+ assertEquals(19, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(19, reader.getPos());
+
+ inputData = "123456789aab";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 12, (String[])null);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+ reader.next(key, value);
+ // Get first record:"123456789a"
+ assertEquals(10, value.getLength());
+ // Position should be 12 right after "123456789aab"
+ assertEquals(12, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(12, reader.getPos());
+
+ inputData = "123456789a";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 10, (String[])null);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+ reader.next(key, value);
+ // Get first record:"123456789a"
+ assertEquals(10, value.getLength());
+ // Position should be 10 right after "123456789a"
+ assertEquals(10, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(10, reader.getPos());
+
+ inputData = "123456789ab";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 11, (String[])null);
+ reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+ reader.next(key, value);
+ // Get first record:"123456789"
+ assertEquals(9, value.getLength());
+ // Position should be 11 right after "123456789ab"
+ assertEquals(11, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(11, reader.getPos());
+ }
+
+ @Test
+ public void testUncompressedInputDefaultDelimiterPosValue()
+ throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "1234567890\r\n12\r\n345";
+ Path inputFile = createInputFile(conf, inputData);
+ conf.setInt("io.file.buffer.size", 10);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+ LineRecordReader reader = new LineRecordReader(conf, split,
+ null);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ reader.next(key, value);
+ // Get first record:"1234567890"
+ assertEquals(10, value.getLength());
+ // Position should be 12 right after "1234567890\r\n"
+ assertEquals(12, reader.getPos());
+ reader.next(key, value);
+ // Get second record:"12"
+ assertEquals(2, value.getLength());
+ // Position should be 16 right after "1234567890\r\n12\r\n"
+ assertEquals(16, reader.getPos());
+ assertFalse(reader.next(key, value));
+
+ split = new FileSplit(inputFile, 15, 4, (String[])null);
+ reader = new LineRecordReader(conf, split, null);
+ // The second split dropped the first record "\n"
+ // The position should be 16 right after "1234567890\r\n12\r\n"
+ assertEquals(16, reader.getPos());
+ reader.next(key, value);
+ // Get third record:"345"
+ assertEquals(3, value.getLength());
+ // Position should be 19 right after "1234567890\r\n12\r\n345"
+ assertEquals(19, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(19, reader.getPos());
+
+ inputData = "123456789\r\r\n";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 12, (String[])null);
+ reader = new LineRecordReader(conf, split, null);
+ reader.next(key, value);
+ // Get first record:"123456789"
+ assertEquals(9, value.getLength());
+ // Position should be 10 right after "123456789\r"
+ assertEquals(10, reader.getPos());
+ reader.next(key, value);
+ // Get second record:""
+ assertEquals(0, value.getLength());
+ // Position should be 12 right after "123456789\r\r\n"
+ assertEquals(12, reader.getPos());
+ assertFalse(reader.next(key, value));
+ assertEquals(12, reader.getPos());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d1a02b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 3c1f28f..dfe8b5d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.lib.input;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -37,6 +38,8 @@ import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
@@ -341,4 +344,162 @@ public class TestLineRecordReader {
}
}
}
+
+ @Test
+ public void testUncompressedInputCustomDelimiterPosValue()
+ throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "1234567890ab12ab345";
+ Path inputFile = createInputFile(conf, inputData);
+ conf.setInt("io.file.buffer.size", 10);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ String delimiter = "ab";
+ byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+ FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf,
+ new TaskAttemptID());
+ LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
+ reader.initialize(split, context);
+ LongWritable key;
+ Text value;
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get first record:"1234567890"
+ assertEquals(10, value.getLength());
+ assertEquals(0, key.get());
+ reader.nextKeyValue();
+ // Get second record:"12"
+ assertEquals(2, value.getLength());
+ // Key should be 12 right after "1234567890ab"
+ assertEquals(12, key.get());
+ reader.nextKeyValue();
+ // Get third record:"345"
+ assertEquals(3, value.getLength());
+ // Key should be 16 right after "1234567890ab12ab"
+ assertEquals(16, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 19 right after "1234567890ab12ab345"
+ assertEquals(19, key.get());
+
+ split = new FileSplit(inputFile, 15, 4, (String[])null);
+ reader = new LineRecordReader(recordDelimiterBytes);
+ reader.initialize(split, context);
+ // No record is in the second split because the second split dropped
+ // the first record, which was already reported by the first split.
+ assertFalse(reader.nextKeyValue());
+
+ inputData = "123456789aab";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 12, (String[])null);
+ reader = new LineRecordReader(recordDelimiterBytes);
+ reader.initialize(split, context);
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get first record:"123456789a"
+ assertEquals(10, value.getLength());
+ assertEquals(0, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 12 right after "123456789aab"
+ assertEquals(12, key.get());
+
+ inputData = "123456789a";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 10, (String[])null);
+ reader = new LineRecordReader(recordDelimiterBytes);
+ reader.initialize(split, context);
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get first record:"123456789a"
+ assertEquals(10, value.getLength());
+ assertEquals(0, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 10 right after "123456789a"
+ assertEquals(10, key.get());
+
+ inputData = "123456789ab";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 11, (String[])null);
+ reader = new LineRecordReader(recordDelimiterBytes);
+ reader.initialize(split, context);
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get first record:"123456789"
+ assertEquals(9, value.getLength());
+ assertEquals(0, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 11 right after "123456789ab"
+ assertEquals(11, key.get());
+ }
+
+ @Test
+ public void testUncompressedInputDefaultDelimiterPosValue()
+ throws Exception {
+ Configuration conf = new Configuration();
+ String inputData = "1234567890\r\n12\r\n345";
+ Path inputFile = createInputFile(conf, inputData);
+ conf.setInt("io.file.buffer.size", 10);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf,
+ new TaskAttemptID());
+ LineRecordReader reader = new LineRecordReader(null);
+ reader.initialize(split, context);
+ LongWritable key;
+ Text value;
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get first record:"1234567890"
+ assertEquals(10, value.getLength());
+ assertEquals(0, key.get());
+ reader.nextKeyValue();
+ // Get second record:"12"
+ assertEquals(2, value.getLength());
+ // Key should be 12 right after "1234567890\r\n"
+ assertEquals(12, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 16 right after "1234567890\r\n12\r\n"
+ assertEquals(16, key.get());
+
+ split = new FileSplit(inputFile, 15, 4, (String[])null);
+ reader = new LineRecordReader(null);
+ reader.initialize(split, context);
+ // The second split dropped the first record "\n"
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get third record:"345"
+ assertEquals(3, value.getLength());
+ // Key should be 16 right after "1234567890\r\n12\r\n"
+ assertEquals(16, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 19 right after "1234567890\r\n12\r\n345"
+ assertEquals(19, key.get());
+
+ inputData = "123456789\r\r\n";
+ inputFile = createInputFile(conf, inputData);
+ split = new FileSplit(inputFile, 0, 12, (String[])null);
+ reader = new LineRecordReader(null);
+ reader.initialize(split, context);
+ reader.nextKeyValue();
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ // Get first record:"123456789"
+ assertEquals(9, value.getLength());
+ assertEquals(0, key.get());
+ reader.nextKeyValue();
+ // Get second record:""
+ assertEquals(0, value.getLength());
+ // Key should be 10 right after "123456789\r"
+ assertEquals(10, key.get());
+ assertFalse(reader.nextKeyValue());
+ // Key should be 12 right after "123456789\r\r\n"
+ assertEquals(12, key.get());
+ }
}