You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cl...@apache.org on 2014/05/30 00:27:33 UTC
svn commit: r1598435 - in
/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop...
Author: clamb
Date: Thu May 29 22:27:25 2014
New Revision: 1598435
URL: http://svn.apache.org/r1598435
Log:
merge from trunk r1598430
Added:
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
- copied unchanged from r1598430, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
- copied unchanged from r1598430, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
Modified:
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1596816-1598430
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt Thu May 29 22:27:25 2014
@@ -202,6 +202,8 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth)
+ MAPREDUCE-5899. Support incremental data copy in DistCp. (jing9)
+
OPTIMIZATIONS
BUG FIXES
@@ -239,6 +241,12 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
+ MAPREDUCE-5862. Line records longer than 2x split size aren't handled
+ correctly (bc Wong via jlowe)
+
+ MAPREDUCE-5895. Close streams properly to avoid leakage in TaskLog.
+ (Kousuke Saruta via devaraj)
+
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1596816-1598430
Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1596816-1598430
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Thu May 29 22:27:25 2014
@@ -85,6 +85,15 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Thu May 29 22:27:25 2014
@@ -184,7 +184,7 @@ public class LineRecordReader implements
private int maxBytesToConsume(long pos) {
return isCompressedInput()
? Integer.MAX_VALUE
- : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
}
private long getFilePosition() throws IOException {
@@ -206,8 +206,7 @@ public class LineRecordReader implements
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos);
- int newSize = in.readLine(value, maxLineLength,
- Math.max(maxBytesToConsume(pos), maxLineLength));
+ int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
if (newSize == 0) {
return false;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Thu May 29 22:27:25 2014
@@ -199,16 +199,18 @@ public class TaskLog {
// file first and then rename.
File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
- BufferedOutputStream bos =
- new BufferedOutputStream(
- SecureIOUtils.createForWrite(tmpIndexFile, 0644));
- DataOutputStream dos = new DataOutputStream(bos);
- //the format of the index file is
- //LOG_DIR: <the dir where the task logs are really stored>
- //STDOUT: <start-offset in the stdout file> <length>
- //STDERR: <start-offset in the stderr file> <length>
- //SYSLOG: <start-offset in the syslog file> <length>
+ BufferedOutputStream bos = null;
+ DataOutputStream dos = null;
try{
+ bos = new BufferedOutputStream(
+ SecureIOUtils.createForWrite(tmpIndexFile, 0644));
+ dos = new DataOutputStream(bos);
+ //the format of the index file is
+ //LOG_DIR: <the dir where the task logs are really stored>
+ //STDOUT: <start-offset in the stdout file> <length>
+ //STDERR: <start-offset in the stderr file> <length>
+ //SYSLOG: <start-offset in the syslog file> <length>
+
dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+ LogName.STDOUT.toString() + ":");
dos.writeBytes(Long.toString(prevOutLength) + " ");
@@ -225,8 +227,10 @@ public class TaskLog {
+ "\n");
dos.close();
dos = null;
+ bos.close();
+ bos = null;
} finally {
- IOUtils.cleanup(LOG, dos);
+ IOUtils.cleanup(LOG, dos, bos);
}
File indexFile = getIndexFile(currentTaskid, isCleanup);
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Thu May 29 22:27:25 2014
@@ -121,7 +121,7 @@ public class LineRecordReader extends Re
private int maxBytesToConsume(long pos) {
return isCompressedInput
? Integer.MAX_VALUE
- : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
}
private long getFilePosition() throws IOException {
@@ -146,8 +146,7 @@ public class LineRecordReader extends Re
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
- newSize = in.readLine(value, maxLineLength,
- Math.max(maxBytesToConsume(pos), maxLineLength));
+ newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
if (newSize < maxLineLength) {
break;
Propchange: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1596816-1598430
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm Thu May 29 22:27:25 2014
@@ -18,8 +18,6 @@
Hadoop MapReduce Next Generation - Distributed Cache Deploy
- \[ {{{./index.html}Go Back}} \]
-
* Introduction
The MapReduce application framework has rudimentary support for deploying a
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm Thu May 29 22:27:25 2014
@@ -18,8 +18,6 @@
Hadoop MapReduce Next Generation - Encrypted Shuffle
- \[ {{{./index.html}Go Back}} \]
-
* {Introduction}
The Encrypted Shuffle capability allows encryption of the MapReduce shuffle
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm Thu May 29 22:27:25 2014
@@ -18,8 +18,6 @@
Apache Hadoop MapReduce - Migrating from Apache Hadoop 1.x to Apache Hadoop 2.x
- \[ {{{../../hadoop-yarn/hadoop-yarn-site/index.html}Go Back}} \]
-
* {Introduction}
This document provides information for users to migrate their Apache Hadoop
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Thu May 29 22:27:25 2014
@@ -18,8 +18,6 @@
Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort
- \[ {{{./index.html}Go Back}} \]
-
* Introduction
The pluggable shuffle and pluggable sort capabilities allow replacing the
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java Thu May 29 22:27:25 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -97,4 +100,92 @@ public class TestLineRecordReader {
// character is a linefeed
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
}
+
+ // Use the LineRecordReader to read records from the file
+ public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+ throws IOException {
+
+ // Set up context
+ File testFile = new File(testFileUrl.getFile());
+ long testFileSize = testFile.length();
+ Path testFilePath = new Path(testFile.getAbsolutePath());
+ Configuration conf = new Configuration();
+ conf.setInt("io.file.buffer.size", 1);
+
+ // Gather the records returned by the record reader
+ ArrayList<String> records = new ArrayList<String>();
+
+ long offset = 0;
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ while (offset < testFileSize) {
+ FileSplit split =
+ new FileSplit(testFilePath, offset, splitSize, (String[]) null);
+ LineRecordReader reader = new LineRecordReader(conf, split);
+
+ while (reader.next(key, value)) {
+ records.add(value.toString());
+ }
+ offset += splitSize;
+ }
+ return records;
+ }
+
+ // Gather the records by just splitting on new lines
+ public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+ throws IOException {
+ int MAX_DATA_SIZE = 1024 * 1024;
+ byte[] data = new byte[MAX_DATA_SIZE];
+ FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+ int count;
+ if (bzip) {
+ BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+ count = bzIn.read(data);
+ bzIn.close();
+ } else {
+ count = fis.read(data);
+ }
+ fis.close();
+ assertTrue("Test file data too big for buffer", count < data.length);
+ return new String(data, 0, count, "UTF-8").split("\n");
+ }
+
+ public void checkRecordSpanningMultipleSplits(String testFile,
+ int splitSize,
+ boolean bzip)
+ throws IOException {
+ URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+ ArrayList<String> records = readRecords(testFileUrl, splitSize);
+ String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+ assertEquals("Wrong number of records", actuals.length, records.size());
+
+ boolean hasLargeRecord = false;
+ for (int i = 0; i < actuals.length; ++i) {
+ assertEquals(actuals[i], records.get(i));
+ if (actuals[i].length() > 2 * splitSize) {
+ hasLargeRecord = true;
+ }
+ }
+
+ assertTrue("Invalid test data. Doesn't have a large enough record",
+ hasLargeRecord);
+ }
+
+ @Test
+ public void testRecordSpanningMultipleSplits()
+ throws IOException {
+ checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+ 10, false);
+ }
+
+ @Test
+ public void testRecordSpanningMultipleSplitsCompressed()
+ throws IOException {
+ // The file is generated with bz2 block size of 100k. The split size
+ // needs to be larger than that for the CompressedSplitLineReader to
+ // work.
+ checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+ 200 * 1000, true);
+ }
}
Modified: hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java?rev=1598435&r1=1598434&r2=1598435&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Thu May 29 22:27:25 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -101,4 +104,93 @@ public class TestLineRecordReader {
// character is a linefeed
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
}
+
+ // Use the LineRecordReader to read records from the file
+ public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+ throws IOException {
+
+ // Set up context
+ File testFile = new File(testFileUrl.getFile());
+ long testFileSize = testFile.length();
+ Path testFilePath = new Path(testFile.getAbsolutePath());
+ Configuration conf = new Configuration();
+ conf.setInt("io.file.buffer.size", 1);
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+ // Gather the records returned by the record reader
+ ArrayList<String> records = new ArrayList<String>();
+
+ long offset = 0;
+ while (offset < testFileSize) {
+ FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
+ LineRecordReader reader = new LineRecordReader();
+ reader.initialize(split, context);
+
+ while (reader.nextKeyValue()) {
+ records.add(reader.getCurrentValue().toString());
+ }
+ offset += splitSize;
+ }
+ return records;
+ }
+
+ // Gather the records by just splitting on new lines
+ public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+ throws IOException {
+ int MAX_DATA_SIZE = 1024 * 1024;
+ byte[] data = new byte[MAX_DATA_SIZE];
+ FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+ int count;
+ if (bzip) {
+ BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+ count = bzIn.read(data);
+ bzIn.close();
+ } else {
+ count = fis.read(data);
+ }
+ fis.close();
+ assertTrue("Test file data too big for buffer", count < data.length);
+ return new String(data, 0, count, "UTF-8").split("\n");
+ }
+
+ public void checkRecordSpanningMultipleSplits(String testFile,
+ int splitSize,
+ boolean bzip)
+ throws IOException {
+ URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+ ArrayList<String> records = readRecords(testFileUrl, splitSize);
+ String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+ assertEquals("Wrong number of records", actuals.length, records.size());
+
+ boolean hasLargeRecord = false;
+ for (int i = 0; i < actuals.length; ++i) {
+ assertEquals(actuals[i], records.get(i));
+ if (actuals[i].length() > 2 * splitSize) {
+ hasLargeRecord = true;
+ }
+ }
+
+ assertTrue("Invalid test data. Doesn't have a large enough record",
+ hasLargeRecord);
+ }
+
+ @Test
+ public void testRecordSpanningMultipleSplits()
+ throws IOException {
+ checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+ 10,
+ false);
+ }
+
+ @Test
+ public void testRecordSpanningMultipleSplitsCompressed()
+ throws IOException {
+ // The file is generated with bz2 block size of 100k. The split size
+ // needs to be larger than that for the CompressedSplitLineReader to
+ // work.
+ checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+ 200 * 1000,
+ true);
+ }
}