You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2019/04/30 21:52:23 UTC

[hadoop] branch trunk updated: HDFS-3246: pRead equivalent for direct read path (#597)

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4877f0a  HDFS-3246: pRead equivalent for direct read path (#597)
4877f0a is described below

commit 4877f0aa518832c37a06e6d3bd2c9552fc3141dc
Author: Sahil Takiar <sa...@users.noreply.github.com>
AuthorDate: Tue Apr 30 16:52:16 2019 -0500

    HDFS-3246: pRead equivalent for direct read path (#597)
    
    HDFS-3246: pRead equivalent for direct read path
    
    Contributed by Sahil Takiar
---
 .../apache/hadoop/crypto/CryptoInputStream.java    | 292 ++++++++++++++-------
 .../hadoop/fs/ByteBufferPositionedReadable.java    |  66 +++++
 .../org/apache/hadoop/fs/ByteBufferReadable.java   |  17 +-
 .../org/apache/hadoop/fs/FSDataInputStream.java    |  15 +-
 .../org/apache/hadoop/fs/StreamCapabilities.java   |   6 +
 .../hadoop/crypto/CryptoStreamsTestBase.java       | 185 ++++++++++++-
 .../apache/hadoop/crypto/TestCryptoStreams.java    |  35 ++-
 .../hadoop/crypto/TestCryptoStreamsForLocalFS.java |  10 +
 .../hadoop/crypto/TestCryptoStreamsNormal.java     |  10 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  14 +-
 .../src/main/native/libhdfs-tests/hdfs_test.h      |  18 ++
 .../main/native/libhdfs-tests/test_libhdfs_ops.c   | 142 +++++++++-
 .../src/main/native/libhdfs/hdfs.c                 | 210 +++++++++++++--
 .../apache/hadoop/hdfs/TestByteBufferPread.java    | 269 +++++++++++++++++++
 14 files changed, 1134 insertions(+), 155 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 67e8690..80364ce 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -64,7 +65,8 @@ import org.apache.hadoop.util.StringUtils;
 public class CryptoInputStream extends FilterInputStream implements 
     Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
     CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 
-    ReadableByteChannel, CanUnbuffer, StreamCapabilities {
+    ReadableByteChannel, CanUnbuffer, StreamCapabilities,
+    ByteBufferPositionedReadable {
   private final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
@@ -327,19 +329,39 @@ public class CryptoInputStream extends FilterInputStream implements
   public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
     checkStream();
-    try {
-      final int n = ((PositionedReadable) in).read(position, buffer, offset, 
-          length);
-      if (n > 0) {
-        // This operation does not change the current offset of the file
-        decrypt(position, buffer, offset, n);
-      }
-      
-      return n;
-    } catch (ClassCastException e) {
+    if (!(in instanceof PositionedReadable)) {
       throw new UnsupportedOperationException("This stream does not support " +
           "positioned read.");
     }
+    final int n = ((PositionedReadable) in).read(position, buffer, offset,
+        length);
+    if (n > 0) {
+      // This operation does not change the current offset of the file
+      decrypt(position, buffer, offset, n);
+    }
+
+    return n;
+  }
+
+  /**
+   * Positioned read using {@link ByteBuffer}s. This method is thread-safe.
+   */
+  @Override
+  public int read(long position, final ByteBuffer buf)
+      throws IOException {
+    checkStream();
+    if (!(in instanceof ByteBufferPositionedReadable)) {
+      throw new UnsupportedOperationException("This stream does not support " +
+          "positioned reads with byte buffers.");
+    }
+    int bufPos = buf.position();
+    final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
+    if (n > 0) {
+      // This operation does not change the current offset of the file
+      decrypt(position, buf, n, bufPos);
+    }
+
+    return n;
   }
   
   /**
@@ -348,49 +370,124 @@ public class CryptoInputStream extends FilterInputStream implements
    */
   private void decrypt(long position, byte[] buffer, int offset, int length) 
       throws IOException {
-    ByteBuffer inBuffer = getBuffer();
-    ByteBuffer outBuffer = getBuffer();
+    ByteBuffer localInBuffer = null;
+    ByteBuffer localOutBuffer = null;
     Decryptor decryptor = null;
     try {
+      localInBuffer = getBuffer();
+      localOutBuffer = getBuffer();
       decryptor = getDecryptor();
       byte[] iv = initIV.clone();
       updateDecryptor(decryptor, position, iv);
       byte padding = getPadding(position);
-      inBuffer.position(padding); // Set proper position for input data.
+      localInBuffer.position(padding); // Set proper position for input data.
       
       int n = 0;
       while (n < length) {
-        int toDecrypt = Math.min(length - n, inBuffer.remaining());
-        inBuffer.put(buffer, offset + n, toDecrypt);
+        int toDecrypt = Math.min(length - n, localInBuffer.remaining());
+        localInBuffer.put(buffer, offset + n, toDecrypt);
         // Do decryption
-        decrypt(decryptor, inBuffer, outBuffer, padding);
+        decrypt(decryptor, localInBuffer, localOutBuffer, padding);
         
-        outBuffer.get(buffer, offset + n, toDecrypt);
+        localOutBuffer.get(buffer, offset + n, toDecrypt);
         n += toDecrypt;
-        padding = afterDecryption(decryptor, inBuffer, position + n, iv);
+        padding = afterDecryption(decryptor, localInBuffer, position + n, iv);
       }
     } finally {
-      returnBuffer(inBuffer);
-      returnBuffer(outBuffer);
+      returnBuffer(localInBuffer);
+      returnBuffer(localOutBuffer);
       returnDecryptor(decryptor);
     }
   }
-  
+
+  /**
+   * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
+   * decrypted from {@code buf} starting at {@code start}.
+   * {@code buf.position()} and {@code buf.limit()} are unchanged after this
+   * method returns. This method is thread-safe.
+   *
+   * <p>
+   *   This method decrypts the input buf chunk-by-chunk and writes the
+   *   decrypted output back into the input buf. It uses two local buffers
+   *   taken from the {@link #bufferPool} to assist in this process: one is
+   *   designated as the input buffer and it stores a single chunk of the
+   *   given buf, the other is designated as the output buffer, which stores
+   *   the output of decrypting the input buffer. Both buffers are of size
+   *   {@link #bufferSize}.
+   * </p>
+   *
+   * <p>
+   *   Decryption is done by using a {@link Decryptor} and the
+   *   {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once
+   *   the decrypted data is written into the output buffer, it is copied back
+   *   into buf. Both buffers are returned back into the pool once the entire
+   *   buf is decrypted.
+   * </p>
+   *
+   * @param filePosition the current position of the file being read
+   * @param buf the {@link ByteBuffer} to decrypt
+   * @param length the number of bytes in {@code buf} to decrypt
+   * @param start the position in {@code buf} to start decrypting data from
+   */
+  private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
+          throws IOException {
+    ByteBuffer localInBuffer = null;
+    ByteBuffer localOutBuffer = null;
+
+    // Duplicate the buffer so we don't have to worry about resetting the
+    // original position and limit at the end of the method
+    buf = buf.duplicate();
+
+    int decryptedBytes = 0;
+    Decryptor localDecryptor = null;
+    try {
+      localInBuffer = getBuffer();
+      localOutBuffer = getBuffer();
+      localDecryptor = getDecryptor();
+      byte[] localIV = initIV.clone();
+      updateDecryptor(localDecryptor, filePosition, localIV);
+      byte localPadding = getPadding(filePosition);
+      // Set proper position for input data.
+      localInBuffer.position(localPadding);
+
+      while (decryptedBytes < length) {
+        buf.position(start + decryptedBytes);
+        buf.limit(start + decryptedBytes +
+                Math.min(length - decryptedBytes, localInBuffer.remaining()));
+        localInBuffer.put(buf);
+        // Do decryption
+        try {
+          decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
+          buf.position(start + decryptedBytes);
+          buf.limit(start + length);
+          decryptedBytes += localOutBuffer.remaining();
+          buf.put(localOutBuffer);
+        } finally {
+          localPadding = afterDecryption(localDecryptor, localInBuffer,
+              filePosition + decryptedBytes, localIV);
+        }
+      }
+    } finally {
+      returnBuffer(localInBuffer);
+      returnBuffer(localOutBuffer);
+      returnDecryptor(localDecryptor);
+    }
+  }
+
   /** Positioned read fully. It is thread-safe */
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
       throws IOException {
     checkStream();
-    try {
-      ((PositionedReadable) in).readFully(position, buffer, offset, length);
-      if (length > 0) {
-        // This operation does not change the current offset of the file
-        decrypt(position, buffer, offset, length);
-      }
-    } catch (ClassCastException e) {
+    if (!(in instanceof PositionedReadable)) {
       throw new UnsupportedOperationException("This stream does not support " +
           "positioned readFully.");
     }
+    ((PositionedReadable) in).readFully(position, buffer, offset, length);
+    if (length > 0) {
+      // This operation does not change the current offset of the file
+      decrypt(position, buffer, offset, length);
+    }
   }
 
   @Override
@@ -405,23 +502,22 @@ public class CryptoInputStream extends FilterInputStream implements
       throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
     }
     checkStream();
-    try {
-      /*
-       * If data of target pos in the underlying stream has already been read 
-       * and decrypted in outBuffer, we just need to re-position outBuffer.
-       */
-      if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
-        int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
-        if (forward > 0) {
-          outBuffer.position(outBuffer.position() + forward);
-        }
-      } else {
-        ((Seekable) in).seek(pos);
-        resetStreamOffset(pos);
+    /*
+     * If data of target pos in the underlying stream has already been read
+     * and decrypted in outBuffer, we just need to re-position outBuffer.
+     */
+    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
+      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
+      if (forward > 0) {
+        outBuffer.position(outBuffer.position() + forward);
       }
-    } catch (ClassCastException e) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "seek.");
+    } else {
+      if (!(in instanceof Seekable)) {
+        throw new UnsupportedOperationException("This stream does not " +
+                "support seek.");
+      }
+      ((Seekable) in).seek(pos);
+      resetStreamOffset(pos);
     }
   }
   
@@ -519,31 +615,34 @@ public class CryptoInputStream extends FilterInputStream implements
   }
   
   /**
-   * Decrypt all data in buf: total n bytes from given start position.
-   * Output is also buf and same start position.
-   * buf.position() and buf.limit() should be unchanged after decryption.
+   * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
+   * decrypted from {@code buf} starting at {@code start}.
+   * {@code buf.position()} and {@code buf.limit()} are unchanged after this
+   * method returns.
+   *
+   * @see #decrypt(long, ByteBuffer, int, int)
    */
-  private void decrypt(ByteBuffer buf, int n, int start) 
+  private void decrypt(ByteBuffer buf, int length, int start)
       throws IOException {
-    final int pos = buf.position();
-    final int limit = buf.limit();
-    int len = 0;
-    while (len < n) {
-      buf.position(start + len);
-      buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
+    buf = buf.duplicate();
+    int decryptedBytes = 0;
+    while (decryptedBytes < length) {
+      buf.position(start + decryptedBytes);
+      buf.limit(start + decryptedBytes +
+              Math.min(length - decryptedBytes, inBuffer.remaining()));
       inBuffer.put(buf);
       // Do decryption
       try {
         decrypt(decryptor, inBuffer, outBuffer, padding);
-        buf.position(start + len);
-        buf.limit(limit);
-        len += outBuffer.remaining();
+        buf.position(start + decryptedBytes);
+        buf.limit(start + length);
+        decryptedBytes += outBuffer.remaining();
         buf.put(outBuffer);
       } finally {
-        padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv);
+        padding = afterDecryption(decryptor, inBuffer,
+                streamOffset - (length - decryptedBytes), iv);
       }
     }
-    buf.position(pos);
   }
   
   @Override
@@ -572,14 +671,13 @@ public class CryptoInputStream extends FilterInputStream implements
     Preconditions.checkArgument(targetPos >= 0, 
         "Cannot seek to negative offset.");
     checkStream();
-    try {
-      boolean result = ((Seekable) in).seekToNewSource(targetPos);
-      resetStreamOffset(targetPos);
-      return result;
-    } catch (ClassCastException e) {
+    if (!(in instanceof Seekable)) {
       throw new UnsupportedOperationException("This stream does not support " +
           "seekToNewSource.");
     }
+    boolean result = ((Seekable) in).seekToNewSource(targetPos);
+    resetStreamOffset(targetPos);
+    return result;
   }
 
   @Override
@@ -587,59 +685,59 @@ public class CryptoInputStream extends FilterInputStream implements
       EnumSet<ReadOption> opts) throws IOException,
       UnsupportedOperationException {
     checkStream();
-    try {
-      if (outBuffer.remaining() > 0) {
-        // Have some decrypted data unread, need to reset.
-        ((Seekable) in).seek(getPos());
-        resetStreamOffset(getPos());
+    if (outBuffer.remaining() > 0) {
+      if (!(in instanceof Seekable)) {
+        throw new UnsupportedOperationException("This stream does not " +
+                "support seek.");
       }
-      final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
-          read(bufferPool, maxLength, opts);
-      if (buffer != null) {
-        final int n = buffer.remaining();
-        if (n > 0) {
-          streamOffset += buffer.remaining(); // Read n bytes
-          final int pos = buffer.position();
-          decrypt(buffer, n, pos);
-        }
-      }
-      return buffer;
-    } catch (ClassCastException e) {
-      throw new UnsupportedOperationException("This stream does not support " + 
+      // Have some decrypted data unread, need to reset.
+      ((Seekable) in).seek(getPos());
+      resetStreamOffset(getPos());
+    }
+    if (!(in instanceof HasEnhancedByteBufferAccess)) {
+      throw new UnsupportedOperationException("This stream does not support " +
           "enhanced byte buffer access.");
     }
+    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
+        read(bufferPool, maxLength, opts);
+    if (buffer != null) {
+      final int n = buffer.remaining();
+      if (n > 0) {
+        streamOffset += buffer.remaining(); // Read n bytes
+        final int pos = buffer.position();
+        decrypt(buffer, n, pos);
+      }
+    }
+    return buffer;
   }
 
   @Override
   public void releaseBuffer(ByteBuffer buffer) {
-    try {
-      ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
-    } catch (ClassCastException e) {
+    if (!(in instanceof HasEnhancedByteBufferAccess)) {
       throw new UnsupportedOperationException("This stream does not support " + 
           "release buffer.");
     }
+    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
   }
 
   @Override
   public void setReadahead(Long readahead) throws IOException,
       UnsupportedOperationException {
-    try {
-      ((CanSetReadahead) in).setReadahead(readahead);
-    } catch (ClassCastException e) {
+    if (!(in instanceof CanSetReadahead)) {
       throw new UnsupportedOperationException("This stream does not support " +
           "setting the readahead caching strategy.");
     }
+    ((CanSetReadahead) in).setReadahead(readahead);
   }
 
   @Override
   public void setDropBehind(Boolean dropCache) throws IOException,
       UnsupportedOperationException {
-    try {
-      ((CanSetDropBehind) in).setDropBehind(dropCache);
-    } catch (ClassCastException e) {
+    if (!(in instanceof CanSetDropBehind)) {
       throw new UnsupportedOperationException("This stream does not " +
           "support setting the drop-behind caching setting.");
     }
+    ((CanSetDropBehind) in).setDropBehind(dropCache);
   }
 
   @Override
@@ -737,11 +835,17 @@ public class CryptoInputStream extends FilterInputStream implements
   @Override
   public boolean hasCapability(String capability) {
     switch (StringUtils.toLowerCase(capability)) {
+    case StreamCapabilities.UNBUFFER:
+      return true;
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.DROPBEHIND:
-    case StreamCapabilities.UNBUFFER:
     case StreamCapabilities.READBYTEBUFFER:
-      return true;
+    case StreamCapabilities.PREADBYTEBUFFER:
+      if (!(in instanceof StreamCapabilities)) {
+        throw new UnsupportedOperationException("This stream does not expose " +
+          "its stream capabilities.");
+      }
+      return ((StreamCapabilities) in).hasCapability(capability);
     default:
       return false;
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
new file mode 100644
index 0000000..d99ee16
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+  /**
+   * Reads up to {@code buf.remaining()} bytes into buf from a given position
+   * in the file and returns the number of bytes read. Callers should use
+   * {@code buf.limit(...)} to control the size of the desired read and
+   * {@code buf.position(...)} to control the offset into the buffer the data
+   * should be written to.
+   * <p>
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} will be unchanged.
+   * <p>
+   * In the case of an exception, the state of the buffer (the contents of the
+   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+   * undefined, and callers should be prepared to recover from this
+   * eventuality.
+   * <p>
+   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+   * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
+   * stream supports this interface, otherwise they might get a
+   * {@link UnsupportedOperationException}.
+   * <p>
+   * Implementations should treat 0-length requests as legitimate, and must not
+   * signal an error upon their receipt.
+   *
+   * @param position position within file
+   * @param buf the ByteBuffer to receive the results of the read operation.
+   * @return the number of bytes read, possibly zero, or -1 if reached
+   *         end-of-stream
+   * @throws IOException if there is some error performing the read
+   */
+  int read(long position, ByteBuffer buf) throws IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
index 926b554..5a4ae04 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -33,16 +34,18 @@ public interface ByteBufferReadable {
    * Reads up to buf.remaining() bytes into buf. Callers should use
    * buf.limit(..) to control the size of the desired read.
    * <p>
-   * After a successful call, buf.position() will be advanced by the number 
-   * of bytes read and buf.limit() should be unchanged.
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} will be unchanged.
    * <p>
-   * In the case of an exception, the values of buf.position() and buf.limit()
-   * are undefined, and callers should be prepared to recover from this
+   * In the case of an exception, the state of the buffer (the contents of the
+   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+   * undefined, and callers should be prepared to recover from this
    * eventuality.
    * <p>
-   * Many implementations will throw {@link UnsupportedOperationException}, so
-   * callers that are not confident in support for this method from the
-   * underlying filesystem should be prepared to handle that exception.
+   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+   * {@link StreamCapabilities#READBYTEBUFFER} to check if the underlying
+   * stream supports this interface, otherwise they might get a
+   * {@link UnsupportedOperationException}.
    * <p>
    * Implementations should treat 0-length requests as legitimate, and must not
    * signal an error upon their receipt.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 62c45f1..066cc3d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -39,7 +39,8 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable, 
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+      ByteBufferPositionedReadable {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
    * objects
@@ -148,7 +149,8 @@ public class FSDataInputStream extends DataInputStream
       return ((ByteBufferReadable)in).read(buf);
     }
 
-    throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+    throw new UnsupportedOperationException("Byte-buffer read unsupported " +
+            "by input stream");
   }
 
   @Override
@@ -247,4 +249,13 @@ public class FSDataInputStream extends DataInputStream
   public String toString() {
     return super.toString() + ": " + in;
   }
+
+  @Override
+  public int read(long position, ByteBuffer buf) throws IOException {
+    if (in instanceof ByteBufferPositionedReadable) {
+      return ((ByteBufferPositionedReadable) in).read(position, buf);
+    }
+    throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
+        "by input stream");
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index c52d307..e68e7b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -66,6 +66,12 @@ public interface StreamCapabilities {
   String READBYTEBUFFER = "in:readbytebuffer";
 
   /**
+   * Stream read(long, ByteBuffer) capability implemented by
+   * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
+   */
+  String PREADBYTEBUFFER = "in:preadbytebuffer";
+
+  /**
    * Capabilities that a stream can support and be queried for.
    */
   @Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index a0eb105..7463d6c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.EnumSet;
 import java.util.Random;
 
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,32 @@ public abstract class CryptoStreamsTestBase {
     Assert.assertArrayEquals(result, expectedData);
   }
 
+  private int byteBufferPreadAll(ByteBufferPositionedReadable in,
+                                 ByteBuffer buf) throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = in.read(total, buf);
+    }
+
+    return total;
+  }
+
+  private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
+          throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = byteBufferPreadAll(in, result);
+
+    Assert.assertEquals(dataLen, n);
+    ByteBuffer expectedData = ByteBuffer.allocate(n);
+    expectedData.put(data, 0, n);
+    Assert.assertArrayEquals(result.array(), expectedData.array());
+  }
+
   protected OutputStream getOutputStream(int bufferSize) throws IOException {
     return getOutputStream(bufferSize, key, iv);
   }
@@ -288,20 +315,36 @@ public abstract class CryptoStreamsTestBase {
     
     return total;
   }
+
+  private int readAll(InputStream in, long pos, ByteBuffer buf)
+      throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
+    }
+
+    return total;
+  }
   
   /** Test positioned read. */
   @Test(timeout=120000)
   public void testPositionedRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
     
-    InputStream in = getInputStream(defaultBufferSize);
-    // Pos: 1/3 dataLen
-    positionedReadCheck(in , dataLen / 3);
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheck(in, dataLen / 3);
 
-    // Pos: 1/2 dataLen
-    positionedReadCheck(in, dataLen / 2);
-    in.close();
+      // Pos: 1/2 dataLen
+      positionedReadCheck(in, dataLen / 2);
+    }
   }
   
   private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -315,6 +358,35 @@ public abstract class CryptoStreamsTestBase {
     System.arraycopy(data, pos, expectedData, 0, n);
     Assert.assertArrayEquals(readData, expectedData);
   }
+
+  /** Test positioned read with ByteBuffers. */
+  @Test(timeout=120000)
+  public void testPositionedReadWithByteBuffer() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 3);
+
+      // Pos: 1/2 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 2);
+    }
+  }
+
+  private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
+          throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = readAll(in, pos, result);
+
+    Assert.assertEquals(dataLen, n + pos);
+    byte[] readData = new byte[n];
+    System.arraycopy(result.array(), 0, readData, 0, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, pos, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+  }
   
   /** Test read fully */
   @Test(timeout=120000)
@@ -505,12 +577,40 @@ public abstract class CryptoStreamsTestBase {
     System.arraycopy(data, 0, expectedData, 0, n);
     Assert.assertArrayEquals(readData, expectedData);
   }
+
+  private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
+      int bufPos) throws Exception {
+    // Pread from stream position 0 into buf, starting at buffer index bufPos
+    buf.position(bufPos);
+    int n = ((ByteBufferPositionedReadable) in).read(0, buf);
+    Assert.assertEquals(bufPos + n, buf.position());
+    byte[] readData = new byte[n];
+    buf.rewind();
+    buf.position(bufPos);
+    buf.get(readData);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, 0, expectedData, 0, n);
+    Assert.assertArrayEquals(expectedData, readData); // (expected, actual)
+
+    // Same check, but pread from half way through the data
+    buf.position(bufPos);
+    n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
+    Assert.assertEquals(bufPos + n, buf.position());
+    readData = new byte[n];
+    buf.rewind();
+    buf.position(bufPos);
+    buf.get(readData);
+    expectedData = new byte[n];
+    System.arraycopy(data, dataLen / 2, expectedData, 0, n);
+    Assert.assertArrayEquals(expectedData, readData); // (expected, actual)
+  }
   
   /** Test byte buffer read with different buffer size. */
   @Test(timeout=120000)
   public void testByteBufferRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
     
     // Default buffer size, initial buffer position is 0
     InputStream in = getInputStream(defaultBufferSize);
@@ -560,6 +660,53 @@ public abstract class CryptoStreamsTestBase {
     byteBufferReadCheck(in, buf, 11);
     in.close();
   }
+
+  /** Test ByteBuffer pread with heap/direct buffers and both buffer sizes. */
+  @Test(timeout=120000)
+  public void testByteBufferPread() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream defaultBuf = getInputStream(defaultBufferSize);
+         InputStream smallBuf = getInputStream(smallBufferSize)) {
+      // Test with heap ByteBuffer (100 bytes of slack beyond dataLen)
+      ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
+
+      // Default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+
+      // Test with direct ByteBuffer
+      buf = ByteBuffer.allocateDirect(dataLen + 100);
+
+      // Direct buffer, default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Direct buffer, default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Direct buffer, small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Direct buffer, small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+    }
+  }
   
   @Test(timeout=120000)
   public void testCombinedOp() throws Exception {
@@ -797,5 +944,23 @@ public abstract class CryptoStreamsTestBase {
         // The close will be called when exiting this try-with-resource block
       }
     }
+
+    // Test ByteBuffer pread
+    try (InputStream in = getInputStream(smallBufferSize)) {
+      if (in instanceof ByteBufferPositionedReadable) {
+        ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
+
+        // Test unbuffer after pread
+        byteBufferPreadCheck(bbpin);
+        ((CanUnbuffer) in).unbuffer();
+
+        // Test pread again after unbuffer
+        byteBufferPreadCheck(bbpin);
+
+        // Test close after unbuffer
+        ((CanUnbuffer) in).unbuffer();
+        // The close will be called when exiting this try-with-resource block
+      }
+    }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index cd7391a..8bcf46e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -180,7 +181,7 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
       implements Seekable, PositionedReadable, ByteBufferReadable,
                  HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
                  HasEnhancedByteBufferAccess, CanUnbuffer,
-                 StreamCapabilities {
+                 StreamCapabilities, ByteBufferPositionedReadable {
     private final byte[] oneByteBuf = new byte[1];
     private int pos = 0;
     private final byte[] data;
@@ -304,6 +305,32 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     }
 
     @Override
+    public int read(long position, ByteBuffer buf) throws IOException {
+      if (buf == null) {
+        throw new NullPointerException();
+      } else if (!buf.hasRemaining()) {
+        return 0;
+      }
+      // Only positions within [0, length] are accepted.
+      if (position > length) {
+        throw new IOException("Cannot read after EOF.");
+      }
+      if (position < 0) {
+        throw new IOException("Cannot read to negative offset.");
+      }
+
+      checkStream();
+      // Copy up to buf.remaining() bytes of data, starting at position.
+      if (position < length) {
+        int n = (int) Math.min(buf.remaining(), length - position);
+        buf.put(data, (int) position, n);
+        return n;
+      }
+      // position == length: nothing left to read.
+      return -1;
+    }
+
+    @Override
     public void readFully(long position, byte[] b, int off, int len)
         throws IOException {
       if (b == null) {
@@ -378,6 +405,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
       case StreamCapabilities.READAHEAD:
       case StreamCapabilities.DROPBEHIND:
       case StreamCapabilities.UNBUFFER:
+      case StreamCapabilities.READBYTEBUFFER:
+      case StreamCapabilities.PREADBYTEBUFFER:
         return true;
       default:
         return false;
@@ -439,7 +468,9 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
         new String[] {
             StreamCapabilities.DROPBEHIND,
             StreamCapabilities.READAHEAD,
-            StreamCapabilities.UNBUFFER
+            StreamCapabilities.UNBUFFER,
+            StreamCapabilities.READBYTEBUFFER,
+            StreamCapabilities.PREADBYTEBUFFER
         },
         new String[] {
             StreamCapabilities.HFLUSH,
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
index bb3fd7a..e7d922e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -90,11 +90,21 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
   @Override
   @Test(timeout=10000)
   public void testByteBufferRead() throws Exception {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override // body intentionally empty; the inherited test is ignored here
+  @Test(timeout=10000)
+  public void testPositionedReadWithByteBuffer() throws IOException {}
   
   @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
   @Override
   @Test(timeout=10000)
   public void testSyncable() throws IOException {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override // body intentionally empty; the inherited test is ignored here
+  @Test(timeout=10000)
+  public void testByteBufferPread() throws IOException {}
   
   @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
   @Override
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
index 7e30077..036706f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
@@ -91,6 +91,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testPositionedRead() throws IOException {}
 
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override // body intentionally empty; the inherited test is ignored here
+  @Test(timeout=10000)
+  public void testPositionedReadWithByteBuffer() throws IOException {}
+
   @Ignore("Wrapped stream doesn't support ReadFully")
   @Override
   @Test(timeout=10000)
@@ -105,6 +110,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Override
   @Test(timeout=10000)
   public void testByteBufferRead() throws IOException {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override // body intentionally empty; the inherited test is ignored here
+  @Test(timeout=10000)
+  public void testByteBufferPread() throws IOException {}
   
   @Ignore("Wrapped stream doesn't support ByteBufferRead, Seek")
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index a3e2ad5..2262295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -84,7 +85,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.htrace.core.SpanId;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -99,7 +99,8 @@ import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
     implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-               HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+               HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+               ByteBufferPositionedReadable {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1561,6 +1562,14 @@ public class DFSInputStream extends FSInputStream
     throw new IOException("Mark/reset not supported");
   }
 
+  @Override
+  public int read(long position, final ByteBuffer buf) throws IOException {
+    if (!buf.hasRemaining()) {
+      return 0; // no room left in buf, so skip the pread entirely
+    }
+    return pread(position, buf);
+  }
+
   /** Utility class to encapsulate data node info and its address. */
   static final class DNAddrPair {
     final DatanodeInfo info;
@@ -1780,6 +1789,7 @@ public class DFSInputStream extends FSInputStream
     case StreamCapabilities.DROPBEHIND:
     case StreamCapabilities.UNBUFFER:
     case StreamCapabilities.READBYTEBUFFER:
+    case StreamCapabilities.PREADBYTEBUFFER:
       return true;
     default:
       return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
index 0eab9a6..f003263 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
@@ -50,6 +50,24 @@ extern  "C" {
     void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
 
     /**
+     * Determine if a file is using the "direct pread" optimization.
+     *
+     * @param file     The HDFS file
+     * @return         1 if the file is using the direct pread optimization,
+     *                 0 otherwise.
+     */
+    int hdfsFileUsesDirectPread(struct hdfsFile_internal *file);
+
+    /**
+     * Disable the direct pread optimization for a file.
+     *
+     * This is mainly provided for unit testing purposes.
+     *
+     * @param file     The HDFS file
+     */
+    void hdfsFileDisableDirectPread(struct hdfsFile_internal *file);
+
+    /**
      * Disable domain socket security checks.
      *
      * @param          0 if domain socket security was disabled;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
index 1cd497b..ebf0dd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
@@ -88,9 +88,9 @@ int main(int argc, char **argv) {
     const char *userPath = "/tmp/usertestfile.txt";
 
     char buffer[32], buffer2[256], rdbuffer[32];
-    tSize num_written_bytes, num_read_bytes;
+    tSize num_written_bytes, num_read_bytes, num_pread_bytes;
     hdfsFS fs, lfs;
-    hdfsFile writeFile, readFile, localFile, appendFile, userFile;
+    hdfsFile writeFile, readFile, preadFile, localFile, appendFile, userFile;
     tOffset currentPos, seekPos;
     int exists, totalResult, result, numEntries, i, j;
     const char *resp;
@@ -145,7 +145,7 @@ int main(int argc, char **argv) {
     } 
 
     {
-        //Write tests
+        // Write tests
         
         writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
         if(!writeFile) {
@@ -188,7 +188,7 @@ int main(int argc, char **argv) {
     }
 
     {
-        //Read tests
+        // Read tests
         
         exists = hdfsExists(fs, readPath);
 
@@ -250,6 +250,7 @@ int main(int argc, char **argv) {
         }
         fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
                 num_read_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
         if (hdfsSeek(fs, readFile, 0L)) {
             fprintf(stderr, "Failed to seek to file start!\n");
             shutdown_and_exit(cl, -1);
@@ -259,17 +260,27 @@ int main(int argc, char **argv) {
         // read path
         hdfsFileDisableDirectRead(readFile);
 
-        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, 
-                sizeof(buffer));
-        fprintf(stderr, "Read following %d bytes:\n%s\n", 
-                num_read_bytes, buffer);
+        if (hdfsFileUsesDirectRead(readFile)) {
+            fprintf(stderr, "Disabled direct reads, but it is still enabled");
+            shutdown_and_exit(cl, -1);
+        }
 
-        memset(buffer, 0, strlen(fileContents + 1));
+        if (!hdfsFileUsesDirectPread(readFile)) {
+            fprintf(stderr, "Disabled direct reads, but direct preads was "
+                            "disabled as well");
+            shutdown_and_exit(cl, -1);
+        }
 
-        num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, 
+        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
                 sizeof(buffer));
-        fprintf(stderr, "Read following %d bytes:\n%s\n", 
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to read. Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_read_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "Read following %d bytes:\n%s\n",
                 num_read_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
 
         hdfsCloseFile(fs, readFile);
 
@@ -295,6 +306,115 @@ int main(int argc, char **argv) {
         hdfsCloseFile(lfs, localFile);
     }
 
+    {
+        // Pread tests: direct and fallback paths, at offset 0 and mid-file.
+        // After each pread the buffer is cleared, terminating NUL included.
+        exists = hdfsExists(fs, readPath);
+
+        if (exists) {
+            fprintf(stderr, "Failed to validate existence of %s\n", readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        preadFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
+        if (!preadFile) {
+            fprintf(stderr, "Failed to open %s for reading!\n", readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        if (!hdfsFileIsOpenForRead(preadFile)) {
+            fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
+                            "with O_RDONLY, and it did not show up as 'open for "
+                            "read'\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, preadFile));
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_pread_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n",
+                num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents) + 1);
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        // Test pread midway through the file rather than at the beginning
+        const char *fileContentsChunk = "World!";
+        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+                    fileContentsChunk, buffer, num_pread_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents) + 1);
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        // Disable the direct pread path so that we really go through the slow
+        // read path
+        hdfsFileDisableDirectPread(preadFile);
+
+        if (hdfsFileUsesDirectPread(preadFile)) {
+            fprintf(stderr, "Disabled direct preads, but it is still enabled\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        if (!hdfsFileUsesDirectRead(preadFile)) {
+            fprintf(stderr, "Disabled direct preads, but direct read was "
+                            "disabled as well\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_pread_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents) + 1);
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+            fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",
+                    fileContentsChunk, buffer, num_pread_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents) + 1);
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        hdfsCloseFile(fs, preadFile);
+
+        // Test correct behaviour for unsupported filesystems
+        localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
+
+        if (hdfsFileUsesDirectPread(localFile)) {
+            fprintf(stderr, "Direct pread support incorrectly detected for local "
+                            "filesystem\n");
+            shutdown_and_exit(cl, -1);
+        }
+
+        hdfsCloseFile(lfs, localFile);
+    }
+
     totalResult = 0;
     result = 0;
     {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
index e212f21..2fd0592 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
@@ -40,8 +40,23 @@
 
 // Bit fields for hdfsFile_internal flags
 #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
+#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
 
+/**
+ * Reads bytes using the read(ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes onto the Java heap.
+ * Instead the data will be directly copied from kernel space to the C heap.
+ */
 tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
+
+/**
+ * Reads bytes using the read(long, ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes onto the Java heap.
+ * Instead the data will be directly copied from kernel space to the C heap.
+ */
+tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
+                  tSize length);
+
 static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
 
 /**
@@ -285,7 +300,7 @@ int hdfsFileIsOpenForWrite(hdfsFile file)
 
 int hdfsFileUsesDirectRead(hdfsFile file)
 {
-    return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
+    return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) != 0;
 }
 
 void hdfsFileDisableDirectRead(hdfsFile file)
@@ -293,6 +308,17 @@ void hdfsFileDisableDirectRead(hdfsFile file)
     file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
 }
 
+int hdfsFileUsesDirectPread(hdfsFile file)
+{   /* Collapse the masked flag bit to exactly 1 or 0. */
+    return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) != 0;
+}
+
+void hdfsFileDisableDirectPread(hdfsFile file)
+{   /* Clear only the direct-pread bit; all other flag bits are kept. */
+    file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+}
+
+
 int hdfsDisableDomainSocketSecurity(void)
 {
     jthrowable jthr;
@@ -985,6 +1011,62 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
     return 0;
 }
 
+/**
+ * Delegates to FSDataInputStream#hasCapability(String). Used to check if a
+ * given input stream supports certain methods, such as
+ * ByteBufferReadable#read(ByteBuffer).
+ *
+ * @param jFile the FSDataInputStream to call hasCapability on
+ * @param capability the name of the capability to query; for a full list of
+ *        possible values see StreamCapabilities
+ *
+ * @return 1 if the given jFile has the given capability; 0 if it does not,
+ *         or if the query itself failed (errno is set in that case)
+ * @see org.apache.hadoop.fs.StreamCapabilities
+ */
+static int hdfsHasStreamCapability(jobject jFile,
+        const char *capability) {
+    int ret = 0;
+    jthrowable jthr = NULL;
+    jvalue jVal;
+    jstring jCapabilityString = NULL;
+
+    /* Get the JNIEnv* corresponding to current thread */
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return 0;
+    }
+
+    jthr = newJavaStr(env, capability, &jCapabilityString);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsHasStreamCapability(%s): newJavaStr", capability);
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
+            JC_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z",
+            jCapabilityString);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability",
+                capability);
+        goto done;
+    }
+
+done:
+    /* jthr needs no cleanup: printExceptionAndFree released it on failure */
+    destroyLocalReference(env, jCapabilityString);
+    if (ret) {
+        errno = ret;
+        return 0;
+    }
+    if (jVal.z == JNI_TRUE) {
+        return 1;
+    }
+    return 0;
+}
+
 static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
                   int32_t bufferSize, int16_t replication, int64_t blockSize)
 {
@@ -995,7 +1077,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
        return f{is|os};
     */
     int accmode = flags & O_ACCMODE;
-    jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL;
+    jstring jStrBufferSize = NULL, jStrReplication = NULL;
     jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
     jobject jFS = (jobject)fs;
     jthrowable jthr;
@@ -1024,13 +1106,15 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
       errno = ENOTSUP;
       return NULL;
     } else {
-      fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode);
+      fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n",
+              accmode);
       errno = EINVAL;
       return NULL;
     }
 
     if ((flags & O_CREAT) && (flags & O_EXCL)) {
-      fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
+      fprintf(stderr,
+              "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
     }
 
     if (accmode == O_RDONLY) {
@@ -1153,34 +1237,26 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
     file->flags = 0;
 
     if ((flags & O_WRONLY) == 0) {
-        // Check the StreamCapabilities of jFile to see if we can do direct reads
-        jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString);
-        if (jthr) {
-            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                                        "hdfsOpenFile(%s): newJavaStr", path);
-            goto done;
-        }
-        jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
-                JC_FS_DATA_INPUT_STREAM, "hasCapability",
-                "(Ljava/lang/String;)Z", jCapabilityString);
-        if (jthr) {
-            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
-                    "hdfsOpenFile(%s): FSDataInputStream#hasCapability", path);
-            goto done;
-        }
-        if (jVal.z) {
+        // Check the StreamCapabilities of jFile to see if we can do direct
+        // reads
+        if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
             file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
         }
+
+        // Check the StreamCapabilities of jFile to see if we can do direct
+        // preads
+        if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
+            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+        }
     }
     ret = 0;
 
 done:
     destroyLocalReference(env, jStrBufferSize);
     destroyLocalReference(env, jStrReplication);
-    destroyLocalReference(env, jConfiguration); 
-    destroyLocalReference(env, jPath); 
+    destroyLocalReference(env, jConfiguration);
+    destroyLocalReference(env, jPath);
     destroyLocalReference(env, jFile);
-    destroyLocalReference(env, jCapabilityString);
     if (ret) {
         if (file) {
             if (file->file) {
@@ -1385,6 +1461,13 @@ static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
     return 0;
 }
 
+/**
+ * If the underlying stream supports the ByteBufferReadable interface then
+ * this method will transparently use read(ByteBuffer). This can help
+ * improve performance as it avoids unnecessarily copying data on to the Java
+ * heap. Instead the data will be directly copied from kernel space to the C
+ * heap.
+ */
 tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
 {
     jobject jInputStream;
@@ -1459,12 +1542,11 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
     return jVal.i;
 }
 
-// Reads using the read(ByteBuffer) API, which does fewer copies
 tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
 {
     // JAVA EQUIVALENT:
-    //  ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
-    //  fis.read(bbuffer);
+    //  ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.read(buf);
 
     jobject jInputStream;
     jvalue jVal;
@@ -1499,9 +1581,25 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
             "readDirect: FSDataInputStream#read");
         return -1;
     }
-    return (jVal.i < 0) ? 0 : jVal.i;
+    // Reached EOF, return 0
+    if (jVal.i < 0) {
+        return 0;
+    }
+    // 0 bytes read, return error
+    if (jVal.i == 0) {
+        errno = EINTR;
+        return -1;
+    }
+    return jVal.i;
 }
 
+/**
+ * If the underlying stream supports the ByteBufferPositionedReadable
+ * interface then this method will transparently use read(long, ByteBuffer).
+ * This can help improve performance as it avoids unnecessarily copying data
+ * on to the Java heap. Instead the data will be directly copied from kernel
+ * space to the C heap.
+ */
 tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
                 void* buffer, tSize length)
 {
@@ -1521,6 +1619,10 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
         return -1;
     }
 
+    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
+      return preadDirect(fs, f, position, buffer, length);
+    }
+
     env = getJNIEnv();
     if (env == NULL) {
       errno = EINTERNAL;
@@ -1571,6 +1673,60 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
     return jVal.i;
 }
 
+tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
+                  tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.read(position, buf);
+    // Returns bytes read, 0 at EOF, or -1 with errno set on failure.
+    jvalue jVal;
+    jthrowable jthr;
+    jobject bb;
+
+    // Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    // Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    // Wrap the caller's C buffer in a direct ByteBuffer for the Java call
+    bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+    if (bb == NULL) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "preadDirect: NewDirectByteBuffer");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
+            JC_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I",
+            position, bb);
+    destroyLocalReference(env, bb);
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "preadDirect: FSDataInputStream#read");
+        return -1;
+    }
+    // Reached EOF, return 0
+    if (jVal.i < 0) {
+        return 0;
+    }
+    // 0 bytes read, return error
+    if (jVal.i == 0) {
+        errno = EINTR;
+        return -1;
+    }
+    return jVal.i;
+}
+
 tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
 {
     // JAVA EQUIVALENT
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
new file mode 100644
index 0000000..4547db1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the DFS positional read functionality on a single node
+ * mini-cluster. These tests are inspired by {@link TestPread}. The tests
+ * are much less comprehensive than other pread tests because pread already
+ * internally uses {@link ByteBuffer}s.
+ */
+public class TestByteBufferPread {
+
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static byte[] fileContents;
+  private static Path testFile;
+  private static Random rand;
+
+  private static final long SEED = 0xDEADBEEFL;
+  private static final int BLOCK_SIZE = 4096;
+  private static final int FILE_SIZE = 12 * BLOCK_SIZE;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // Setup the cluster with a small block size so we can create small files
+    // that span multiple blocks
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    fs = cluster.getFileSystem();
+
+    // Create a test file that spans 12 blocks, and contains a bunch of random
+    // bytes
+    fileContents = new byte[FILE_SIZE];
+    rand = new Random(SEED);
+    rand.nextBytes(fileContents);
+    testFile = new Path("/byte-buffer-pread-test.dat");
+    try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
+      out.write(fileContents);
+    }
+  }
+
+  /**
+   * Test preads with heap-allocated (non-direct) {@link ByteBuffer}s.
+   */
+  @Test
+  public void testPreadWithHeapByteBuffer() throws IOException {
+    testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+  }
+
+  /**
+   * Test preads with direct {@link ByteBuffer}s.
+   */
+  @Test
+  public void testPreadWithDirectByteBuffer() throws IOException {
+    testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+  }
+
+  /**
+   * Reads the entire testFile using the pread API and validates that its
+   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   */
+  private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Make sure the buffer is full
+      assertFalse(buffer.hasRemaining());
+      // Make sure the contents of the read buffer equal the contents of the
+      // file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(fileContents, bufferContents);
+      buffer.position(buffer.limit());
+    }
+  }
+
+  /**
+   * Attempts to pread the testFile into a {@link ByteBuffer} that is already
+   * full, and validates that doing so does not change the contents of the
+   * supplied {@link ByteBuffer}.
+   */
+  private void testPreadWithFullByteBuffer(ByteBuffer buffer)
+          throws IOException {
+    // Load some dummy data into the buffer
+    byte[] existingBufferBytes = new byte[FILE_SIZE];
+    rand.nextBytes(existingBufferBytes);
+    buffer.put(existingBufferBytes);
+    // Make sure the buffer is full
+    assertFalse(buffer.hasRemaining());
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      // Attempt a pread into the buffer, 0 bytes should be read since the
+      // buffer is full
+      assertEquals(0, in.read(0, buffer));
+
+      // Double check the buffer is still full and its contents have not
+      // changed
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(existingBufferBytes, bufferContents);
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by setting a
+   * {@link ByteBuffer#limit} on the buffer. Validates that only half of the
+   * testFile is loaded into the buffer.
+   */
+  private void testPreadWithLimitedByteBuffer(
+          ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    // Set the buffer limit to half the size of the file
+    buffer.limit(FILE_SIZE / 2);
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Since we set the buffer limit to half the size of the file, we should
+      // have only read half of the file into the buffer
+      assertEquals(FILE_SIZE / 2, totalBytesRead);
+      // Check that the buffer is full and the contents equal the first half of
+      // the file
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2),
+              bufferContents);
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by setting the
+   * {@link ByteBuffer#position} to half the size of the file. Validates that
+   * only half of the testFile is loaded into the buffer.
+   */
+  private void testPreadWithPositionedByteBuffer(
+          ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    // Set the buffer position to half the size of the file
+    buffer.position(FILE_SIZE / 2);
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
+      }
+
+      // Since we set the buffer position to half the size of the file, we
+      // should have only read half of the file into the buffer
+      assertEquals(FILE_SIZE / 2, totalBytesRead);
+      // Check that the buffer is full and that its second half holds the
+      // first half of the file
+      assertFalse(buffer.hasRemaining());
+      buffer.position(FILE_SIZE / 2);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2),
+              bufferContents);
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by specifying a
+   * position for the pread API that is half of the file size. Validates that
+   * only half of the testFile is loaded into the buffer.
+   */
+  private void testPositionedPreadWithByteBuffer(
+          ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      // Start reading from halfway through the file
+      while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
+              buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Since we started reading halfway through the file, the buffer should
+      // only be half full
+      assertEquals(FILE_SIZE / 2, totalBytesRead);
+      assertEquals(FILE_SIZE / 2, buffer.position());
+      assertTrue(buffer.hasRemaining());
+      // Check that the buffer contents equal the second half of the file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(Arrays.copyOfRange(fileContents, FILE_SIZE / 2,
+              FILE_SIZE), bufferContents);
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    try {
+      fs.delete(testFile, false);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org