Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2012/08/16 01:15:10 UTC

svn commit: r1373672 - in /hadoop/common/branches/branch-1: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: tucu
Date: Wed Aug 15 23:15:09 2012
New Revision: 1373672

URL: http://svn.apache.org/viewvc?rev=1373672&view=rev
Log:
MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Aug 15 23:15:09 2012
@@ -82,6 +82,8 @@ Release 1.2.0 - unreleased
     HADOOP-8656 Backport forced daemon shutdown of HADOOP-8353 into branch-1
     (Roman Shaposhnik via stevel)
 
+    MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
   OPTIMIZATIONS
 
     HDFS-2533. Backport: Remove needless synchronization on some FSDataSet

Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Wed Aug 15 23:15:09 2012
@@ -980,7 +980,21 @@
     acceptable.
     </description>
   </property>
-  
+
+  <property>
+    <name>mapreduce.ifile.readahead</name>
+    <value>true</value>
+    <description>Configuration key to enable/disable IFile readahead.
+    </description>
+  </property>
+
+  <property>
+    <name>mapreduce.ifile.readahead.bytes</name>
+    <value>4194304</value>
+    <description>Configuration key to set the IFile readahead length in bytes.
+    </description>
+  </property>
+
 <!-- Job Notification Configuration -->
 
 <!--

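For reference, both new keys can be tuned per job through the normal Configuration/JobConf API. A minimal sketch (the class and job-setup code below are illustrative and not part of this commit; only the two key names and defaults come from the change above):

    import org.apache.hadoop.mapred.JobConf;

    public class IFileReadaheadTuning {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Readahead is on by default; turn it off entirely ...
        job.setBoolean("mapreduce.ifile.readahead", false);
        // ... or keep it enabled and shrink the window from the
        // 4 MB default (4194304 bytes) to 1 MB.
        job.setBoolean("mapreduce.ifile.readahead", true);
        job.setInt("mapreduce.ifile.readahead.bytes", 1024 * 1024);
      }
    }
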
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java Wed Aug 15 23:15:09 2012
@@ -292,7 +292,7 @@ class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         this.in = codec.createInputStream(checksumIn, decompressor);

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Wed Aug 15 23:15:09 2012
@@ -19,11 +19,20 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.util.DataChecksum;
 /**
  * A checksum input stream, used for IFiles.
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecks
 
 class IFileInputStream extends InputStream {
   
-  private final InputStream in; //The input stream to be verified for checksum. 
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -40,19 +50,66 @@ class IFileInputStream extends InputStre
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-  
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   /**
    * Create a checksum input stream that reads
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD,
+        DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES,
+        DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -61,6 +118,10 @@ class IFileInputStream extends InputStre
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -97,10 +158,21 @@ class IFileInputStream extends InputStre
     if (currentOffset >= dataLength) {
       return -1;
     }
-    
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
    * At EOF, checksum is validated and sent back

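The net effect of the IFileInputStream changes: when the wrapped stream exposes a FileDescriptor (via HasFileDescriptor or a plain FileInputStream), the shared ReadaheadPool is asked to issue readahead hints up to readaheadLength bytes past the current offset, first in the constructor and again on every read; for streams without a file descriptor (e.g. a DataInputBuffer) doReadahead() is a no-op. A minimal usage sketch, assuming a hypothetical on-disk map output whose path and size are made up; note that IFileInputStream is package-private, so real callers live in org.apache.hadoop.mapred:

    package org.apache.hadoop.mapred;

    import java.io.FileInputStream;
    import org.apache.hadoop.conf.Configuration;

    public class IFileReadaheadSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical on-disk IFile; the path is illustrative.
        FileInputStream raw = new FileInputStream("/tmp/map_0.out");
        long lenWithChecksum = raw.getChannel().size();
        // The Configuration supplies mapreduce.ifile.readahead[.bytes];
        // the defaults are readahead enabled with a 4 MB window.
        IFileInputStream in =
            new IFileInputStream(raw, lenWithChecksum, new Configuration());
        try {
          byte[] buf = new byte[64 * 1024];
          while (in.read(buf, 0, buf.length) != -1) {
            // consume data; each read() refreshes the readahead request
          }
        } finally {
          in.close(); // cancels any outstanding readahead request
        }
      }
    }
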
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 15 23:15:09 2012
@@ -1684,7 +1684,7 @@ class ReduceTask extends Task {
         }
 
         IFileInputStream checksumIn = 
-          new IFileInputStream(input,compressedLength);
+          new IFileInputStream(input,compressedLength, conf);
 
         input = checksumIn;       
       

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1373672&r1=1373671&r2=1373672&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestIFileStreams.java Wed Aug 15 23:15:09 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {
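
The updated tests pass a fresh Configuration, which leaves readahead at its defaults; since DataInputBuffer exposes no FileDescriptor, the readahead path is a no-op in these tests either way. A test that wants to pin down the non-readahead configuration explicitly could do so in the same style (illustrative only, not part of this commit; the buffer setup mirrors the existing tests in this class):

    public void testIFileStreamNoReadahead() throws Exception {
      final int DLEN = 100;
      DataOutputBuffer dob = new DataOutputBuffer(DLEN + 4);
      IFileOutputStream ifos = new IFileOutputStream(dob);
      for (int i = 0; i < DLEN; ++i) {
        ifos.write(i);
      }
      ifos.close();
      DataInputBuffer dib = new DataInputBuffer();
      dib.reset(dob.getData(), DLEN + 4);
      // Explicitly disable readahead via the new configuration key.
      Configuration conf = new Configuration();
      conf.setBoolean(IFileInputStream.MAPRED_IFILE_READAHEAD, false);
      IFileInputStream ifis = new IFileInputStream(dib, 104, conf);
      for (int i = 0; i < DLEN; ++i) {
        assertEquals(i, ifis.read());
      }
      ifis.close();
    }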