You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2012/08/16 01:15:10 UTC
svn commit: r1373672 - in /hadoop/common/branches/branch-1: ./ src/mapred/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: tucu
Date: Wed Aug 15 23:15:09 2012
New Revision: 1373672
URL: http://svn.apache.org/viewvc?rev=1373672&view=rev
Log:
MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Aug 15 23:15:09 2012
@@ -82,6 +82,8 @@ Release 1.2.0 - unreleased
HADOOP-8656 Backport forced daemon shutdown of HADOOP-8353 into branch-1
(Roman Shaposhnik via stevel)
+ MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
OPTIMIZATIONS
HDFS-2533. Backport: Remove needless synchronization on some FSDataSet
Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Wed Aug 15 23:15:09 2012
@@ -980,7 +980,21 @@
acceptable.
</description>
</property>
-
+
+ <property>
+ <name>mapreduce.ifile.readahead</name>
+ <value>true</value>
+ <description>Configuration key to enable/disable IFile readahead.
+ </description>
+ </property>
+
+ <property>
+ <name>mapreduce.ifile.readahead.bytes</name>
+ <value>4194304</value>
+ <description>Configuration key to set the IFile readahead length in bytes.
+ </description>
+ </property>
+
<!-- Job Notification Configuration -->
<!--
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java Wed Aug 15 23:15:09 2012
@@ -292,7 +292,7 @@ class IFile {
CompressionCodec codec,
Counters.Counter readsCounter) throws IOException {
readRecordsCounter = readsCounter;
- checksumIn = new IFileInputStream(in,length);
+ checksumIn = new IFileInputStream(in,length, conf);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
this.in = codec.createInputStream(checksumIn, decompressor);
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Wed Aug 15 23:15:09 2012
@@ -19,11 +19,20 @@
package org.apache.hadoop.mapred;
import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.util.DataChecksum;
/**
* A checksum input stream, used for IFiles.
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecks
class IFileInputStream extends InputStream {
- private final InputStream in; //The input stream to be verified for checksum.
+ private final InputStream in; //The input stream to be verified for checksum.
+ private final FileDescriptor inFd; // the file descriptor, if it is known
private final long length; //The total length of the input file
private final long dataLength;
private DataChecksum sum;
@@ -40,19 +50,66 @@ class IFileInputStream extends InputStre
private final byte b[] = new byte[1];
private byte csum[] = null;
private int checksumSize;
-
+
+ private ReadaheadRequest curReadahead = null;
+ private ReadaheadPool raPool = ReadaheadPool.getInstance();
+ private boolean readahead;
+ private int readaheadLength;
+
+ /**
+ * Configuration key to enable/disable IFile readahead.
+ */
+ public static final String MAPRED_IFILE_READAHEAD =
+ "mapreduce.ifile.readahead";
+
+ public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+ /**
+ * Configuration key to set the IFile readahead length in bytes.
+ */
+ public static final String MAPRED_IFILE_READAHEAD_BYTES =
+ "mapreduce.ifile.readahead.bytes";
+
+ public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+ 4 * 1024 * 1024;
+
+ public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
/**
* Create a checksum input stream that reads from the given input stream and verifies its checksum.
* @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes.
*/
- public IFileInputStream(InputStream in, long len) {
+ public IFileInputStream(InputStream in, long len, Configuration conf) {
this.in = in;
+ this.inFd = getFileDescriptorIfAvail(in);
sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize();
length = len;
dataLength = length - checksumSize;
+
+ conf = (conf != null) ? conf : new Configuration();
+ readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD,
+ DEFAULT_MAPRED_IFILE_READAHEAD);
+ readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES,
+ DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+ doReadahead();
+ }
+
+ private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+ FileDescriptor fd = null;
+ try {
+ if (in instanceof HasFileDescriptor) {
+ fd = ((HasFileDescriptor)in).getFileDescriptor();
+ } else if (in instanceof FileInputStream) {
+ fd = ((FileInputStream)in).getFD();
+ }
+ } catch (IOException e) {
+ LOG.info("Unable to determine FileDescriptor", e);
+ }
+ return fd;
}
/**
@@ -61,6 +118,10 @@ class IFileInputStream extends InputStre
*/
@Override
public void close() throws IOException {
+
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
if (currentOffset < dataLength) {
byte[] t = new byte[Math.min((int)
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -97,10 +158,21 @@ class IFileInputStream extends InputStre
if (currentOffset >= dataLength) {
return -1;
}
-
+
+ doReadahead();
+
return doRead(b,off,len);
}
+ private void doReadahead() {
+ if (raPool != null && inFd != null && readahead) {
+ curReadahead = raPool.readaheadStream(
+ "ifile", inFd,
+ currentOffset, readaheadLength, dataLength,
+ curReadahead);
+ }
+ }
+
/**
* Read bytes from the stream.
* At EOF, checksum is validated and sent back
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 15 23:15:09 2012
@@ -1684,7 +1684,7 @@ class ReduceTask extends Task {
}
IFileInputStream checksumIn =
- new IFileInputStream(input,compressedLength);
+ new IFileInputStream(input,compressedLength, conf);
input = checksumIn;
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java Wed Aug 15 23:15:09 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 104);
+ IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
for (int i = 0; i < DLEN; ++i) {
assertEquals(i, ifis.read());
}
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
final byte[] b = dob.getData();
++b[17];
dib.reset(b, DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 104);
+ IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
int i = 0;
try {
while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 100);
+ IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
int i = 0;
try {
while (i < DLEN - 8) {