Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/12/17 21:57:01 UTC

svn commit: r1551701 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/resources/ src/test/java/org/apache/hadoop/hdfs/

Author: cmccabe
Date: Tue Dec 17 20:57:00 2013
New Revision: 1551701

URL: http://svn.apache.org/r1551701
Log:
HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not (cmccabe)
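
The core of this change is that BlockReaderLocal now decides on each read whether checksums can be skipped, based on whether verification was requested and whether the block is mlocked (cached) on the DataNode, and it picks its read path accordingly. The fragment below is only a standalone paraphrase of that decision logic as it appears in the patch (getCanSkipChecksum() and read(ByteBuffer) in the BlockReaderLocal.java hunk); it is illustrative, not code from the commit.

    // A paraphrase of the checksum-skip decision this patch adds to
    // BlockReaderLocal; illustrative only, not code from the commit.
    public class ChecksumSwitchSketch {
      static boolean canSkipChecksum(boolean verifyChecksum, boolean mlocked) {
        // Skip checksums when verification was never requested, or when the
        // block is mlocked on the DataNode and therefore assumed intact.
        return !verifyChecksum || mlocked;
      }

      public static void main(String[] args) {
        // With verification on and the block not yet mlocked, checksums are
        // verified; once the block becomes mlocked, the same reader can stop.
        System.out.println(canSkipChecksum(true, false));  // false
        System.out.println(canSkipChecksum(true, true));   // true
      }
    }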

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Dec 17 20:57:00 2013
@@ -256,6 +256,9 @@ Trunk (Unreleased)
     HDFS-5431. Support cachepool-based limit management in path-based caching
     (awang via cmccabe)
 
+    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+    (cmccabe)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Dec 17 20:57:00 2013
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -89,10 +91,10 @@ public interface BlockReader extends Byt
   /**
    * Get a ClientMmap object for this BlockReader.
    *
-   * @param curBlock      The current block.
+   * @param opts          The read options to use.
    * @return              The ClientMmap object, or null if mmap is not
    *                      supported.
    */
-  ClientMmap getClientMmap(LocatedBlock curBlock,
+  ClientMmap getClientMmap(EnumSet<ReadOption> opts,
         ClientMmapManager mmapManager);
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Dec 17 20:57:00 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
@@ -98,7 +99,7 @@ public class BlockReaderFactory {
         // enabled, try to set up a BlockReaderLocal.
         BlockReader reader = newShortCircuitBlockReader(conf, file,
             block, blockToken, startOffset, len, peer, datanodeID,
-            domSockFactory, verifyChecksum, fisCache);
+            domSockFactory, verifyChecksum, fisCache, cachingStrategy);
         if (reader != null) {
           // Once we've constructed the short-circuit block reader, we don't
           // need the socket any more.  So let's return it to the cache.
@@ -160,7 +161,8 @@ public class BlockReaderFactory {
    * @param verifyChecksum     True if we should verify the checksums.
    *                           Note: even if this is true, when
    *                           DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
-   *                           set, we will skip checksums.
+   *                           set or the block is mlocked, we will skip
+   *                           checksums.
    *
    * @return                   The BlockReaderLocal, or null if the
    *                           DataNode declined to provide short-circuit
@@ -172,7 +174,8 @@ public class BlockReaderFactory {
       Token<BlockTokenIdentifier> blockToken, long startOffset,
       long len, Peer peer, DatanodeID datanodeID,
       DomainSocketFactory domSockFactory, boolean verifyChecksum,
-      FileInputStreamCache fisCache) throws IOException {
+      FileInputStreamCache fisCache,
+      CachingStrategy cachingStrategy) throws IOException {
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(
           peer.getOutputStream()));
@@ -189,9 +192,18 @@ public class BlockReaderFactory {
       FileInputStream fis[] = new FileInputStream[2];
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
       try {
-        reader = new BlockReaderLocal(conf, file, block,
-            startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum,
-            fisCache);
+        reader = new BlockReaderLocal.Builder(conf).
+            setFilename(file).
+            setBlock(block).
+            setStartOffset(startOffset).
+            setStreams(fis).
+            setDatanodeID(datanodeID).
+            setVerifyChecksum(verifyChecksum).
+            setBlockMetadataHeader(
+                BlockMetadataHeader.preadHeader(fis[1].getChannel())).
+            setFileInputStreamCache(fisCache).
+            setCachingStrategy(cachingStrategy).
+            build();
       } finally {
         if (reader == null) {
           IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Dec 17 20:57:00 2013
@@ -17,25 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * BlockReaderLocal enables local short circuited reads. If the DFS client is on
  * the same machine as the datanode, then the client can read files directly
@@ -55,446 +60,581 @@ import org.apache.hadoop.util.DataChecks
 class BlockReaderLocal implements BlockReader {
   static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
 
-  private final FileInputStream dataIn; // reader for the data file
-  private final FileInputStream checksumIn;   // reader for the checksum file
-  private final boolean verifyChecksum;
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+  public static class Builder {
+    private int bufferSize;
+    private boolean verifyChecksum;
+    private int maxReadahead;
+    private String filename;
+    private FileInputStream streams[];
+    private long dataPos;
+    private DatanodeID datanodeID;
+    private FileInputStreamCache fisCache;
+    private boolean mlocked;
+    private BlockMetadataHeader header;
+    private ExtendedBlock block;
+
+    public Builder(Conf conf) {
+      this.maxReadahead = Integer.MAX_VALUE;
+      this.verifyChecksum = !conf.skipShortCircuitChecksums;
+      this.bufferSize = conf.shortCircuitBufferSize;
+    }
+
+    public Builder setVerifyChecksum(boolean verifyChecksum) {
+      this.verifyChecksum = verifyChecksum;
+      return this;
+    }
+
+    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+      long readahead = cachingStrategy.getReadahead() != null ?
+          cachingStrategy.getReadahead() :
+              DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+      return this;
+    }
+
+    public Builder setFilename(String filename) {
+      this.filename = filename;
+      return this;
+    }
+
+    public Builder setStreams(FileInputStream streams[]) {
+      this.streams = streams;
+      return this;
+    }
+
+    public Builder setStartOffset(long startOffset) {
+      this.dataPos = Math.max(0, startOffset);
+      return this;
+    }
+
+    public Builder setDatanodeID(DatanodeID datanodeID) {
+      this.datanodeID = datanodeID;
+      return this;
+    }
+
+    public Builder setFileInputStreamCache(FileInputStreamCache fisCache) {
+      this.fisCache = fisCache;
+      return this;
+    }
+
+    public Builder setMlocked(boolean mlocked) {
+      this.mlocked = mlocked;
+      return this;
+    }
+
+    public Builder setBlockMetadataHeader(BlockMetadataHeader header) {
+      this.header = header;
+      return this;
+    }
+
+    public Builder setBlock(ExtendedBlock block) {
+      this.block = block;
+      return this;
+    }
+
+    public BlockReaderLocal build() {
+      Preconditions.checkNotNull(streams);
+      Preconditions.checkArgument(streams.length == 2);
+      Preconditions.checkNotNull(header);
+      return new BlockReaderLocal(this);
+    }
+  }
+
+  private boolean closed = false;
 
   /**
-   * Offset from the most recent chunk boundary at which the next read should
-   * take place. Is only set to non-zero at construction time, and is
-   * decremented (usually to 0) by subsequent reads. This avoids having to do a
-   * checksum read at construction to position the read cursor correctly.
+   * Pair of streams for this block.
    */
-  private int offsetFromChunkBoundary;
-  
-  private byte[] skipBuf = null;
+  private final FileInputStream streams[];
 
   /**
-   * Used for checksummed reads that need to be staged before copying to their
-   * output buffer because they are either a) smaller than the checksum chunk
-   * size or b) issued by the slower read(byte[]...) path
+   * The data FileChannel.
    */
-  private ByteBuffer slowReadBuff = null;
-  private ByteBuffer checksumBuff = null;
-  private DataChecksum checksum;
+  private final FileChannel dataIn;
 
-  private static DirectBufferPool bufferPool = new DirectBufferPool();
+  /**
+   * The next place we'll read from in the block data FileChannel.
+   *
+   * If data is buffered in dataBuf, this offset will be larger than the
+   * offset of the next byte which a read() operation will give us.
+   */
+  private long dataPos;
+
+  /**
+   * The Checksum FileChannel.
+   */
+  private final FileChannel checksumIn;
+  
+  /**
+   * Checksum type and size.
+   */
+  private final DataChecksum checksum;
 
-  private final int bytesPerChecksum;
-  private final int checksumSize;
+  /**
+   * If false, we will always skip the checksum.
+   */
+  private final boolean verifyChecksum;
+
+  /**
+   * If true, this block is mlocked on the DataNode.
+   */
+  private final AtomicBoolean mlocked;
 
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
+  /**
+   * Name of the block, for logging purposes.
+   */
   private final String filename;
 
+  /**
+   * DataNode which contained this block.
+   */
   private final DatanodeID datanodeID;
+  
+  /**
+   * Block ID and Block Pool ID.
+   */
   private final ExtendedBlock block;
   
+  /**
+   * Cache of Checksum#bytesPerChecksum.
+   */
+  private int bytesPerChecksum;
+
+  /**
+   * Cache of Checksum#checksumSize.
+   */
+  private int checksumSize;
+
+  /**
+   * FileInputStream cache to return the streams to upon closing,
+   * or null if we should just close them unconditionally.
+   */
   private final FileInputStreamCache fisCache;
+
+  /**
+   * Maximum number of chunks to allocate.
+   *
+   * This is used to allocate dataBuf and checksumBuf, in the event that
+   * we need them.
+   */
+  private final int maxAllocatedChunks;
+
+  /**
+   * True if zero readahead was requested.
+   */
+  private final boolean zeroReadaheadRequested;
+
+  /**
+   * Maximum amount of readahead we'll do.  This will always be at least the
+   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
+   * The reason is that we need to do a certain amount of buffering in order
+   * to do checksumming.
+   * 
+   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+   * Why do we allocate buffers, and then (potentially) only use part of them?
+   * The rationale is that allocating a lot of buffers of different sizes would
+   * make it very difficult for the DirectBufferPool to re-use buffers. 
+   */
+  private int maxReadaheadLength;
+
   private ClientMmap clientMmap;
-  private boolean mmapDisabled;
-  
-  private static int getSlowReadBufferNumChunks(int bufSize,
-      int bytesPerChecksum) {
-    if (bufSize < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
-          bufSize + ") is not large enough to hold a single chunk (" +
-          bytesPerChecksum +  "). Please configure " +
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
-    }
-
-    // Round down to nearest chunk size
-    return bufSize / bytesPerChecksum;
-  }
-
-  public BlockReaderLocal(DFSClient.Conf conf, String filename,
-      ExtendedBlock block, long startOffset, long length,
-      FileInputStream dataIn, FileInputStream checksumIn,
-      DatanodeID datanodeID, boolean verifyChecksum,
-      FileInputStreamCache fisCache) throws IOException {
-    this.dataIn = dataIn;
-    this.checksumIn = checksumIn;
-    this.startOffset = Math.max(startOffset, 0);
-    this.filename = filename;
-    this.datanodeID = datanodeID;
-    this.block = block;
-    this.fisCache = fisCache;
-    this.clientMmap = null;
-    this.mmapDisabled = false;
-
-    // read and handle the common header here. For now just a version
-    checksumIn.getChannel().position(0);
-    BlockMetadataHeader header = BlockMetadataHeader
-        .readHeader(new DataInputStream(
-            new BufferedInputStream(checksumIn,
-                BlockMetadataHeader.getHeaderSize())));
-    short version = header.getVersion();
-    if (version != BlockMetadataHeader.VERSION) {
-      throw new IOException("Wrong version (" + version + ") of the " +
-          "metadata file for " + filename + ".");
-    }
-    this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
-    long firstChunkOffset;
-    if (this.verifyChecksum) {
-      this.checksum = header.getChecksum();
-      this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
-      this.checksumSize = this.checksum.getChecksumSize();
-      firstChunkOffset = startOffset
-          - (startOffset % checksum.getBytesPerChecksum());
-      this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
-
-      int chunksPerChecksumRead = getSlowReadBufferNumChunks(
-          conf.shortCircuitBufferSize, bytesPerChecksum);
-      slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
-      checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-      // Initially the buffers have nothing to read.
-      slowReadBuff.flip();
-      checksumBuff.flip();
-      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-      IOUtils.skipFully(checksumIn, checkSumOffset);
+
+  /**
+   * Buffers data starting at the current dataPos and extending for
+   * dataBuf.limit() bytes.
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer dataBuf;
+
+  /**
+   * Buffers checksums starting at the current checksumPos and extending for
+   * checksumBuf.limit() bytes.
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer checksumBuf;
+
+  private boolean mmapDisabled = false;
+
+  private BlockReaderLocal(Builder builder) {
+    this.streams = builder.streams;
+    this.dataIn = builder.streams[0].getChannel();
+    this.dataPos = builder.dataPos;
+    this.checksumIn = builder.streams[1].getChannel();
+    this.checksum = builder.header.getChecksum();
+    this.verifyChecksum = builder.verifyChecksum &&
+        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+    this.mlocked = new AtomicBoolean(builder.mlocked);
+    this.filename = builder.filename;
+    this.datanodeID = builder.datanodeID;
+    this.fisCache = builder.fisCache;
+    this.block = builder.block;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    this.checksumSize = checksum.getChecksumSize();
+
+    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+    // Calculate the effective maximum readahead.
+    // We can't do more readahead than there is space in the buffer.
+    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+        ((Math.min(builder.bufferSize, builder.maxReadahead) +
+            bytesPerChecksum - 1) / bytesPerChecksum);
+    if (maxReadaheadChunks == 0) {
+      this.zeroReadaheadRequested = true;
+      maxReadaheadChunks = 1;
     } else {
-      firstChunkOffset = startOffset;
-      this.checksum = null;
-      this.bytesPerChecksum = 0;
-      this.checksumSize = 0;
-      this.offsetFromChunkBoundary = 0;
+      this.zeroReadaheadRequested = false;
     }
-    
-    boolean success = false;
+    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+  }
+
+  private synchronized void createDataBufIfNeeded() {
+    if (dataBuf == null) {
+      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+      dataBuf.position(0);
+      dataBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeDataBufIfExists() {
+    if (dataBuf != null) {
+      // When disposing of a dataBuf, we have to move our stored file index
+      // backwards.
+      dataPos -= dataBuf.remaining();
+      dataBuf.clear();
+      bufferPool.returnBuffer(dataBuf);
+      dataBuf = null;
+    }
+  }
+
+  private synchronized void createChecksumBufIfNeeded() {
+    if (checksumBuf == null) {
+      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+      checksumBuf.position(0);
+      checksumBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeChecksumBufIfExists() {
+    if (checksumBuf != null) {
+      checksumBuf.clear();
+      bufferPool.returnBuffer(checksumBuf);
+      checksumBuf = null;
+    }
+  }
+
+  private synchronized int drainDataBuf(ByteBuffer buf)
+      throws IOException {
+    if (dataBuf == null) return 0;
+    int oldLimit = dataBuf.limit();
+    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+    if (nRead == 0) return 0;
     try {
-      // Reposition both input streams to the beginning of the chunk
-      // containing startOffset
-      this.dataIn.getChannel().position(firstChunkOffset);
-      success = true;
+      dataBuf.limit(dataBuf.position() + nRead);
+      buf.put(dataBuf);
     } finally {
-      if (success) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Created BlockReaderLocal for file " + filename
-              + " block " + block + " in datanode " + datanodeID);
-        }
-      } else {
-        if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
-        if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
-      }
+      dataBuf.limit(oldLimit);
     }
+    return nRead;
   }
 
   /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   * Read from the block file into a buffer.
+   *
+   * This function overwrites checksumBuf.  It will increment dataPos.
+   *
+   * @param buf   The buffer to read into.  May be dataBuf.
+   *              The position and limit of this buffer should be set to
+   *              multiples of the checksum size.
+   * @param canSkipChecksum  True if we can skip checksumming.
+   *
+   * @return      Total bytes read.  0 on EOF.
    */
-  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
       throws IOException {
-    int bytesRead = stream.getChannel().read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = stream.getChannel().read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
+    int total = 0;
+    long startDataPos = dataPos;
+    int startBufPos = buf.position();
+    while (buf.hasRemaining()) {
+      int nRead = dataIn.read(buf, dataPos);
+      if (nRead < 0) {
+        break;
+      }
+      dataPos += nRead;
+      total += nRead;
+    }
+    if (canSkipChecksum) {
+      freeChecksumBufIfExists();
+      return total;
     }
-    return bytesRead;
-  }
+    if (total > 0) {
+      try {
+        buf.limit(buf.position());
+        buf.position(startBufPos);
+        createChecksumBufIfNeeded();
+        int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.clear();
+        checksumBuf.limit(checksumsNeeded * checksumSize);
+        long checksumPos =
+          7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+        while (checksumBuf.hasRemaining()) {
+          int nRead = checksumIn.read(checksumBuf, checksumPos);
+          if (nRead < 0) {
+            throw new IOException("Got unexpected checksum file EOF at " +
+                checksumPos + ", block file position " + startDataPos + " for " +
+                "block " + block + " of file " + filename);
+          }
+          checksumPos += nRead;
+        }
+        checksumBuf.flip();
   
-  /**
-   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
-   * another.
-   */
-  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
-    int oldLimit = from.limit();
-    from.limit(from.position() + length);
-    try {
-      to.put(from);
-    } finally {
-      from.limit(oldLimit);
+        checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+      } finally {
+        buf.position(buf.limit());
+      }
     }
+    return total;
   }
 
+  private boolean getCanSkipChecksum() {
+    return (!verifyChecksum) || mlocked.get();
+  }
+  
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
-    int nRead = 0;
-    if (verifyChecksum) {
-      // A 'direct' read actually has three phases. The first drains any
-      // remaining bytes from the slow read buffer. After this the read is
-      // guaranteed to be on a checksum chunk boundary. If there are still bytes
-      // to read, the fast direct path is used for as many remaining bytes as
-      // possible, up to a multiple of the checksum chunk size. Finally, any
-      // 'odd' bytes remaining at the end of the read cause another slow read to
-      // be issued, which involves an extra copy.
-
-      // Every 'slow' read tries to fill the slow read buffer in one go for
-      // efficiency's sake. As described above, all non-checksum-chunk-aligned
-      // reads will be served from the slower read path.
-
-      if (slowReadBuff.hasRemaining()) {
-        // There are remaining bytes from a small read available. This usually
-        // means this read is unaligned, which falls back to the slow path.
-        int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
-        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-        nRead += fromSlowReadBuff;
-      }
-
-      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
-        // Since we have drained the 'small read' buffer, we are guaranteed to
-        // be chunk-aligned
-        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
-
-        // There's only enough checksum buffer space available to checksum one
-        // entire slow read buffer. This saves keeping the number of checksum
-        // chunks around.
-        len = Math.min(len, slowReadBuff.capacity());
-        int oldlimit = buf.limit();
-        buf.limit(buf.position() + len);
-        int readResult = 0;
-        try {
-          readResult = doByteBufferRead(buf);
-        } finally {
-          buf.limit(oldlimit);
-        }
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          nRead += readResult;
-          buf.position(buf.position() + readResult);
-        }
-      }
-
-      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
-      // until chunk boundary
-      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
-        int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
-        int readResult = fillSlowReadBuffer(toRead);
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
-          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-          nRead += fromSlowReadBuff;
-        }
+    boolean canSkipChecksum = getCanSkipChecksum();
+    
+    String traceString = null;
+    if (LOG.isTraceEnabled()) {
+      traceString = new StringBuilder().
+          append("read(").
+          append("buf.remaining=").append(buf.remaining()).
+          append(", block=").append(block).
+          append(", filename=").append(filename).
+          append(", canSkipChecksum=").append(canSkipChecksum).
+          append(")").toString();
+      LOG.trace(traceString + ": starting");
+    }
+    int nRead;
+    try {
+      if (canSkipChecksum && zeroReadaheadRequested) {
+        nRead = readWithoutBounceBuffer(buf);
+      } else {
+        nRead = readWithBounceBuffer(buf, canSkipChecksum);
       }
-    } else {
-      // Non-checksummed reads are much easier; we can just fill the buffer directly.
-      nRead = doByteBufferRead(buf);
-      if (nRead > 0) {
-        buf.position(buf.position() + nRead);
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(traceString + ": I/O error", e);
       }
+      throw e;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(traceString + ": returning " + nRead);
     }
     return nRead;
   }
 
+  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+      throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int total = 0;
+    while (buf.hasRemaining()) {
+      int nRead = dataIn.read(buf, dataPos);
+      if (nRead < 0) {
+        break;
+      }
+      dataPos += nRead;
+      total += nRead;
+    }
+    return (total == 0) ? -1 : total;
+  }
+
   /**
-   * Tries to read as many bytes as possible into supplied buffer, checksumming
-   * each chunk if needed.
-   *
-   * <b>Preconditions:</b>
-   * <ul>
-   * <li>
-   * If checksumming is enabled, buf.remaining must be a multiple of
-   * bytesPerChecksum. Note that this is not a requirement for clients of
-   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
-   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
-   * method.
-   * </li>
-   * </ul>
-   * <b>Postconditions:</b>
-   * <ul>
-   * <li>buf.limit and buf.mark are unchanged.</li>
-   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
-   * requested bytes can be read straight from the buffer</li>
-   * </ul>
-   *
-   * @param buf
-   *          byte buffer to write bytes to. If checksums are not required, buf
-   *          can have any number of bytes remaining, otherwise there must be a
-   *          multiple of the checksum chunk size remaining.
-   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
-   *         that is, the the number of useful bytes (up to the amount
-   *         requested) readable from the buffer by the client.
-   */
-  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
-    if (verifyChecksum) {
-      assert buf.remaining() % bytesPerChecksum == 0;
-    }
-    int dataRead = -1;
-
-    int oldpos = buf.position();
-    // Read as much as we can into the buffer.
-    dataRead = fillBuffer(dataIn, buf);
-
-    if (dataRead == -1) {
-      return -1;
-    }
-
-    if (verifyChecksum) {
-      ByteBuffer toChecksum = buf.duplicate();
-      toChecksum.position(oldpos);
-      toChecksum.limit(oldpos + dataRead);
-
-      checksumBuff.clear();
-      // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
-      int numChunks =
-        (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
-      checksumBuff.limit(checksumSize * numChunks);
-
-      fillBuffer(checksumIn, checksumBuff);
-      checksumBuff.flip();
-
-      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
-          this.startOffset);
-    }
-
-    if (dataRead >= 0) {
-      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
-    }
-
-    if (dataRead < offsetFromChunkBoundary) {
-      // yikes, didn't even get enough bytes to honour offset. This can happen
-      // even if we are verifying checksums if we are at EOF.
-      offsetFromChunkBoundary -= dataRead;
-      dataRead = 0;
+   * Fill the data buffer.  If necessary, validate the data against the
+   * checksums.
+   * 
+   * We always want the offsets of the data contained in dataBuf to be
+   * aligned to the chunk boundary.  If we are validating checksums, we
+   * accomplish this by seeking backwards in the file until we're on a
+   * chunk boundary.  (This is necessary because we can't checksum a
+   * partial chunk.)  If we are not validating checksums, we simply only
+   * fill the latter part of dataBuf.
+   * 
+   * @param canSkipChecksum  true if we can skip checksumming.
+   * @return                 true if we hit EOF.
+   * @throws IOException
+   */
+  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+      throws IOException {
+    createDataBufIfNeeded();
+    final int slop = (int)(dataPos % bytesPerChecksum);
+    final long oldDataPos = dataPos;
+    dataBuf.limit(maxReadaheadLength);
+    if (canSkipChecksum) {
+      dataBuf.position(slop);
+      fillBuffer(dataBuf, canSkipChecksum);
     } else {
-      dataRead -= offsetFromChunkBoundary;
-      offsetFromChunkBoundary = 0;
+      dataPos -= slop;
+      dataBuf.position(0);
+      fillBuffer(dataBuf, canSkipChecksum);
     }
-
-    return dataRead;
+    dataBuf.limit(dataBuf.position());
+    dataBuf.position(Math.min(dataBuf.position(), slop));
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
+          "buffer from offset " + oldDataPos + " of " + block);
+    }
+    return dataBuf.limit() != maxReadaheadLength;
   }
 
   /**
-   * Ensures that up to len bytes are available and checksummed in the slow read
-   * buffer. The number of bytes available to read is returned. If the buffer is
-   * not already empty, the number of remaining bytes is returned and no actual
-   * read happens.
+   * Read using the bounce buffer.
+   *
+   * A 'direct' read actually has three phases. The first drains any
+   * remaining bytes from the slow read buffer. After this the read is
+   * guaranteed to be on a checksum chunk boundary. If there are still bytes
+   * to read, the fast direct path is used for as many remaining bytes as
+   * possible, up to a multiple of the checksum chunk size. Finally, any
+   * 'odd' bytes remaining at the end of the read cause another slow read to
+   * be issued, which involves an extra copy.
+   *
+   * Every 'slow' read tries to fill the slow read buffer in one go for
+   * efficiency's sake. As described above, all non-checksum-chunk-aligned
+   * reads will be served from the slower read path.
    *
-   * @param len
-   *          the maximum number of bytes to make available. After len bytes
-   *          are read, the underlying bytestream <b>must</b> be at a checksum
-   *          boundary, or EOF. That is, (len + currentPosition) %
-   *          bytesPerChecksum == 0.
-   * @return the number of bytes available to read, or -1 if EOF.
+   * @param buf              The buffer to read into. 
+   * @param canSkipChecksum  True if we can skip checksums.
    */
-  private synchronized int fillSlowReadBuffer(int len) throws IOException {
-    int nRead = -1;
-    if (slowReadBuff.hasRemaining()) {
-      // Already got data, good to go.
-      nRead = Math.min(len, slowReadBuff.remaining());
-    } else {
-      // Round a complete read of len bytes (plus any implicit offset) to the
-      // next chunk boundary, since we try and read in multiples of a chunk
-      int nextChunk = len + offsetFromChunkBoundary +
-          (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
-      int limit = Math.min(nextChunk, slowReadBuff.capacity());
-      assert limit % bytesPerChecksum == 0;
-
-      slowReadBuff.clear();
-      slowReadBuff.limit(limit);
-
-      nRead = doByteBufferRead(slowReadBuff);
-
-      if (nRead > 0) {
-        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
-        // false positive.
-        slowReadBuff.limit(nRead + slowReadBuff.position());
+  private synchronized int readWithBounceBuffer(ByteBuffer buf,
+        boolean canSkipChecksum) throws IOException {
+    int total = 0;
+    boolean eof = false;
+    while (true) {
+      int bb = drainDataBuf(buf); // drain bounce buffer if possible
+      total += bb;
+      int needed = buf.remaining();
+      if (eof || (needed == 0)) {
+        break;
+      } else if (buf.isDirect() && (needed >= maxReadaheadLength)
+          && ((dataPos % bytesPerChecksum) == 0)) {
+        // Fast lane: try to read directly into user-supplied buffer, bypassing
+        // bounce buffer.
+        int oldLimit = buf.limit();
+        int nRead;
+        try {
+          buf.limit(buf.position() + maxReadaheadLength);
+          nRead = fillBuffer(buf, canSkipChecksum);
+        } finally {
+          buf.limit(oldLimit);
+        }
+        if (nRead < maxReadaheadLength) {
+          eof = true;
+        }
+        total += nRead;
+      } else {
+        // Slow lane: refill bounce buffer.
+        if (fillDataBuf(canSkipChecksum)) {
+          eof = true;
+        }
       }
     }
-    return nRead;
+    return total == 0 ? -1 : total;
   }
 
   @Override
-  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+  public synchronized int read(byte[] arr, int off, int len)
+        throws IOException {
+    boolean canSkipChecksum = getCanSkipChecksum();
+    String traceString = null;
     if (LOG.isTraceEnabled()) {
-      LOG.trace("read off " + off + " len " + len);
+      traceString = new StringBuilder().
+          append("read(arr.length=").append(arr.length).
+          append(", off=").append(off).
+          append(", len=").append(len).
+          append(", filename=").append(filename).
+          append(", block=").append(block).
+          append(", canSkipChecksum=").append(canSkipChecksum).
+          append(")").toString();
+      LOG.trace(traceString + ": starting");
     }
-    if (!verifyChecksum) {
-      return dataIn.read(buf, off, len);
+    int nRead;
+    try {
+      if (canSkipChecksum && zeroReadaheadRequested) {
+        nRead = readWithoutBounceBuffer(arr, off, len);
+      } else {
+        nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+      }
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(traceString + ": I/O error", e);
+      }
+      throw e;
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(traceString + ": returning " + nRead);
+    }
+    return nRead;
+  }
 
-    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
-
+  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+        int len) throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
     if (nRead > 0) {
-      // Possible that buffer is filled with a larger read than we need, since
-      // we tried to read as much as possible at once
-      nRead = Math.min(len, nRead);
-      slowReadBuff.get(buf, off, nRead);
+      dataPos += nRead;
     }
+    return nRead == 0 ? -1 : nRead;
+  }
 
-    return nRead;
+  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+        boolean canSkipChecksum) throws IOException {
+    createDataBufIfNeeded();
+    if (!dataBuf.hasRemaining()) {
+      dataBuf.position(0);
+      dataBuf.limit(maxReadaheadLength);
+      fillDataBuf(canSkipChecksum);
+    }
+    int toRead = Math.min(dataBuf.remaining(), len);
+    dataBuf.get(arr, off, toRead);
+    return toRead == 0 ? -1 : toRead;
   }
 
   @Override
   public synchronized long skip(long n) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("skip " + n);
-    }
-    if (n <= 0) {
-      return 0;
-    }
-    if (!verifyChecksum) {
-      return dataIn.skip(n);
-    }
-  
-    // caller made sure newPosition is not beyond EOF.
-    int remaining = slowReadBuff.remaining();
-    int position = slowReadBuff.position();
-    int newPosition = position + (int)n;
-  
-    // if the new offset is already read into dataBuff, just reposition
-    if (n <= remaining) {
-      assert offsetFromChunkBoundary == 0;
-      slowReadBuff.position(newPosition);
-      return n;
-    }
-  
-    // for small gap, read through to keep the data/checksum in sync
-    if (n - remaining <= bytesPerChecksum) {
-      slowReadBuff.position(position + remaining);
-      if (skipBuf == null) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      int ret = read(skipBuf, 0, (int)(n - remaining));
-      return ret;
+    int discardedFromBuf = 0;
+    long remaining = n;
+    if ((dataBuf != null) && dataBuf.hasRemaining()) {
+      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+      dataBuf.position(dataBuf.position() + discardedFromBuf);
+      remaining -= discardedFromBuf;
     }
-  
-    // optimize for big gap: discard the current buffer, skip to
-    // the beginning of the appropriate checksum chunk and then
-    // read to the middle of that chunk to be in sync with checksums.
-  
-    // We can't use this.offsetFromChunkBoundary because we need to know how
-    // many bytes of the offset were really read. Calling read(..) with a
-    // positive this.offsetFromChunkBoundary causes that many bytes to get
-    // silently skipped.
-    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
-    long toskip = n - remaining - myOffsetFromChunkBoundary;
-
-    slowReadBuff.position(slowReadBuff.limit());
-    checksumBuff.position(checksumBuff.limit());
-  
-    IOUtils.skipFully(dataIn, toskip);
-    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
-    IOUtils.skipFully(checksumIn, checkSumOffset);
-
-    // read into the middle of the chunk
-    if (skipBuf == null) {
-      skipBuf = new byte[bytesPerChecksum];
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + 
+        filename + "): discarded " + discardedFromBuf + " bytes from " +
+        "dataBuf and advanced dataPos by " + remaining);
     }
-    assert skipBuf.length == bytesPerChecksum;
-    assert myOffsetFromChunkBoundary < bytesPerChecksum;
-
-    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+    dataPos += remaining;
+    return n;
+  }
 
-    if (ret == -1) {  // EOS
-      return toskip;
-    } else {
-      return (toskip + ret);
-    }
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
   }
 
   @Override
   public synchronized void close() throws IOException {
+    if (closed) return;
+    closed = true;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("close(filename=" + filename + ", block=" + block + ")");
+    }
     if (clientMmap != null) {
       clientMmap.unref();
       clientMmap = null;
@@ -504,58 +644,55 @@ class BlockReaderLocal implements BlockR
         LOG.debug("putting FileInputStream for " + filename +
             " back into FileInputStreamCache");
       }
-      fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+      fisCache.put(datanodeID, block, streams);
     } else {
       LOG.debug("closing FileInputStream for " + filename);
       IOUtils.cleanup(LOG, dataIn, checksumIn);
     }
-    if (slowReadBuff != null) {
-      bufferPool.returnBuffer(slowReadBuff);
-      slowReadBuff = null;
-    }
-    if (checksumBuff != null) {
-      bufferPool.returnBuffer(checksumBuff);
-      checksumBuff = null;
-    }
-    startOffset = -1;
-    checksum = null;
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
   }
 
   @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
+  public synchronized void readFully(byte[] arr, int off, int len)
+      throws IOException {
+    BlockReaderUtil.readFully(this, arr, off, len);
   }
 
   @Override
-  public int available() throws IOException {
-    // We never do network I/O in BlockReaderLocal.
-    return Integer.MAX_VALUE;
+  public synchronized int readAll(byte[] buf, int off, int len)
+      throws IOException {
+    return BlockReaderUtil.readAll(this, buf, off, len);
   }
 
   @Override
   public boolean isLocal() {
     return true;
   }
-  
+
   @Override
   public boolean isShortCircuit() {
     return true;
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager mmapManager) {
+  public synchronized ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
+    if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
+          verifyChecksum && (!mlocked.get())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("can't get an mmap for " + block + " of " + filename + 
+            " since SKIP_CHECKSUMS was not given, " +
+            "we aren't skipping checksums, and the block is not mlocked.");
+      }
+      return null;
+    }
     if (clientMmap == null) {
       if (mmapDisabled) {
         return null;
       }
       try {
-        clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+        clientMmap = mmapManager.fetch(datanodeID, block, streams[0]);
         if (clientMmap == null) {
           mmapDisabled = true;
           return null;
@@ -572,4 +709,24 @@ class BlockReaderLocal implements BlockR
     }
     return clientMmap;
   }
+
+  /**
+   * Set the mlocked state of the BlockReader.
+   * This method does NOT need to be synchronized because mlocked is atomic.
+   *
+   * @param mlocked  the new mlocked state of the BlockReader.
+   */
+  public void setMlocked(boolean mlocked) {
+    this.mlocked.set(mlocked);
+  }
+  
+  @VisibleForTesting
+  boolean getVerifyChecksum() {
+    return this.verifyChecksum;
+  }
+
+  @VisibleForTesting
+  int getMaxReadaheadLength() {
+    return this.maxReadaheadLength;
+  }
 }

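The new constructor above sizes its bounce buffers in whole checksum chunks: the allocation is rounded up from the configured short-circuit buffer size, the readahead is rounded up from the caching strategy's readahead value (and capped by the buffer), and at least one chunk is always kept so that whole chunks can be checksummed. The following standalone sketch reproduces that arithmetic with made-up example values (512-byte chunks, a 1 MB buffer, 4 KB readahead); the numbers are not defaults from this commit.

    // Standalone illustration of the buffer/readahead sizing done in the new
    // BlockReaderLocal constructor.  Chunk size, buffer size, and readahead
    // are made-up examples, not values from the commit.
    public class ReadaheadSizingSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;        // checksum chunk size (example)
        int bufferSize = 1024 * 1024;      // short-circuit buffer size (example)
        long readahead = 4096;             // client readahead setting (example)

        // Buffers are always allocated in whole chunks so DirectBufferPool can
        // re-use them.
        int maxAllocatedChunks = (bufferSize + bytesPerChecksum - 1) / bytesPerChecksum;

        // Readahead is also rounded up to whole chunks, and capped by the buffer.
        int maxReadaheadChunks =
            (int) ((Math.min(bufferSize, readahead) + bytesPerChecksum - 1) / bytesPerChecksum);

        // Even if zero readahead was requested, at least one chunk is kept so
        // that checksumming (which works on whole chunks) is still possible.
        boolean zeroReadaheadRequested = (maxReadaheadChunks == 0);
        if (zeroReadaheadRequested) {
          maxReadaheadChunks = 1;
        }
        int maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;

        System.out.println("maxAllocatedChunks = " + maxAllocatedChunks);   // 2048
        System.out.println("maxReadaheadLength = " + maxReadaheadLength);   // 4096
      }
    }
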
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Tue Dec 17 20:57:00 2013
@@ -24,10 +24,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
@@ -706,8 +708,8 @@ class BlockReaderLocalLegacy implements 
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Dec 17 20:57:00 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.io.ByteBufferPool;
@@ -1073,9 +1074,18 @@ implements ByteBufferReadable, CanSetDro
         DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
             "the FileInputStreamCache.");
       }
-      return new BlockReaderLocal(dfsClient.getConf(), file,
-        block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
-        fileInputStreamCache);
+      return new BlockReaderLocal.Builder(dfsClient.getConf()).
+          setFilename(file).
+          setBlock(block).
+          setStartOffset(startOffset).
+          setStreams(fis).
+          setDatanodeID(chosenNode).
+          setVerifyChecksum(verifyChecksum).
+          setBlockMetadataHeader(BlockMetadataHeader.
+              preadHeader(fis[1].getChannel())).
+          setFileInputStreamCache(fileInputStreamCache).
+          setCachingStrategy(cachingStrategy).
+          build();
     }
     
     // If the legacy local block reader is enabled and we are reading a local
@@ -1479,23 +1489,19 @@ implements ByteBufferReadable, CanSetDro
             "at position " + pos);
       }
     }
-    boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
-    if (canSkipChecksums) {
-      ByteBuffer buffer = tryReadZeroCopy(maxLength);
-      if (buffer != null) {
-        return buffer;
-      }
+    ByteBuffer buffer = tryReadZeroCopy(maxLength, opts);
+    if (buffer != null) {
+      return buffer;
     }
-    ByteBuffer buffer = ByteBufferUtil.
-        fallbackRead(this, bufferPool, maxLength);
+    buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
     if (buffer != null) {
       extendedReadBuffers.put(buffer, bufferPool);
     }
     return buffer;
   }
 
-  private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
-      throws IOException {
+  private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
+      EnumSet<ReadOption> opts) throws IOException {
     // Java ByteBuffers can't be longer than 2 GB, because they use
     // 4-byte signed integers to represent capacity, etc.
     // So we can't mmap the parts of the block higher than the 2 GB offset.
@@ -1518,8 +1524,7 @@ implements ByteBufferReadable, CanSetDro
     long blockPos = curPos - blockStartInFile;
     long limit = blockPos + length;
     ClientMmap clientMmap =
-        blockReader.getClientMmap(currentLocatedBlock,
-            dfsClient.getMmapManager());
+        blockReader.getClientMmap(opts, dfsClient.getMmapManager());
     if (clientMmap == null) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +

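With the DFSInputStream change above, the caller's ReadOptions flow through to tryReadZeroCopy(), so a zero-copy (mmap) read is no longer attempted only when SKIP_CHECKSUMS is set; the block reader can also hand out an mmap when it is already entitled to skip checksums, for example because the block is mlocked. The sketch below shows how an application exercises that path; it relies on the existing zero-copy read API on FSDataInputStream and ElasticByteBufferPool, neither of which is introduced by this patch, and the file path and 1 MB length are placeholders.

    import java.nio.ByteBuffer;
    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class ZeroCopyReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        ByteBufferPool pool = new ElasticByteBufferPool();

        // "/tmp/example" is a placeholder path, not something from the commit.
        try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
          // Ask for a zero-copy (mmap-backed) read; SKIP_CHECKSUMS tells the
          // block reader that we accept unverified data.  After this patch an
          // mmap can also be handed out without SKIP_CHECKSUMS when the block
          // is mlocked on the DataNode.
          ByteBuffer buf = in.read(pool, 1024 * 1024,
              EnumSet.of(ReadOption.SKIP_CHECKSUMS));
          if (buf != null) {
            System.out.println("read " + buf.remaining() + " bytes");
            in.releaseBuffer(buf);
          }
        }
      }
    }
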
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Dec 17 20:57:00 2013
@@ -23,10 +23,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -490,8 +492,8 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Dec 17 20:57:00 2013
@@ -25,10 +25,12 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -455,8 +457,8 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager manager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Dec 17 20:57:00 2013
@@ -21,10 +21,13 @@ import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
@@ -67,7 +70,29 @@ public class BlockMetadataHeader {
     return checksum;
   }
 
- 
+  /**
+   * Read the header without changing the position of the FileChannel.
+   *
+   * @param fc The FileChannel to read.
+   * @return the Metadata Header.
+   * @throws IOException on error.
+   */
+  public static BlockMetadataHeader preadHeader(FileChannel fc)
+      throws IOException {
+    byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
+    ByteBuffer buf = ByteBuffer.wrap(arr);
+
+    while (buf.hasRemaining()) {
+      if (fc.read(buf, 0) <= 0) {
+        throw new EOFException("unexpected EOF while reading " +
+            "metadata file header");
+      }
+    }
+    short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
+    DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+    return new BlockMetadataHeader(version, dataChecksum);
+  }
+
   /**
    * This reads all the fields till the beginning of checksum.
    * @param in 

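As a quick usage sketch (not part of the commit; the file name is made up and the getVersion/getChecksum accessors are assumed from the class's existing getters), preadHeader lets a caller peek at the checksum header while leaving the stream ready for reuse:

    import java.io.FileInputStream;
    import java.nio.channels.FileChannel;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.io.IOUtils;

    public class PreadHeaderSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical block metadata file name.
        FileInputStream metaIn = new FileInputStream("blk_1073741825_1001.meta");
        try {
          FileChannel fc = metaIn.getChannel();
          BlockMetadataHeader header = BlockMetadataHeader.preadHeader(fc);
          // preadHeader uses positional reads, so the channel is still at 0
          // and the same stream can be handed straight to a block reader.
          System.out.println("version=" + header.getVersion()
              + ", checksum=" + header.getChecksum()
              + ", channel position=" + fc.position());
        } finally {
          IOUtils.closeStream(metaIn);
        }
      }
    }
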
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Dec 17 20:57:00 2013
@@ -1394,12 +1394,15 @@
   <name>dfs.client.cache.readahead</name>
   <value></value>
   <description>
-    Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
+    When using remote reads, this setting causes the datanode to
     read ahead in the block file using posix_fadvise, potentially decreasing
     I/O wait times.  Unlike dfs.datanode.readahead.bytes, this is a client-side
     setting rather than a setting for the entire datanode.  If present, this
     setting will override the DataNode default.
 
+    When using local reads, this setting determines how much readahead we do in
+    BlockReaderLocal.
+
     If the native libraries are not available to the DataNode, this
     configuration has no effect.
   </description>

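For reference, a minimal sketch (not in the patch) of setting the same client-side readahead programmatically; the 4 MB value is only an illustrative choice:

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ReadaheadConfSketch {
      public static void main(String[] args) {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Per the description above, this applies to remote reads and, after
        // this change, to readahead in BlockReaderLocal as well.  The new
        // *NoReadahead tests in this commit pass 0 here.
        conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 4L * 1024 * 1024);
        System.out.println(conf.getLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, -1));
      }
    }
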
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Dec 17 20:57:00 2013
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.VersionInf
 
 import java.io.*;
 import java.net.*;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
@@ -1059,4 +1060,10 @@ public class DFSTestUtil {
   public static void abortStream(DFSOutputStream out) throws IOException {
     out.abort();
   }
+
+  public static byte[] asArray(ByteBuffer buf) {
+    byte arr[] = new byte[buf.remaining()];
+    buf.duplicate().get(arr);
+    return arr;
+  }
 }

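A small usage sketch (not part of the patch) of the new asArray helper; as its implementation above shows, it copies the remaining bytes through a duplicate, so the caller's buffer position is left untouched:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdfs.DFSTestUtil;

    public class AsArraySketch {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] { 10, 20, 30, 40 });
        buf.get();                              // position is now 1
        byte[] rest = DFSTestUtil.asArray(buf); // copies { 20, 30, 40 }
        // asArray reads from a duplicate, so the original position stays at 1.
        System.out.println(rest.length + " bytes copied, position=" + buf.position());
      }
    }
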
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Tue Dec 17 20:57:00 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -92,22 +94,35 @@ public class TestBlockReaderLocal {
     }
   }
 
-  private static interface BlockReaderLocalTest {
-    final int TEST_LENGTH = 12345;
+  private static class BlockReaderLocalTest {
+    final static int TEST_LENGTH = 12345;
+    final static int BYTES_PER_CHECKSUM = 512;
+
+    public void setConfiguration(HdfsConfiguration conf) {
+      // default: no-op
+    }
     public void setup(File blockFile, boolean usingChecksums)
-        throws IOException;
+        throws IOException {
+      // default: no-op
+    }
     public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException;
+        throws IOException {
+      // default: no-op
+    }
   }
   
   public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum) throws IOException {
+      boolean checksum, long readahead) throws IOException {
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.
         DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+    conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
-    FileInputStream dataIn = null, checkIn = null;
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+    test.setConfiguration(conf);
+    FileInputStream dataIn = null, metaIn = null;
     final Path TEST_PATH = new Path("/a");
     final long RANDOM_SEED = 4567L;
     BlockReaderLocal blockReaderLocal = null;
@@ -143,45 +158,51 @@ public class TestBlockReaderLocal {
       cluster.shutdown();
       cluster = null;
       test.setup(dataFile, checksum);
-      dataIn = new FileInputStream(dataFile);
-      checkIn = new FileInputStream(metaFile);
-      blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
-          TEST_PATH.getName(), block, 0, -1,
-          dataIn, checkIn, datanodeID, checksum, null);
+      FileInputStream streams[] = {
+          new FileInputStream(dataFile),
+          new FileInputStream(metaFile)
+      };
+      dataIn = streams[0];
+      metaIn = streams[1];
+      blockReaderLocal = new BlockReaderLocal.Builder(
+              new DFSClient.Conf(conf)).
+          setFilename(TEST_PATH.getName()).
+          setBlock(block).
+          setStreams(streams).
+          setDatanodeID(datanodeID).
+          setCachingStrategy(new CachingStrategy(false, readahead)).
+          setVerifyChecksum(checksum).
+          setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
+              metaIn.getChannel())).
+          build();
       dataIn = null;
-      checkIn = null;
+      metaIn = null;
       test.doTest(blockReaderLocal, original);
+      // BlockReaderLocal should not alter the file position.
+      Assert.assertEquals(0, streams[0].getChannel().position());
+      Assert.assertEquals(0, streams[1].getChannel().position());
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
       if (cluster != null) cluster.shutdown();
       if (dataIn != null) dataIn.close();
-      if (checkIn != null) checkIn.close();
+      if (metaIn != null) metaIn.close();
       if (blockReaderLocal != null) blockReaderLocal.close();
     }
   }
   
   private static class TestBlockReaderLocalImmediateClose 
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[]) 
-        throws IOException { }
+      extends BlockReaderLocalTest {
   }
   
   @Test
   public void testBlockReaderLocalImmediateClose() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
   }
   
   private static class TestBlockReaderSimpleReads 
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
+      extends BlockReaderLocalTest {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[]) 
         throws IOException {
@@ -194,24 +215,43 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
       reader.readFully(buf, 1537, 514);
       assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+      // Readahead is always at least the size of one chunk in this test.
+      Assert.assertTrue(reader.getMaxReadaheadLength() >=
+          BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     }
   }
   
   @Test
   public void testBlockReaderSimpleReads() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
   }
 
   @Test
   public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
   }
   
   private static class TestBlockReaderLocalArrayReads2 
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
+      extends BlockReaderLocalTest {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[]) 
         throws IOException {
@@ -234,21 +274,30 @@ public class TestBlockReaderLocal {
   @Test
   public void testBlockReaderLocalArrayReads2() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        true);
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
 
   @Test
   public void testBlockReaderLocalArrayReads2NoChecksum()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        false);
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
   }
 
   private static class TestBlockReaderLocalByteBufferReads 
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
+      extends BlockReaderLocalTest {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[]) 
         throws IOException {
@@ -268,19 +317,105 @@ public class TestBlockReaderLocal {
   @Test
   public void testBlockReaderLocalByteBufferReads()
       throws IOException {
-    runBlockReaderLocalTest(
-        new TestBlockReaderLocalByteBufferReads(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
 
   @Test
   public void testBlockReaderLocalByteBufferReadsNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(
-        new TestBlockReaderLocalByteBufferReads(), false);
+        new TestBlockReaderLocalByteBufferReads(),
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+  
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        false, 0);
+  }
+
+  /**
+   * Test reads that bypass the bounce buffer (because they are aligned
+   * and bigger than the readahead).
+   */
+  private static class TestBlockReaderLocalByteBufferFastLaneReads 
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 0,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.skip(1537);
+      readFully(reader, buf, 0, 1);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6657,
+          DFSTestUtil.asArray(buf), 0,
+          1);
+      reader.setMlocked(true);
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6658,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.setMlocked(false);
+      readFully(reader, buf, 0, 513);
+      buf.flip();
+      assertArrayRegionsEqual(original, 11778,
+          DFSTestUtil.asArray(buf), 0,
+          513);
+      reader.skip(3);
+      readFully(reader, buf, 0, 50);
+      buf.flip();
+      assertArrayRegionsEqual(original, 12294,
+          DFSTestUtil.asArray(buf), 0,
+          50);
+    }
   }
   
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReads()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferFastLaneReads(),
+        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        false, 0);
+  }
+
   private static class TestBlockReaderLocalReadCorruptStart
-      implements BlockReaderLocalTest {
+      extends BlockReaderLocalTest {
     boolean usingChecksums = false;
     @Override
     public void setup(File blockFile, boolean usingChecksums)
@@ -314,11 +449,12 @@ public class TestBlockReaderLocal {
   @Test
   public void testBlockReaderLocalReadCorruptStart()
       throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
   
   private static class TestBlockReaderLocalReadCorrupt
-      implements BlockReaderLocalTest {
+      extends BlockReaderLocalTest {
     boolean usingChecksums = false;
     @Override
     public void setup(File blockFile, boolean usingChecksums) 
@@ -364,8 +500,136 @@ public class TestBlockReaderLocal {
   @Test
   public void testBlockReaderLocalReadCorrupt()
       throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
+  }
+
+  private static class TestBlockReaderLocalWithMlockChanges
+      extends BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+    }
+    
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      reader.setMlocked(true);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.setMlocked(false);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChanges()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        false, 0);
+  }
+
+  private static class TestBlockReaderLocalOnFileWithoutChecksum
+      extends BlockReaderLocalTest {
+    @Override
+    public void setConfiguration(HdfsConfiguration conf) {
+      conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      Assert.assertTrue(!reader.getVerifyChecksum());
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      reader.setMlocked(true);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.setMlocked(false);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        false, 0);
   }
 
   @Test(timeout=60000)

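Condensing the construction pattern the test above exercises: a BlockReaderLocal is now assembled through its Builder rather than a constructor. This is a sketch only; the setters are taken verbatim from the test code in this commit, while the surrounding setup (conf, path, block, streams, datanodeID, readahead, verifyChecksum) is omitted:

    BlockReaderLocal reader = new BlockReaderLocal.Builder(new DFSClient.Conf(conf)).
        setFilename(path.getName()).
        setBlock(block).
        setStreams(new FileInputStream[] { dataIn, metaIn }).
        setDatanodeID(datanodeID).
        setCachingStrategy(new CachingStrategy(false, readahead)).
        setVerifyChecksum(verifyChecksum).
        setBlockMetadataHeader(BlockMetadataHeader.preadHeader(metaIn.getChannel())).
        build();
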
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Tue Dec 17 20:57:00 2013
@@ -259,7 +259,6 @@ public class TestShortCircuitLocalRead {
       assertTrue("/ should be a directory", fs.getFileStatus(path)
           .isDirectory() == true);
       
-      // create a new file in home directory. Do not close it.
       byte[] fileData = AppendTestUtil.randomBytes(seed, size);
       Path file1 = fs.makeQualified(new Path("filelocal.dat"));
       FSDataOutputStream stm = createFile(fs, file1, 1);