You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/05/30 16:50:35 UTC

[hadoop] 01/05: HADOOP-18028. High performance S3A input stream (#4109)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c1d82cd95e375410cb0dffc2931063d48687386f
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Mon Mar 28 11:05:34 2022 +0100

    HADOOP-18028. High performance S3A input stream (#4109)
    
    This is the initial merge of the HADOOP-18028 S3A performance input stream.
    This patch on its own is incomplete and must be accompanied by all other commits
    with HADOOP-18028 in their git commit message. Consult the JIRA for that list.
    
    Contributed by Bhalchandra Pandit.
---
 hadoop-tools/hadoop-aws/pom.xml                    |   6 +
 .../org/apache/hadoop/fs/common/BlockCache.java    |  70 +++
 .../org/apache/hadoop/fs/common/BlockData.java     | 248 +++++++++
 .../org/apache/hadoop/fs/common/BlockManager.java  | 140 +++++
 .../apache/hadoop/fs/common/BlockOperations.java   | 420 +++++++++++++++
 .../hadoop/fs/common/BoundedResourcePool.java      | 182 +++++++
 .../org/apache/hadoop/fs/common/BufferData.java    | 298 +++++++++++
 .../org/apache/hadoop/fs/common/BufferPool.java    | 292 +++++++++++
 .../hadoop/fs/common/CachingBlockManager.java      | 581 +++++++++++++++++++++
 .../org/apache/hadoop/fs/common/FilePosition.java  | 282 ++++++++++
 .../main/java/org/apache/hadoop/fs/common/Io.java  |  45 ++
 .../org/apache/hadoop/fs/common/ResourcePool.java  |  71 +++
 .../java/org/apache/hadoop/fs/common/Retryer.java  |  87 +++
 .../hadoop/fs/common/SingleFilePerBlockCache.java  | 335 ++++++++++++
 .../java/org/apache/hadoop/fs/common/Validate.java | 405 ++++++++++++++
 .../org/apache/hadoop/fs/common/package-info.java  |  27 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  20 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  65 ++-
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java |  53 +-
 .../java/org/apache/hadoop/fs/s3a/read/README.md   | 107 ++++
 .../apache/hadoop/fs/s3a/read/S3BlockManager.java  |  77 +++
 .../hadoop/fs/s3a/read/S3CachingBlockManager.java  |  90 ++++
 .../hadoop/fs/s3a/read/S3CachingInputStream.java   | 195 +++++++
 .../java/org/apache/hadoop/fs/s3a/read/S3File.java | 219 ++++++++
 .../hadoop/fs/s3a/read/S3InMemoryInputStream.java  |  95 ++++
 .../apache/hadoop/fs/s3a/read/S3InputStream.java   | 461 ++++++++++++++++
 .../fs/s3a/read/S3PrefetchingInputStream.java      | 241 +++++++++
 .../org/apache/hadoop/fs/s3a/read/S3Reader.java    | 159 ++++++
 .../apache/hadoop/fs/s3a/read/package-info.java    |  28 +
 .../apache/hadoop/fs/common/ExceptionAsserts.java  |  60 +++
 .../hadoop/fs/common/SampleDataForTests.java       |  57 ++
 .../apache/hadoop/fs/common/TestBlockCache.java    |  92 ++++
 .../org/apache/hadoop/fs/common/TestBlockData.java | 158 ++++++
 .../hadoop/fs/common/TestBlockOperations.java      | 105 ++++
 .../hadoop/fs/common/TestBoundedResourcePool.java  | 148 ++++++
 .../apache/hadoop/fs/common/TestBufferData.java    | 247 +++++++++
 .../apache/hadoop/fs/common/TestBufferPool.java    | 156 ++++++
 .../apache/hadoop/fs/common/TestFilePosition.java  | 216 ++++++++
 .../org/apache/hadoop/fs/common/TestIoClass.java   |  62 +++
 .../org/apache/hadoop/fs/common/TestRetryer.java   |  81 +++
 .../org/apache/hadoop/fs/common/TestValidate.java  | 322 ++++++++++++
 .../org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java  |   4 +-
 .../java/org/apache/hadoop/fs/s3a/read/Fakes.java  | 369 +++++++++++++
 .../org/apache/hadoop/fs/s3a/read/MockS3File.java  | 105 ++++
 .../hadoop/fs/s3a/read/TestS3BlockManager.java     |  86 +++
 .../fs/s3a/read/TestS3CachingBlockManager.java     | 331 ++++++++++++
 .../org/apache/hadoop/fs/s3a/read/TestS3File.java  |  78 +++
 .../hadoop/fs/s3a/read/TestS3InputStream.java      | 253 +++++++++
 .../apache/hadoop/fs/s3a/read/TestS3Reader.java    | 106 ++++
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java      |  12 +-
 50 files changed, 8332 insertions(+), 15 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 5583bb7ad05..2e6b8ff3595 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -435,6 +435,12 @@
       <artifactId>aws-java-sdk-bundle</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>util-core_2.11</artifactId>
+      <version>21.2.0</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockCache.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockCache.java
new file mode 100644
index 00000000000..16354c7be5f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockCache.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Contract for a cache holding blocks of file data that were read from a FileSystem.
+ */
+public interface BlockCache extends Closeable {
+
+  /**
+   * Stores a block in this cache.
+   *
+   * @param blockNumber the id of the block being stored.
+   * @param buffer the block contents to add to this cache.
+   * @throws IOException if the block could not be written.
+   */
+  void put(int blockNumber, ByteBuffer buffer) throws IOException;
+
+  /**
+   * Retrieves the block identified by {@code blockNumber}.
+   *
+   * @param blockNumber the id of the block to retrieve.
+   * @param buffer destination buffer into which the block contents are copied.
+   * @throws IOException if the block could not be read.
+   */
+  void get(int blockNumber, ByteBuffer buffer) throws IOException;
+
+  /**
+   * Tells whether this cache currently holds the given block.
+   *
+   * @param blockNumber the id of the block to look up.
+   * @return true when the block is held by this cache, false otherwise.
+   */
+  boolean containsBlock(int blockNumber);
+
+  /**
+   * Returns how many blocks this cache currently holds.
+   *
+   * @return the count of blocks held by this cache.
+   */
+  int size();
+
+  /**
+   * Returns the ids of the blocks held by this cache.
+   *
+   * @return the ids of the cached blocks.
+   */
+  Iterable<Integer> blocks();
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockData.java
new file mode 100644
index 00000000000..df226d09938
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockData.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+/**
+ * Holds information about blocks of data in a file.
+ */
+public class BlockData {
+  // State of each block of data.
+  enum State {
+    // Data is not yet ready to be read from this block (still being prefetched).
+    NOT_READY,
+
+    // A read of this block has been enqueued in the prefetch queue.
+    QUEUED,
+
+    // This block is ready to be read. That is, it has been fully read.
+    READY,
+
+    // This block has been cached in the local disk cache.
+    CACHED
+  }
+
+  // State of all blocks in a file.
+  private State[] state;
+
+  // The size of a file.
+  private final long fileSize;
+
+  // The file is divided into blocks of this size.
+  private final int blockSize;
+
+  // The file has these many blocks.
+  private final int numBlocks;
+
+  /**
+   * Constructs an instance of {@link BlockData}.
+   *
+   * @param fileSize the size of a file.
+   * @param blockSize the file is divided into blocks of this size.
+   *
+   * @throws IllegalArgumentException if fileSize is negative.
+   * @throws IllegalArgumentException if blockSize is negative.
+   * @throws IllegalArgumentException if blockSize is zero or negative.
+   */
+  public BlockData(long fileSize, int blockSize) {
+    Validate.checkNotNegative(fileSize, "fileSize");
+    if (fileSize == 0) {
+      Validate.checkNotNegative(blockSize, "blockSize");
+    } else {
+      Validate.checkPositiveInteger(blockSize, "blockSize");
+    }
+
+    this.fileSize = fileSize;
+    this.blockSize = blockSize;
+    this.numBlocks =
+        (fileSize == 0) ? 0 : ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0 ? 1 : 0);
+    this.state = new State[this.numBlocks];
+    for (int b = 0; b < this.numBlocks; b++) {
+      this.setState(b, State.NOT_READY);
+    }
+  }
+
+  /**
+   * Gets the size of each block.
+   *
+   * @return the size of each block.
+   */
+  public int getBlockSize() {
+    return this.blockSize;
+  }
+
+  /**
+   * Gets the size of the associated file.
+   *
+   * @return the size of the associated file.
+   */
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  /**
+   * Gets the number of blocks in the associated file.
+   *
+   * @return the number of blocks in the associated file.
+   */
+  public int getNumBlocks() {
+    return this.numBlocks;
+  }
+
+  /**
+   * Indicates whether the given block is the last block in the associated file.
+   *
+   * @param blockNumber the id of the desired block.
+   * @return true if the given block is the last block in the associated file, false otherwise.
+   *
+   * @throws IllegalArgumentException if blockNumber is invalid.
+   */
+  public boolean isLastBlock(int blockNumber) {
+    if (this.fileSize == 0) {
+      return false;
+    }
+
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return blockNumber == (this.numBlocks - 1);
+  }
+
+  /**
+   * Gets the id of the block that contains the given absolute offset.
+   *
+   * @param offset the absolute offset to check.
+   * @return the id of the block that contains the given absolute offset.
+   *
+   * @throws IllegalArgumentException if offset is invalid.
+   */
+  public int getBlockNumber(long offset) {
+    throwIfInvalidOffset(offset);
+
+    return (int) (offset / this.blockSize);
+  }
+
+  /**
+   * Gets the size of the given block.
+   *
+   * @param blockNumber the id of the desired block.
+   * @return the size of the given block.
+   */
+  public int getSize(int blockNumber) {
+    if (this.fileSize == 0) {
+      return 0;
+    }
+
+    if (this.isLastBlock(blockNumber)) {
+      return (int) (this.fileSize - (((long) this.blockSize) * (this.numBlocks - 1)));
+    } else {
+      return this.blockSize;
+    }
+  }
+
+  /**
+   * Indicates whether the given absolute offset is valid.
+   *
+   * @param offset absolute offset in the file..
+   * @return true if the given absolute offset is valid, false otherwise.
+   */
+  public boolean isValidOffset(long offset) {
+    return (offset >= 0) && (offset < this.fileSize);
+  }
+
+  /**
+   * Gets the start offset of the given block.
+
+   * @param blockNumber the id of the given block.
+   * @return the start offset of the given block.
+   *
+   * @throws IllegalArgumentException if blockNumber is invalid.
+   */
+  public long getStartOffset(int blockNumber) {
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return blockNumber * (long) this.blockSize;
+  }
+
+  /**
+   * Gets the relative offset corresponding to the given block and the absolute offset.
+   *
+   * @param blockNumber the id of the given block.
+   * @param offset absolute offset in the file.
+   * @return the relative offset corresponding to the given block and the absolute offset.
+   *
+   * @throws IllegalArgumentException if either blockNumber or offset is invalid.
+   */
+  public int getRelativeOffset(int blockNumber, long offset) {
+    throwIfInvalidOffset(offset);
+
+    return (int) (offset - this.getStartOffset(blockNumber));
+  }
+
+  /**
+   * Gets the state of the given block.
+   *
+   * @param blockNumber the id of the given block.
+   * @return the state of the given block.
+   *
+   * @throws IllegalArgumentException if blockNumber is invalid.
+   */
+  public State getState(int blockNumber) {
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return this.state[blockNumber];
+  }
+
+  /**
+   * Sets the state of the given block to the given value.
+   *
+   * @param blockNumber the id of the given block.
+   * @param blockState the target state.
+   *
+   * @throws IllegalArgumentException if blockNumber is invalid.
+   */
+  public void setState(int blockNumber, State blockState) {
+    throwIfInvalidBlockNumber(blockNumber);
+
+    this.state[blockNumber] = blockState;
+  }
+
+  // Debug helper.
+  public String getStateString() {
+    StringBuilder sb = new StringBuilder();
+    int blockNumber = 0;
+    while (blockNumber < this.numBlocks) {
+      State tstate = this.getState(blockNumber);
+      int endBlockNumber = blockNumber;
+      while ((endBlockNumber < this.numBlocks) && (this.getState(endBlockNumber) == tstate)) {
+        endBlockNumber++;
+      }
+      sb.append(String.format("[%03d ~ %03d] %s%n", blockNumber, endBlockNumber - 1, tstate));
+      blockNumber = endBlockNumber;
+    }
+    return sb.toString();
+  }
+
+  private void throwIfInvalidBlockNumber(int blockNumber) {
+    Validate.checkWithinRange(blockNumber, "blockNumber", 0, this.numBlocks - 1);
+  }
+
+  private void throwIfInvalidOffset(long offset) {
+    Validate.checkWithinRange(offset, "offset", 0, this.fileSize - 1);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockManager.java
new file mode 100644
index 00000000000..f5672aa6920
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides read access to the underlying file one block at a time.
+ *
+ * This class is the simplest form of a {@code BlockManager}: it does
+ * not perform prefetching or caching. Prefetch and caching requests are
+ * accepted but ignored.
+ */
+public abstract class BlockManager implements Closeable {
+
+  // Information about each block of the underlying file.
+  private final BlockData blockData;
+
+  /**
+   * Constructs an instance of {@code BlockManager}.
+   *
+   * @param blockData information about each block of the underlying file.
+   *
+   * @throws IllegalArgumentException if blockData is null.
+   */
+  public BlockManager(BlockData blockData) {
+    Validate.checkNotNull(blockData, "blockData");
+
+    this.blockData = blockData;
+  }
+
+  /**
+   * Gets block data information.
+   *
+   * @return instance of {@code BlockData}.
+   */
+  public BlockData getBlockData() {
+    return this.blockData;
+  }
+
+  /**
+   * Gets the block having the given {@code blockNumber}.
+   *
+   * The entire block is read into a freshly allocated buffer and returned as a
+   * {@code BufferData}. The blocks are treated as a limited resource and must be
+   * released when one is done reading them.
+   *
+   * @param blockNumber the number of the block to be read and returned.
+   * @return {@code BufferData} having data from the given block.
+   *
+   * @throws IOException if there is an error reading the given block.
+   * @throws IllegalArgumentException if blockNumber is negative.
+   */
+  public BufferData get(int blockNumber) throws IOException {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    int size = this.blockData.getSize(blockNumber);
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    long startOffset = this.blockData.getStartOffset(blockNumber);
+    this.read(buffer, startOffset, size);
+    // Switch the buffer from writing (filled by read) to reading.
+    buffer.flip();
+    return new BufferData(blockNumber, buffer);
+  }
+
+  /**
+   * Reads into the given {@code buffer} {@code size} bytes from the underlying file
+   * starting at {@code startOffset}.
+   *
+   * @param buffer the buffer to read data in to.
+   * @param startOffset the offset at which reading starts.
+   * @param size the number bytes to read.
+   * @return number of bytes read.
+   * @throws IOException if there is an error reading the given block.
+   */
+  public abstract int read(ByteBuffer buffer, long startOffset, int size) throws IOException;
+
+  /**
+   * Releases resources allocated to the given block.
+   *
+   * @param data the {@code BufferData} to release.
+   *
+   * @throws IllegalArgumentException if data is null.
+   */
+  public void release(BufferData data) {
+    Validate.checkNotNull(data, "data");
+
+    // Do nothing because we allocate a new buffer each time.
+  }
+
+  /**
+   * Requests optional prefetching of the given block.
+   *
+   * @param blockNumber the id of the block to prefetch.
+   *
+   * @throws IllegalArgumentException if blockNumber is negative.
+   */
+  public void requestPrefetch(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    // Do nothing because we do not support prefetches.
+  }
+
+  /**
+   * Requests cancellation of any previously issued prefetch requests.
+   */
+  public void cancelPrefetches() {
+    // Do nothing because we do not support prefetches.
+  }
+
+  /**
+   * Requests that the given block should be copied to the cache. Optional operation.
+   *
+   * @param data the {@code BufferData} instance to optionally cache.
+   */
+  public void requestCaching(BufferData data) {
+    // Do nothing because we do not support caching.
+  }
+
+  /**
+   * Closes this block manager. This base implementation holds no resources.
+   */
+  @Override
+  public void close() {
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockOperations.java
new file mode 100644
index 00000000000..2b322856293
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockOperations.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.DoubleSummaryStatistics;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Block level operations performed on a file.
+ * This class is meant to be used by {@code BlockManager}.
+ * It is separated out in its own file due to its size.
+ *
+ * This class is used for debugging/logging. Calls to this class
+ * can be safely removed without affecting the overall operation.
+ */
+public class BlockOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(BlockOperations.class);
+
+  // Matches one summary token such as "GR(3)", "EGR(3)" or "CP":
+  // group 1 is the short name, group 3 the optional block number.
+  private static final Pattern BLOCK_OP_PATTERN =
+      Pattern.compile("([A-Z+]+)(\\(([0-9]+)?\\))?");
+
+  public enum Kind {
+    UNKNOWN("??", "unknown", false),
+    CANCEL_PREFETCHES("CP", "cancelPrefetches", false),
+    CLOSE("CX", "close", false),
+    CACHE_PUT("C+", "putC", true),
+    GET_CACHED("GC", "getCached", true),
+    GET_PREFETCHED("GP", "getPrefetched", true),
+    GET_READ("GR", "getRead", true),
+    PREFETCH("PF", "prefetch", true),
+    RELEASE("RL", "release", true),
+    REQUEST_CACHING("RC", "requestCaching", true),
+    REQUEST_PREFETCH("RP", "requestPrefetch", true);
+
+    // Short name -> kind lookup, populated once during enum class initialization
+    // and never modified afterwards, so concurrent lookups need no locking.
+    private static final Map<String, Kind> SHORT_NAME_TO_KIND = new HashMap<>();
+
+    static {
+      for (Kind kind : values()) {
+        SHORT_NAME_TO_KIND.put(kind.shortName, kind);
+      }
+    }
+
+    private final String shortName;
+    private final String name;
+    private final boolean hasBlock;
+
+    Kind(String shortName, String name, boolean hasBlock) {
+      this.shortName = shortName;
+      this.name = name;
+      this.hasBlock = hasBlock;
+    }
+
+    /**
+     * Looks up a kind by its two-character short name.
+     *
+     * @param shortName the short name, e.g. {@code "GR"}.
+     * @return the matching kind, or null if there is none.
+     */
+    public static Kind fromShortName(String shortName) {
+      return SHORT_NAME_TO_KIND.get(shortName);
+    }
+  }
+
+  /**
+   * A single recorded operation, timestamped at construction with {@link System#nanoTime()}.
+   */
+  public static class Operation {
+    private final Kind kind;
+    private final int blockNumber;
+    private final long timestamp;
+
+    public Operation(Kind kind, int blockNumber) {
+      this.kind = kind;
+      this.blockNumber = blockNumber;
+      this.timestamp = System.nanoTime();
+    }
+
+    public Kind getKind() {
+      return this.kind;
+    }
+
+    public int getBlockNumber() {
+      return this.blockNumber;
+    }
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+
+    public void getSummary(StringBuilder sb) {
+      if (this.kind.hasBlock) {
+        sb.append(String.format("%s(%d)", this.kind.shortName, this.blockNumber));
+      } else {
+        sb.append(String.format("%s", this.kind.shortName));
+      }
+    }
+
+    public String getDebugInfo() {
+      if (this.kind.hasBlock) {
+        return String.format("--- %s(%d)", this.kind.name, this.blockNumber);
+      } else {
+        return String.format("... %s()", this.kind.name);
+      }
+    }
+  }
+
+  /**
+   * Marks the completion of a previously recorded start {@link Operation}.
+   */
+  public static class End extends Operation {
+    private final Operation op;
+
+    public End(Operation op) {
+      super(op.kind, op.blockNumber);
+      this.op = op;
+    }
+
+    @Override
+    public void getSummary(StringBuilder sb) {
+      sb.append("E");
+      super.getSummary(sb);
+    }
+
+    @Override
+    public String getDebugInfo() {
+      // Replace the 3-character "---" / "..." marker of the start op with "***".
+      return "***" + super.getDebugInfo().substring(3);
+    }
+
+    /**
+     * @return seconds elapsed between the start op and this end op.
+     */
+    public double duration() {
+      return (this.getTimestamp() - this.op.getTimestamp()) / 1e9;
+    }
+  }
+
+  private final List<Operation> ops;
+  private boolean debugMode;
+
+  public BlockOperations() {
+    this.ops = new ArrayList<>();
+  }
+
+  public synchronized void setDebug(boolean state) {
+    this.debugMode = state;
+  }
+
+  private synchronized Operation add(Operation op) {
+    if (this.debugMode) {
+      LOG.info(op.getDebugInfo());
+    }
+    ops.add(op);
+    return op;
+  }
+
+  public Operation getPrefetched(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.GET_PREFETCHED, blockNumber));
+  }
+
+  public Operation getCached(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.GET_CACHED, blockNumber));
+  }
+
+  public Operation getRead(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.GET_READ, blockNumber));
+  }
+
+  public Operation release(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.RELEASE, blockNumber));
+  }
+
+  public Operation requestPrefetch(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.REQUEST_PREFETCH, blockNumber));
+  }
+
+  public Operation prefetch(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.PREFETCH, blockNumber));
+  }
+
+  public Operation cancelPrefetches() {
+    return this.add(new Operation(Kind.CANCEL_PREFETCHES, -1));
+  }
+
+  public Operation close() {
+    return this.add(new Operation(Kind.CLOSE, -1));
+  }
+
+  public Operation requestCaching(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.REQUEST_CACHING, blockNumber));
+  }
+
+  public Operation addToCache(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    return this.add(new Operation(Kind.CACHE_PUT, blockNumber));
+  }
+
+  public Operation end(Operation op) {
+    return this.add(new End(op));
+  }
+
+  private static void append(StringBuilder sb, String format, Object... args) {
+    sb.append(String.format(format, args));
+  }
+
+  /**
+   * Describes all recorded operations followed by per-kind duration statistics.
+   *
+   * @param showDebugInfo if true, one debug line per op; otherwise the compact
+   *        ';'-separated form accepted by {@link #fromSummary(String)}.
+   * @return the summary text.
+   */
+  public synchronized String getSummary(boolean showDebugInfo) {
+    StringBuilder sb = new StringBuilder();
+    for (Operation op : this.ops) {
+      if (op != null) {
+        if (showDebugInfo) {
+          sb.append(op.getDebugInfo());
+          sb.append("\n");
+        } else {
+          op.getSummary(sb);
+          sb.append(";");
+        }
+      }
+    }
+
+    sb.append("\n");
+    this.getDurationInfo(sb);
+
+    return sb.toString();
+  }
+
+  /**
+   * Appends count/total/min/avg/max durations (in seconds) of completed ops, per kind.
+   *
+   * @param sb destination for the statistics.
+   */
+  public synchronized void getDurationInfo(StringBuilder sb) {
+    Map<Kind, DoubleSummaryStatistics> durations = new HashMap<>();
+    for (Operation op : this.ops) {
+      if (op instanceof End) {
+        End endOp = (End) op;
+        durations.computeIfAbsent(endOp.getKind(), k -> new DoubleSummaryStatistics())
+            .accept(endOp.duration());
+      }
+    }
+
+    List<Kind> kinds = Arrays.asList(
+        Kind.GET_CACHED,
+        Kind.GET_PREFETCHED,
+        Kind.GET_READ,
+        Kind.CACHE_PUT,
+        Kind.PREFETCH,
+        Kind.REQUEST_CACHING,
+        Kind.REQUEST_PREFETCH,
+        Kind.CANCEL_PREFETCHES,
+        Kind.RELEASE,
+        Kind.CLOSE
+    );
+
+    for (Kind kind : kinds) {
+      append(sb, "%-18s : ", kind);
+      DoubleSummaryStatistics stats = durations.get(kind);
+      if (stats == null) {
+        append(sb, "--\n");
+      } else {
+        append(
+            sb,
+            "#ops = %3d, total = %5.1f, min: %3.1f, avg: %3.1f, max: %3.1f\n",
+            stats.getCount(),
+            stats.getSum(),
+            stats.getMin(),
+            stats.getAverage(),
+            stats.getMax());
+      }
+    }
+  }
+
+  /**
+   * Appends diagnostics about per-block operations. These cover start ops without
+   * a matching end op, kinds repeated for the same block, and blocks that were
+   * prefetched or cached but not used afterwards.
+   *
+   * @param sb destination for the diagnostics.
+   */
+  public synchronized void analyze(StringBuilder sb) {
+    Map<Integer, List<Operation>> blockOps = new HashMap<>();
+
+    // Group-by block number; ops not tied to a block (negative number) are skipped.
+    for (Operation op : this.ops) {
+      if (op.blockNumber < 0) {
+        continue;
+      }
+      blockOps.computeIfAbsent(op.blockNumber, k -> new ArrayList<>()).add(op);
+    }
+
+    List<Integer> prefetchedNotUsed = new ArrayList<>();
+    List<Integer> cachedNotUsed = new ArrayList<>();
+
+    for (Map.Entry<Integer, List<Operation>> entry : blockOps.entrySet()) {
+      Integer blockNumber = entry.getKey();
+      List<Operation> perBlockOps = entry.getValue();
+      Map<Kind, Integer> kindCounts = new HashMap<>();
+      Map<Kind, Integer> endKindCounts = new HashMap<>();
+
+      for (Operation op : perBlockOps) {
+        if (op instanceof End) {
+          endKindCounts.merge(op.kind, 1, Integer::sum);
+        } else {
+          kindCounts.merge(op.kind, 1, Integer::sum);
+        }
+      }
+
+      for (Kind kind : kindCounts.keySet()) {
+        int count = kindCounts.getOrDefault(kind, 0);
+        int endCount = endKindCounts.getOrDefault(kind, 0);
+        if (count != endCount) {
+          append(sb, "[%d] %s : #ops(%d) != #end-ops(%d)\n", blockNumber, kind, count, endCount);
+        }
+
+        if (count > 1) {
+          append(sb, "[%d] %s = %d\n", blockNumber, kind, count);
+        }
+      }
+
+      int prefetchCount = kindCounts.getOrDefault(Kind.PREFETCH, 0);
+      int getPrefetchedCount = kindCounts.getOrDefault(Kind.GET_PREFETCHED, 0);
+      if ((prefetchCount > 0) && (getPrefetchedCount < prefetchCount)) {
+        prefetchedNotUsed.add(blockNumber);
+      }
+
+      int cacheCount = kindCounts.getOrDefault(Kind.CACHE_PUT, 0);
+      int getCachedCount = kindCounts.getOrDefault(Kind.GET_CACHED, 0);
+      if ((cacheCount > 0) && (getCachedCount < cacheCount)) {
+        cachedNotUsed.add(blockNumber);
+      }
+    }
+
+    if (prefetchedNotUsed.size() > 0) {
+      append(sb, "Prefetched but not used: %s\n", getIntList(prefetchedNotUsed));
+    }
+
+    if (cachedNotUsed.size() > 0) {
+      append(sb, "Cached but not used: %s\n", getIntList(cachedNotUsed));
+    }
+  }
+
+  private static String getIntList(Iterable<Integer> nums) {
+    List<String> numList = new ArrayList<>();
+    for (Integer n : nums) {
+      numList.add(n.toString());
+    }
+    return String.join(", ", numList);
+  }
+
+  /**
+   * Rebuilds a {@code BlockOperations} from the compact ';'-separated summary
+   * produced by {@code getSummary(false)} (operations part only).
+   *
+   * Each "E"-prefixed token is paired with the most recent non-end op having the
+   * same kind and block number. A warning is logged when no such op exists.
+   *
+   * @param summary the compact summary.
+   * @return the reconstructed operations, with debug logging enabled.
+   * @throws IllegalArgumentException if a token is malformed or has an unknown short name.
+   */
+  public static BlockOperations fromSummary(String summary) {
+    BlockOperations ops = new BlockOperations();
+    ops.setDebug(true);
+    String[] tokens = summary.split(";");
+    for (String token : tokens) {
+      Matcher matcher = BLOCK_OP_PATTERN.matcher(token);
+      if (!matcher.matches()) {
+        String message = String.format("Unknown summary format: %s", token);
+        throw new IllegalArgumentException(message);
+      }
+
+      String shortName = matcher.group(1);
+      String blockNumberStr = matcher.group(3);
+      int blockNumber = (blockNumberStr == null) ? -1 : Integer.parseInt(blockNumberStr);
+      Kind kind = Kind.fromShortName(shortName);
+      Kind endKind = null;
+      if (kind == null) {
+        if (shortName.charAt(0) == 'E') {
+          endKind = Kind.fromShortName(shortName.substring(1));
+        }
+      }
+
+      if (kind == null && endKind == null) {
+        String message = String.format("Unknown short name: %s (token = %s)", shortName, token);
+        throw new IllegalArgumentException(message);
+      }
+
+      if (kind != null) {
+        ops.add(new Operation(kind, blockNumber));
+      } else {
+        boolean found = false;
+        for (int i = ops.ops.size() - 1; i >= 0; i--) {
+          Operation op = ops.ops.get(i);
+          if ((op.blockNumber == blockNumber) && (op.kind == endKind) && !(op instanceof End)) {
+            ops.add(new End(op));
+            found = true;
+            break;
+          }
+        }
+
+        if (!found) {
+          // SLF4J substitutes "{}" placeholders, not printf-style specifiers.
+          LOG.warn("Start op not found: {}({})", endKind, blockNumber);
+        }
+      }
+    }
+
+    return ops;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BoundedResourcePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BoundedResourcePool.java
new file mode 100644
index 00000000000..40f0a1cee57
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BoundedResourcePool.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Manages a fixed pool of resources.
+ *
+ * Avoids creating a new resource if a previously created instance is already available.
+ */
+public abstract class BoundedResourcePool<T> extends ResourcePool<T> {
+  // The size of this pool. Fixed at creation time.
+  private final int size;
+
+  // Items currently available in the pool.
+  private ArrayBlockingQueue<T> items;
+
+  // Items that have been created so far (regardless of whether they are currently available).
+  private Set<T> createdItems;
+
+  /**
+   * Constructs a resource pool of the given size.
+   *
+   * @param size the size of this pool. Cannot be changed post creation.
+   *
+   * @throws IllegalArgumentException if size is zero or negative.
+   */
+  public BoundedResourcePool(int size) {
+    Validate.checkPositiveInteger(size, "size");
+
+    this.size = size;
+    this.items = new ArrayBlockingQueue<T>(size);
+
+    // The created items are identified based on their object reference.
+    this.createdItems = Collections.newSetFromMap(new IdentityHashMap<T, Boolean>());
+  }
+
+  /**
+   * Acquires a resource blocking if necessary until one becomes available.
+   */
+  @Override
+  public T acquire() {
+    return this.acquireHelper(true);
+  }
+
+  /**
+   * Acquires a resource blocking if one is immediately available. Otherwise returns null.
+   */
+  @Override
+  public T tryAcquire() {
+    return this.acquireHelper(false);
+  }
+
+  /**
+   * Releases a previously acquired resource.
+   *
+   * @throws IllegalArgumentException if item is null.
+   */
+  @Override
+  public void release(T item) {
+    Validate.checkNotNull(item, "item");
+
+    synchronized (this.createdItems) {
+      if (!this.createdItems.contains(item)) {
+        throw new IllegalArgumentException("This item is not a part of this pool");
+      }
+    }
+
+    // Return if this item was released earlier.
+    // We cannot use this.items.contains() because that check is not based on reference equality.
+    for (T entry : this.items) {
+      if (entry == item) {
+        return;
+      }
+    }
+
+    try {
+      this.items.put(item);
+      return;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("release() should never block");
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    for (T item : this.createdItems) {
+      this.close(item);
+    }
+
+    this.items.clear();
+    this.items = null;
+
+    this.createdItems.clear();
+    this.createdItems = null;
+  }
+
+  /**
+   * Derived classes may implement a way to cleanup each item.
+   */
+  @Override
+  protected synchronized void close(T item) {
+    // Do nothing in this class. Allow overriding classes to take any cleanup action.
+  }
+
+  // Number of items created so far. Mostly for testing purposes.
+  public int numCreated() {
+    synchronized (this.createdItems) {
+      return this.createdItems.size();
+    }
+  }
+
+  // Number of items available to be acquired. Mostly for testing purposes.
+  public synchronized int numAvailable() {
+    return (this.size - this.numCreated()) + this.items.size();
+  }
+
+  // For debugging purposes.
+  @Override
+  public synchronized String toString() {
+    return String.format(
+        "size = %d, #created = %d, #in-queue = %d, #available = %d",
+        this.size, this.numCreated(), this.items.size(), this.numAvailable());
+  }
+
+  /**
+   * Derived classes must implement a way to create an instance of a resource.
+   */
+  protected abstract T createNew();
+
+  private T acquireHelper(boolean canBlock) {
+
+    // Prefer reusing an item if one is available.
+    // That avoids unnecessarily creating new instances.
+    T result = this.items.poll();
+    if (result != null) {
+      return result;
+    }
+
+    synchronized (this.createdItems) {
+      // Create a new instance if allowed by the capacity of this pool.
+      if (this.createdItems.size() < this.size) {
+        T item = this.createNew();
+        this.createdItems.add(item);
+        return item;
+      }
+    }
+
+    if (canBlock) {
+      try {
+        // Block for an instance to be available.
+        return this.items.take();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java
new file mode 100644
index 00000000000..34dd6d7ba3b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.CRC32;
+
+import com.twitter.util.Awaitable.CanAwait;
+import com.twitter.util.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the state of a ByteBuffer that is in use by {@code CachingBlockManager}.
+ *
+ * This class is not meant to be of general use. It exists in its own file due to its size.
+ * We use the terms block and buffer interchangeably in this file because one buffer
+ * holds exactly one block of data.
+ *
+ * Holding all of the state associated with a block allows us to validate and control
+ * state transitions in a synchronized fashion.
+ */
+public class BufferData {
+  private static final Logger LOG = LoggerFactory.getLogger(BufferData.class);
+
+  public enum State {
+    // Unknown / invalid state.
+    UNKNOWN,
+
+    // Buffer has been acquired but has no data.
+    BLANK,
+
+    // This block is being prefetched.
+    PREFETCHING,
+
+    // This block is being added to the local cache.
+    CACHING,
+
+    // This block has data and is ready to be read.
+    READY,
+
+    // This block is no longer in-use and should not be used once in this state.
+    DONE
+  }
+
+  // Number of the block associated with this buffer.
+  private final int blockNumber;
+
+  // The buffer associated with this block. Replaced by a read-only view in setReady().
+  private ByteBuffer buffer;
+
+  // Current state of this block.
+  private volatile State state;
+
+  // Future of the action being performed on this block (eg, prefetching or caching).
+  private Future<Void> action;
+
+  // Checksum of the buffer contents once in READY state (0 until then).
+  private long checksum = 0;
+
+  /**
+   * Constructs an instance of this class.
+   *
+   * @param blockNumber Number of the block associated with this buffer.
+   * @param buffer The buffer associated with this block.
+   *
+   * @throws IllegalArgumentException if blockNumber is negative.
+   * @throws IllegalArgumentException if buffer is null.
+   */
+  public BufferData(int blockNumber, ByteBuffer buffer) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+    Validate.checkNotNull(buffer, "buffer");
+
+    this.blockNumber = blockNumber;
+    this.buffer = buffer;
+    this.state = State.BLANK;
+  }
+
+  /**
+   * Gets the id of this block.
+   *
+   * @return the id of this block.
+   */
+  public int getBlockNumber() {
+    return this.blockNumber;
+  }
+
+  /**
+   * Gets the buffer associated with this block.
+   *
+   * @return the buffer associated with this block.
+   */
+  public ByteBuffer getBuffer() {
+    return this.buffer;
+  }
+
+  /**
+   * Gets the state of this block.
+   *
+   * @return the state of this block.
+   */
+  public State getState() {
+    return this.state;
+  }
+
+  /**
+   * Gets the checksum of data in this block.
+   *
+   * @return the checksum of data in this block.
+   */
+  public long getChecksum() {
+    return this.checksum;
+  }
+
+  /**
+   * Computes CRC32 checksum of the given buffer's contents.
+   * The buffer's own position is not changed; the contents from index 0 up to its
+   * limit are checksummed.
+   *
+   * @param buffer the buffer whose content's checksum is to be computed.
+   * @return the computed checksum.
+   */
+  public static long getChecksum(ByteBuffer buffer) {
+    ByteBuffer tempBuffer = buffer.duplicate();
+    tempBuffer.rewind();
+    CRC32 crc32 = new CRC32();
+    crc32.update(tempBuffer);
+    return crc32.getValue();
+  }
+
+  /**
+   * Gets the {@code Future} of the action currently associated with this block.
+   *
+   * @return the action's future, or null if there is none.
+   */
+  public synchronized Future<Void> getActionFuture() {
+    return this.action;
+  }
+
+  /**
+   * Indicates that a prefetch operation is in progress.
+   *
+   * @param actionFuture the {@code Future} of a prefetch action.
+   *
+   * @throws IllegalArgumentException if actionFuture is null.
+   */
+  public synchronized void setPrefetch(Future<Void> actionFuture) {
+    Validate.checkNotNull(actionFuture, "actionFuture");
+
+    this.updateState(State.PREFETCHING, State.BLANK);
+    this.action = actionFuture;
+  }
+
+  /**
+   * Indicates that a caching operation is in progress.
+   *
+   * @param actionFuture the {@code Future} of a caching action.
+   *
+   * @throws IllegalArgumentException if actionFuture is null.
+   */
+  public synchronized void setCaching(Future<Void> actionFuture) {
+    Validate.checkNotNull(actionFuture, "actionFuture");
+
+    this.updateState(State.CACHING, State.PREFETCHING, State.READY);
+    this.action = actionFuture;
+  }
+
+  /**
+   * Marks the completion of reading data into the buffer.
+   * The buffer cannot be modified once in this state.
+   *
+   * @param expectedCurrentState the collection of states from which transition to READY is allowed.
+   *
+   * @throws IllegalStateException if a checksum was already recorded, or if the current
+   *         state is not one of {@code expectedCurrentState}.
+   */
+  public synchronized void setReady(State... expectedCurrentState) {
+    if (this.checksum != 0) {
+      throw new IllegalStateException("Checksum cannot be changed once set");
+    }
+
+    // Validate the transition before touching the buffer or checksum, so that a rejected
+    // call leaves this block unchanged (otherwise every later setReady() would fail).
+    Validate.checkNotNull(expectedCurrentState, "expectedCurrentState");
+    this.throwIfStateIncorrect(expectedCurrentState);
+
+    this.buffer = this.buffer.asReadOnlyBuffer();
+    this.checksum = getChecksum(this.buffer);
+    this.buffer.rewind();
+    this.state = State.READY;
+  }
+
+  /**
+   * Indicates that this block is no longer of use and can be reclaimed.
+   *
+   * @throws IllegalStateException if the buffer contents changed after setReady().
+   */
+  public synchronized void setDone() {
+    if (this.checksum != 0) {
+      if (getChecksum(this.buffer) != this.checksum) {
+        throw new IllegalStateException("checksum changed after setReady()");
+      }
+    }
+    this.state = State.DONE;
+    this.action = null;
+  }
+
+  /**
+   * Updates the current state to the specified value.
+   * Asserts that the current state is as expected.
+   *
+   * @param newState the state to transition to.
+   * @param expectedCurrentState the collection of states from which
+   *        transition to {@code newState} is allowed.
+   *
+   * @throws IllegalArgumentException if newState is null.
+   * @throws IllegalArgumentException if expectedCurrentState is null.
+   */
+  public synchronized void updateState(State newState, State... expectedCurrentState) {
+    Validate.checkNotNull(newState, "newState");
+    Validate.checkNotNull(expectedCurrentState, "expectedCurrentState");
+
+    this.throwIfStateIncorrect(expectedCurrentState);
+    this.state = newState;
+  }
+
+  /**
+   * Helper that asserts the current state is one of the expected values.
+   *
+   * @param states the collection of allowed states.
+   *
+   * @throws IllegalArgumentException if states is null.
+   * @throws IllegalStateException if the current state is not one of {@code states}.
+   */
+  public void throwIfStateIncorrect(State... states) {
+    Validate.checkNotNull(states, "states");
+
+    if (this.stateEqualsOneOf(states)) {
+      return;
+    }
+
+    List<String> statesStr = new ArrayList<String>();
+    for (State s : states) {
+      statesStr.add(s.toString());
+    }
+
+    String message = String.format(
+        "Expected buffer state to be '%s' but found: %s", String.join(" or ", statesStr), this);
+    throw new IllegalStateException(message);
+  }
+
+  /**
+   * Checks (without locking) whether the current state is one of the given states.
+   *
+   * @param states the states to compare against.
+   * @return true if the current state equals one of {@code states}.
+   */
+  public boolean stateEqualsOneOf(State... states) {
+    State currentState = this.state;
+
+    for (State s : states) {
+      if (currentState == s) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  // Used to poll a Future's completion in toString() without waiting.
+  private static final CanAwait CAN_AWAIT = () -> false;
+
+  @Override
+  public String toString() {
+
+    return String.format(
+        "[%03d] id: %03d, %s: buf: %s, checksum: %d, future: %s",
+        this.blockNumber,
+        System.identityHashCode(this),
+        this.state,
+        this.getBufferStr(this.buffer),
+        this.checksum,
+        this.getFutureStr(this.action));
+  }
+
+  private String getFutureStr(Future<Void> f) {
+    // Use the argument, not this.action: toString() is unsynchronized and setDone()
+    // may null out the field between the check and the use.
+    if (f == null) {
+      return "--";
+    } else {
+      return f.isReady(CAN_AWAIT) ? "done" : "not done";
+    }
+  }
+
+  private String getBufferStr(ByteBuffer buf) {
+    if (buf == null) {
+      return "--";
+    } else {
+      return String.format(
+          "(id = %d, pos = %d, lim = %d)",
+          System.identityHashCode(buf),
+          buf.position(), buf.limit());
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
new file mode 100644
index 00000000000..91798e55006
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+
+import com.twitter.util.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a fixed pool of {@code ByteBuffer} instances.
+ *
+ * Avoids creating a new buffer if a previously created buffer is already available.
+ */
+public class BufferPool implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
+
+  // Max number of buffers in this pool.
+  private final int size;
+
+  // Size in bytes of each buffer.
+  private final int bufferSize;
+
+  // Invariants for internal state.
+  // -- a buffer is either in this.pool or in this.allocated
+  // -- transition between this.pool <==> this.allocated must be atomic
+  // -- only one buffer allocated for a given blockNumber
+
+  // Underlying bounded resource pool.
+  private BoundedResourcePool<ByteBuffer> pool;
+
+  // Allows associating metadata to each buffer in the pool.
+  // Every read or write of this map synchronizes on the map itself, because getAll()
+  // is reachable from callers that do not hold this pool's monitor.
+  private Map<BufferData, ByteBuffer> allocated;
+
+  /**
+   * Initializes a new instance of the {@code BufferPool} class.
+   *
+   * @param size number of buffer in this pool.
+   * @param bufferSize size in bytes of each buffer.
+   *
+   * @throws IllegalArgumentException if size is zero or negative.
+   * @throws IllegalArgumentException if bufferSize is zero or negative.
+   */
+  public BufferPool(int size, int bufferSize) {
+    Validate.checkPositiveInteger(size, "size");
+    Validate.checkPositiveInteger(bufferSize, "bufferSize");
+
+    this.size = size;
+    this.bufferSize = bufferSize;
+    this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
+    this.pool = new BoundedResourcePool<ByteBuffer>(size) {
+      @Override
+      public ByteBuffer createNew() {
+        return ByteBuffer.allocate(bufferSize);
+      }
+    };
+  }
+
+  /**
+   * Gets a list of all blocks in this pool.
+   *
+   * @return an unmodifiable snapshot of all blocks currently allocated from this pool.
+   */
+  public List<BufferData> getAll() {
+    synchronized (this.allocated) {
+      return Collections.unmodifiableList(new ArrayList<BufferData>(this.allocated.keySet()));
+    }
+  }
+
+  /**
+   * Acquires a {@code ByteBuffer}; blocking if necessary until one becomes available.
+   *
+   * While waiting, periodically logs the pool state and marks the 'ready' block farthest
+   * from {@code blockNumber} as done so that its buffer can be reclaimed.
+   *
+   * @param blockNumber the id of the block to acquire.
+   * @return the acquired block's {@code BufferData}.
+   *
+   * @throws IllegalStateException if no buffer became available before retries ran out.
+   */
+  public synchronized BufferData acquire(int blockNumber) {
+    BufferData data;
+    final int maxRetryDelayMs = 600 * 1000;
+    final int statusUpdateDelayMs = 120 * 1000;
+    Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs);
+
+    do {
+      if (retryer.updateStatus()) {
+        LOG.warn("waiting to acquire block: {}", blockNumber);
+        LOG.info("state = {}", this);
+        this.releaseReadyBlock(blockNumber);
+      }
+      data = this.tryAcquire(blockNumber);
+    }
+    while ((data == null) && retryer.continueRetry());
+
+    if (data != null) {
+      return data;
+    } else {
+      String message = String.format("Wait failed for acquire(%d)", blockNumber);
+      throw new IllegalStateException(message);
+    }
+  }
+
+  /**
+   * Acquires a buffer if one is immediately available. Otherwise returns null.
+   *
+   * @param blockNumber the id of the block to try acquire.
+   * @return the acquired block's {@code BufferData} or null.
+   */
+  public synchronized BufferData tryAcquire(int blockNumber) {
+    return this.acquireHelper(blockNumber, false);
+  }
+
+  private synchronized BufferData acquireHelper(int blockNumber, boolean canBlock) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    this.releaseDoneBlocks();
+
+    // Reuse the existing (not yet done) buffer for this block, if any.
+    BufferData data = this.find(blockNumber);
+    if (data != null) {
+      return data;
+    }
+
+    ByteBuffer buffer = canBlock ? this.pool.acquire() : this.pool.tryAcquire();
+    if (buffer == null) {
+      return null;
+    }
+
+    buffer.clear();
+    data = new BufferData(blockNumber, buffer.duplicate());
+
+    synchronized (this.allocated) {
+      Validate.checkState(this.find(blockNumber) == null, "buffer data already exists");
+
+      this.allocated.put(data, buffer);
+    }
+
+    return data;
+  }
+
+  /**
+   * Releases resources for any blocks marked as 'done'.
+   */
+  private synchronized void releaseDoneBlocks() {
+    for (BufferData data : this.getAll()) {
+      if (data.stateEqualsOneOf(BufferData.State.DONE)) {
+        // releaseBuffer() does not sweep again, so this loop does not recurse.
+        this.releaseBuffer(data);
+      }
+    }
+  }
+
+  /**
+   * If no blocks were released after calling releaseDoneBlocks() a few times,
+   * we may end up waiting forever. To avoid that situation, we try releasing
+   * a 'ready' block farthest away from the given block.
+   */
+  private synchronized void releaseReadyBlock(int blockNumber) {
+    BufferData releaseTarget = null;
+    for (BufferData data : this.getAll()) {
+      if (data.stateEqualsOneOf(BufferData.State.READY)) {
+        if (releaseTarget == null) {
+          releaseTarget = data;
+        } else {
+          if (distance(data, blockNumber) > distance(releaseTarget, blockNumber)) {
+            releaseTarget = data;
+          }
+        }
+      }
+    }
+
+    if (releaseTarget != null) {
+      LOG.warn("releasing 'ready' block: {}", releaseTarget);
+      releaseTarget.setDone();
+    }
+  }
+
+  private int distance(BufferData data, int blockNumber) {
+    return Math.abs(data.getBlockNumber() - blockNumber);
+  }
+
+  /**
+   * Releases a previously acquired resource.
+   *
+   * @param data the {@code BufferData} instance to release.
+   *
+   * @throws IllegalArgumentException if data is null.
+   * @throws IllegalArgumentException if data cannot be released due to its state.
+   */
+  public synchronized void release(BufferData data) {
+    Validate.checkNotNull(data, "data");
+
+    if (!this.releaseBuffer(data)) {
+      // Likely released earlier.
+      return;
+    }
+
+    this.releaseDoneBlocks();
+  }
+
+  /**
+   * Detaches the buffer backing {@code data} and returns it to the underlying pool.
+   *
+   * @param data the block whose buffer is released.
+   * @return true if a buffer was released, false if {@code data} was no longer allocated.
+   *
+   * @throws IllegalArgumentException if data cannot be released due to its state.
+   */
+  private synchronized boolean releaseBuffer(BufferData data) {
+    synchronized (data) {
+      Validate.checkArgument(
+          this.canRelease(data),
+          String.format("Unable to release buffer: %s", data));
+
+      // Mutate the map under its own monitor; getAll()/find() iterate it under that lock.
+      ByteBuffer buffer;
+      synchronized (this.allocated) {
+        buffer = this.allocated.remove(data);
+      }
+      if (buffer == null) {
+        return false;
+      }
+
+      buffer.clear();
+      this.pool.release(buffer);
+      return true;
+    }
+  }
+
+  /**
+   * Signals cancellation (via {@code Future.raise}) to any in-flight action on an
+   * allocated block, then releases all buffers. Calling this method more than once
+   * has no effect.
+   */
+  @Override
+  public synchronized void close() {
+    if (this.pool == null) {
+      // Already closed; Closeable.close() must be idempotent.
+      return;
+    }
+
+    for (BufferData data : this.getAll()) {
+      Future<Void> actionFuture = data.getActionFuture();
+      if (actionFuture != null) {
+        actionFuture.raise(new CancellationException("BufferPool is closing."));
+      }
+    }
+
+    this.pool.close();
+    this.pool = null;
+
+    this.allocated.clear();
+    this.allocated = null;
+  }
+
+  // For debugging purposes.
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.pool.toString());
+    sb.append("\n");
+    List<BufferData> allData = new ArrayList<>(this.getAll());
+    Collections.sort(allData, (d1, d2) -> Integer.compare(d1.getBlockNumber(), d2.getBlockNumber()));
+    for (BufferData data : allData) {
+      sb.append(data.toString());
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+
+  // Number of ByteBuffers created so far.
+  public synchronized int numCreated() {
+    return this.pool.numCreated();
+  }
+
+  // Number of ByteBuffers available to be acquired.
+  public synchronized int numAvailable() {
+    this.releaseDoneBlocks();
+    return this.pool.numAvailable();
+  }
+
+  /**
+   * Finds the allocated, not-yet-done block with the given number.
+   *
+   * @return the matching block, or null if there is none.
+   */
+  private BufferData find(int blockNumber) {
+    synchronized (this.allocated) {
+      for (BufferData data : this.allocated.keySet()) {
+        if ((data.getBlockNumber() == blockNumber)
+            && !data.stateEqualsOneOf(BufferData.State.DONE)) {
+          return data;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private boolean canRelease(BufferData data) {
+    return data.stateEqualsOneOf(
+        BufferData.State.DONE,
+        BufferData.State.READY);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
new file mode 100644
index 00000000000..93417f3fe61
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.twitter.util.Await;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Future;
+import com.twitter.util.FuturePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides read access to the underlying file one block at a time.
+ * Improves read performance by prefetching and locally caching blocks.
+ */
+public abstract class CachingBlockManager extends BlockManager {
+  private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
+
+  // Asynchronous tasks are performed in this pool.
+  private final FuturePool futurePool;
+
+  // Pool of shared ByteBuffer instances.
+  // Only created for non-empty files; set to null by close().
+  private BufferPool bufferPool;
+
+  // Size of the in-memory cache in terms of number of blocks.
+  // Total memory consumption is up to bufferPoolSize * blockSize.
+  private final int bufferPoolSize;
+
+  // Local block cache. Only created for non-empty files.
+  private BlockCache cache;
+
+  // Error counts. For testing purposes.
+  private final AtomicInteger numCachingErrors;
+  private final AtomicInteger numReadErrors;
+
+  // Operations performed by this block manager.
+  private final BlockOperations ops;
+
+  // Set once by the synchronized close(), but read without locking by get(), release(),
+  // requestPrefetch() and readBlock(); prefetch work submitted to futurePool presumably
+  // reaches readBlock() on pool threads. volatile makes the update visible to all of them.
+  private volatile boolean closed;
+
+  // If a single caching operation takes more than this time (in seconds),
+  // we disable caching to prevent further perf degradation due to caching.
+  private static final int SLOW_CACHING_THRESHOLD = 5;
+
+  // Once set to true, any further caching requests will be ignored.
+  private final AtomicBoolean cachingDisabled;
+
+  /**
+   * Creates a block manager that serves reads through an in-memory pool of block buffers,
+   * prefetching blocks asynchronously and caching them locally.
+   *
+   * @param futurePool executor in which asynchronous tasks are performed.
+   * @param blockData information about each block of the underlying file.
+   * @param bufferPoolSize number of blocks the in-memory buffer pool may hold.
+   *
+   * @throws IllegalArgumentException if futurePool is null.
+   * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
+   */
+  public CachingBlockManager(
+      FuturePool futurePool,
+      BlockData blockData,
+      int bufferPoolSize) {
+    super(blockData);
+
+    Validate.checkNotNull(futurePool, "futurePool");
+    Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
+
+    this.futurePool = futurePool;
+    this.bufferPoolSize = bufferPoolSize;
+
+    // Counters start at zero and caching starts enabled.
+    this.numCachingErrors = new AtomicInteger(0);
+    this.numReadErrors = new AtomicInteger(0);
+    this.cachingDisabled = new AtomicBoolean(false);
+
+    // An empty file has nothing to buffer or cache, so neither is created for it.
+    boolean hasData = this.getBlockData().getFileSize() > 0;
+    if (hasData) {
+      int blockSize = this.getBlockData().getBlockSize();
+      this.bufferPool = new BufferPool(bufferPoolSize, blockSize);
+      this.cache = this.createCache();
+    }
+
+    this.ops = new BlockOperations();
+    this.ops.setDebug(false);
+  }
+
+  /**
+   * Gets the block having the given {@code blockNumber}.
+   *
+   * Keeps retrying while the block is not yet usable (its buffer is being prefetched,
+   * being cached, or was released), periodically logging the manager state.
+   *
+   * @param blockNumber the number of the block to get.
+   * @return the buffer holding the requested block.
+   *
+   * @throws IOException if this block manager is closed, or if reading the block throws one.
+   * @throws IllegalArgumentException if blockNumber is negative.
+   * @throws IllegalStateException if the block did not become available before retries ran out.
+   */
+  @Override
+  public BufferData get(int blockNumber) throws IOException {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    BufferData data = null;
+    // Allow 120 s of retry delay per pool buffer. Compute in long and clamp: as an int
+    // product this overflows (turning negative) once bufferPoolSize exceeds 17,895.
+    final int maxRetryDelayMs =
+        (int) Math.min((long) this.bufferPoolSize * 120 * 1000, Integer.MAX_VALUE);
+    final int statusUpdateDelayMs = 120 * 1000;
+    Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs);
+    boolean done;
+
+    do {
+      if (this.closed) {
+        throw new IOException("this stream is already closed");
+      }
+
+      data = this.bufferPool.acquire(blockNumber);
+      done = this.getInternal(data);
+
+      if (retryer.updateStatus()) {
+        LOG.warn("waiting to get block: {}", blockNumber);
+        LOG.info("state = {}", this);
+      }
+    }
+    while (!done && retryer.continueRetry());
+
+    if (done) {
+      return data;
+    } else {
+      String message = String.format("Wait failed for get(%d)", blockNumber);
+      throw new IllegalStateException(message);
+    }
+  }
+
+  private boolean getInternal(BufferData data) throws IOException {
+    Validate.checkNotNull(data, "data");
+
+    // Opportunistic check without locking.
+    if (data.stateEqualsOneOf(
+        BufferData.State.PREFETCHING,
+        BufferData.State.CACHING,
+        BufferData.State.DONE)) {
+      return false;
+    }
+
+    synchronized (data) {
+      // Reconfirm state after locking.
+      if (data.stateEqualsOneOf(
+          BufferData.State.PREFETCHING,
+          BufferData.State.CACHING,
+          BufferData.State.DONE)) {
+        return false;
+      }
+
+      int blockNumber = data.getBlockNumber();
+      if (data.getState() == BufferData.State.READY) {
+        BlockOperations.Operation op = this.ops.getPrefetched(blockNumber);
+        this.ops.end(op);
+        return true;
+      }
+
+      data.throwIfStateIncorrect(BufferData.State.BLANK);
+      this.read(data);
+      return true;
+    }
+  }
+
+  /**
+   * Returns the buffer of the given block to the buffer pool.
+   * Does nothing once this block manager has been closed.
+   *
+   * @param data the block to release.
+   *
+   * @throws IllegalArgumentException if data is null.
+   */
+  @Override
+  public void release(BufferData data) {
+    if (!this.closed) {
+      Validate.checkNotNull(data, "data");
+
+      BlockOperations.Operation releaseOp = this.ops.release(data.getBlockNumber());
+      this.bufferPool.release(data);
+      this.ops.end(releaseOp);
+    }
+  }
+
+  /**
+   * Closes this block manager. This cancels in-flight prefetches, closes the local cache,
+   * logs a summary of operations and closes the buffer pool. Subsequent calls have no effect.
+   */
+  @Override
+  public synchronized void close() {
+    if (this.closed) {
+      return;
+    }
+
+    this.closed = true;
+
+    final BlockOperations.Operation op = this.ops.close();
+
+    // bufferPool and cache are only created for non-empty files (see the constructor),
+    // so guard against null instead of failing with a NullPointerException.
+    if (this.bufferPool != null) {
+      // Cancel any prefetches in progress.
+      this.cancelPrefetches();
+    }
+
+    if (this.cache != null) {
+      Io.closeIgnoringIoException(this.cache);
+    }
+
+    this.ops.end(op);
+    LOG.info(this.ops.getSummary(false));
+
+    if (this.bufferPool != null) {
+      this.bufferPool.close();
+      this.bufferPool = null;
+    }
+  }
+
+  /**
+   * Requests optional prefetching of the given block.
+   * Prefetching is best-effort: it only starts when a free buffer can be acquired
+   * right away and the block has not already been read, prefetched or cached.
+   *
+   * @param blockNumber the number of the block to prefetch.
+   *
+   * @throws IllegalArgumentException if blockNumber is negative.
+   */
+  @Override
+  public void requestPrefetch(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    if (this.closed) {
+      return;
+    }
+
+    BufferData data = this.bufferPool.tryAcquire(blockNumber);
+    if (data == null || !data.stateEqualsOneOf(BufferData.State.BLANK)) {
+      // No free buffer, or the block is ready / being prefetched / being cached.
+      return;
+    }
+
+    synchronized (data) {
+      // Another thread may have started work on this block since the unlocked check.
+      if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
+        return;
+      }
+
+      BlockOperations.Operation prefetchOp = this.ops.requestPrefetch(blockNumber);
+      Future<Void> prefetchFuture = this.futurePool.apply(new PrefetchTask(data, this));
+      data.setPrefetch(prefetchFuture);
+      this.ops.end(prefetchOp);
+    }
+  }
+
+  /**
+   * Requests cancellation of any previously issued prefetch requests.
+   */
+  @Override
+  public void cancelPrefetches() {
+    BlockOperations.Operation op = this.ops.cancelPrefetches();
+
+    for (BufferData data : this.bufferPool.getAll()) {
+      // We add blocks being prefetched to the local cache so that the prefetch is not wasted.
+      if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
+        this.requestCaching(data);
+      }
+    }
+
+    this.ops.end(op);
+  }
+
+  private void read(BufferData data) throws IOException {
+    synchronized (data) {
+      this.readBlock(data, false, BufferData.State.BLANK);
+    }
+  }
+
+  private void prefetch(BufferData data) throws IOException {
+    synchronized (data) {
+      this.readBlock(
+          data,
+          true,
+          BufferData.State.PREFETCHING,
+          BufferData.State.CACHING);
+    }
+  }
+
+  /**
+   * Fills the buffer in {@code data} with the contents of its block. The local cache is
+   * preferred over a remote read. The callers in this class (read/prefetch) already hold the
+   * lock on {@code data*}; the lock is taken again here, which Java monitors allow.
+   *
+   * <p>Returns without doing anything if this manager is closed, or if the buffer is already
+   * DONE or READY. On success the buffer is marked ready via
+   * {@code data.setReady(expectedState)}. On any exception, the read-error counter is
+   * incremented, the buffer is marked done, and the exception is rethrown.
+   *
+   * @param data the buffer to fill; its block number selects the block to read.
+   * @param isPrefetch true to record the operation as a prefetch, false as a plain read.
+   * @param expectedState states the buffer is expected to be in when the read starts.
+   * @throws IOException if reading the block fails.
+   */
+  private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... expectedState)
+      throws IOException {
+
+    if (this.closed) {
+      return;
+    }
+
+    // Operation recorded in ops; ended in the finally block if one was started.
+    BlockOperations.Operation op = null;
+
+    synchronized (data) {
+      try {
+        if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
+          // DONE  : Block was released, likely due to caching being disabled on slow perf.
+          // READY : Block was already fetched by another thread. No need to re-read.
+          return;
+        }
+
+        // Presumably throws if the buffer is not in one of expectedState (confirm in BufferData).
+        data.throwIfStateIncorrect(expectedState);
+        int blockNumber = data.getBlockNumber();
+
+        // Prefer reading from cache over reading from network.
+        if (this.cache.containsBlock(blockNumber)) {
+          op = this.ops.getCached(blockNumber);
+          this.cache.get(blockNumber, data.getBuffer());
+          data.setReady(expectedState);
+          return;
+        }
+
+        if (isPrefetch) {
+          op = this.ops.prefetch(data.getBlockNumber());
+        } else {
+          op = this.ops.getRead(data.getBlockNumber());
+        }
+
+        // Read the block's [offset, offset + size) range into the cleared buffer, then flip
+        // the buffer so the data can be consumed from the start.
+        long offset = this.getBlockData().getStartOffset(data.getBlockNumber());
+        int size = this.getBlockData().getSize(data.getBlockNumber());
+        ByteBuffer buffer = data.getBuffer();
+        buffer.clear();
+        this.read(buffer, offset, size);
+        buffer.flip();
+        data.setReady(expectedState);
+      } catch (Exception e) {
+        String message = String.format("error during readBlock(%s)", data.getBlockNumber());
+        LOG.error(message, e);
+        this.numReadErrors.incrementAndGet();
+        data.setDone();
+        throw e;
+      } finally {
+        if (op != null) {
+          this.ops.end(op);
+        }
+      }
+    }
+  }
+
+  /**
+   * Task submitted to the future pool that prefetches a single block.
+   * Any exception thrown by the prefetch is logged rather than propagated.
+   */
+  private static class PrefetchTask extends ExceptionalFunction0<Void> {
+    private final CachingBlockManager blockManager;
+    private final BufferData data;
+
+    PrefetchTask(BufferData data, CachingBlockManager blockManager) {
+      this.blockManager = blockManager;
+      this.data = data;
+    }
+
+    @Override
+    public Void applyE() {
+      try {
+        blockManager.prefetch(data);
+      } catch (Exception e) {
+        LOG.error("error during prefetch", e);
+      }
+      return null;
+    }
+  }
+
+  // Buffer states from which a block may be handed over to the local cache
+  // (requestCaching checks them both before and after locking the buffer).
+  private static final BufferData.State[] EXPECTED_STATE_AT_CACHING = {
+      BufferData.State.PREFETCHING,
+      BufferData.State.READY,
+  };
+
+  /**
+   * Requests that the given block should be copied to the local cache.
+   * The block must not be accessed by the caller after calling this method
+   * because it will be released asynchronously relative to the caller.
+   *
+   * <p>The request is ignored once this manager is closed, and is also ignored unless the
+   * block is PREFETCHING or READY. If caching has been disabled, or the block is already in
+   * the cache, the buffer is marked done immediately instead.
+   *
+   * @param data the buffer holding the block to cache.
+   * @throws IllegalArgumentException if data is null.
+   */
+  @Override
+  public void requestCaching(BufferData data) {
+    if (this.closed) {
+      return;
+    }
+
+    // Validate before the first dereference of data (setDone below). This way a null
+    // argument yields the documented IllegalArgumentException rather than an NPE.
+    Validate.checkNotNull(data, "data");
+
+    if (this.cachingDisabled.get()) {
+      data.setDone();
+      return;
+    }
+
+    // Opportunistic check without locking.
+    if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
+      return;
+    }
+
+    synchronized (data) {
+      // Reconfirm state after locking.
+      if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
+        return;
+      }
+
+      if (this.cache.containsBlock(data.getBlockNumber())) {
+        data.setDone();
+        return;
+      }
+
+      BufferData.State state = data.getState();
+
+      BlockOperations.Operation op = this.ops.requestCaching(data.getBlockNumber());
+      // A block that is still prefetching must be waited on before it can be cached;
+      // a READY block can be cached right away.
+      Future<Void> blockFuture;
+      if (state == BufferData.State.PREFETCHING) {
+        blockFuture = data.getActionFuture();
+      } else {
+        blockFuture = Future.value(null);
+      }
+
+      CachePutTask task = new CachePutTask(data, blockFuture, this);
+      Future<Void> actionFuture = this.futurePool.apply(task);
+      data.setCaching(actionFuture);
+      this.ops.end(op);
+    }
+  }
+
+  /**
+   * Waits for the block held by {@code data} to become available, then copies it into the
+   * local cache and marks the buffer done. Invoked on the future pool by {@code CachePutTask}.
+   *
+   * <p>The buffer is marked done without being cached in these cases:
+   * <ul>
+   *   <li>caching is (or becomes) disabled;</li>
+   *   <li>waiting on {@code blockFuture} fails;</li>
+   *   <li>the block is already cached.</li>
+   * </ul>
+   * A failed cache write is counted in {@code numCachingErrors}. A cache write slower than
+   * {@code SLOW_CACHING_THRESHOLD} disables caching for subsequent requests.
+   *
+   * @param data the buffer holding the block to cache.
+   * @param blockFuture completes when the block's data is available.
+   */
+  private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
+    if (this.closed) {
+      return;
+    }
+
+    if (this.cachingDisabled.get()) {
+      data.setDone();
+      return;
+    }
+
+    try {
+      Await.result(blockFuture);
+      if (data.stateEqualsOneOf(BufferData.State.DONE)) {
+        // There was an error during prefetch.
+        return;
+      }
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Preserve the interrupt status so that whoever owns this thread can observe it.
+        Thread.currentThread().interrupt();
+      }
+      String message = String.format("error waiting on blockFuture: %s", data);
+      LOG.error(message, e);
+      data.setDone();
+      return;
+    }
+
+    // Caching may have been disabled by another task while we were waiting.
+    if (this.cachingDisabled.get()) {
+      data.setDone();
+      return;
+    }
+
+    BlockOperations.Operation op = null;
+
+    synchronized (data) {
+      try {
+        if (data.stateEqualsOneOf(BufferData.State.DONE)) {
+          return;
+        }
+
+        if (this.cache.containsBlock(data.getBlockNumber())) {
+          data.setDone();
+          return;
+        }
+
+        op = this.ops.addToCache(data.getBlockNumber());
+        // Write from a rewound duplicate so the shared buffer's position is left untouched.
+        ByteBuffer buffer = data.getBuffer().duplicate();
+        buffer.rewind();
+        this.cachePut(data.getBlockNumber(), buffer);
+        data.setDone();
+      } catch (Exception e) {
+        this.numCachingErrors.incrementAndGet();
+        String message = String.format("error adding block to cache after wait: %s", data);
+        LOG.error(message, e);
+        data.setDone();
+      }
+
+      if (op != null) {
+        BlockOperations.End endOp = (BlockOperations.End) this.ops.end(op);
+        if (endOp.duration() > SLOW_CACHING_THRESHOLD) {
+          // getAndSet ensures the warning is logged only by the task that disables caching.
+          if (!this.cachingDisabled.getAndSet(true)) {
+            String message = String.format(
+                "Caching disabled because of slow operation (%.1f sec)", endOp.duration());
+            LOG.warn(message);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Factory for the local block cache. Subclasses may override this to supply a different
+   * {@link BlockCache} implementation.
+   *
+   * @return a new {@link SingleFilePerBlockCache}.
+   */
+  protected BlockCache createCache() {
+    return new SingleFilePerBlockCache();
+  }
+
+  /**
+   * Stores the given block in the local cache. This is a no-op once this manager is closed.
+   *
+   * @param blockNumber the number of the block being cached.
+   * @param buffer the block's data.
+   * @throws IOException if the cache fails to store the block.
+   */
+  protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
+    if (!this.closed) {
+      this.cache.put(blockNumber, buffer);
+    }
+  }
+
+  /**
+   * Task submitted to the future pool. It hands a block to
+   * {@code CachingBlockManager.addToCacheAndRelease}, which waits for the block's data
+   * and then copies it into the local cache.
+   */
+  private static class CachePutTask extends ExceptionalFunction0<Void> {
+    // Block manager that manages this block.
+    private final CachingBlockManager blockManager;
+
+    // Buffer holding the block to cache.
+    private final BufferData data;
+
+    // Completes once the block's data is available (it may still be prefetching).
+    private final Future<Void> blockFuture;
+
+    CachePutTask(
+        BufferData data,
+        Future<Void> blockFuture,
+        CachingBlockManager blockManager) {
+      this.blockManager = blockManager;
+      this.data = data;
+      this.blockFuture = blockFuture;
+    }
+
+    @Override
+    public Void applyE() {
+      blockManager.addToCacheAndRelease(data, blockFuture);
+      return null;
+    }
+  }
+
+  /**
+   * Number of ByteBuffers available to be acquired.
+   *
+   * @return the number of buffers the pool reports as available.
+   */
+  public int numAvailable() {
+    return bufferPool.numAvailable();
+  }
+
+  /**
+   * Number of blocks currently held by the local cache.
+   *
+   * @return the value of {@code cache.size()}.
+   */
+  public int numCached() {
+    return cache.size();
+  }
+
+  /**
+   * Number of errors encountered while adding blocks to the local cache.
+   *
+   * @return the number of caching errors seen so far.
+   */
+  public int numCachingErrors() {
+    return numCachingErrors.get();
+  }
+
+  /**
+   * Number of errors encountered while reading blocks.
+   *
+   * @return the number of read errors seen so far.
+   */
+  public int numReadErrors() {
+    return numReadErrors.get();
+  }
+
+  // Package-private accessor (presumably for tests; confirm). It returns whatever
+  // bufferPool.tryAcquire(blockNumber) yields, which may be null.
+  BufferData getData(int blockNumber) {
+    final int requestedBlock = blockNumber;
+    return this.bufferPool.tryAcquire(requestedBlock);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("cache(").append(this.cache.toString()).append("); ")
+        .append("pool: ").append(this.bufferPool.toString())
+        .toString();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/FilePosition.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/FilePosition.java
new file mode 100644
index 00000000000..24c46bc4d1f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/FilePosition.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Provides functionality related to tracking the position within a file.
+ *
+ * The file is accessed through an in memory buffer. The absolute position within
+ * the file is the sum of start offset of the buffer within the file and the relative
+ * offset of the current access location within the buffer.
+ *
+ * A file is made up of equal sized blocks. The last block may be of a smaller size.
+ * The size of a buffer associated with this file is typically the same as block size.
+ */
+public class FilePosition {
+  // Holds block based information about a file.
+  private final BlockData blockData;
+
+  // Information about the buffer in use.
+  private BufferData data;
+
+  // Provides access to the underlying file. This is a duplicate of data's buffer, so moving
+  // its position does not disturb the position of the original buffer.
+  private ByteBuffer buffer;
+
+  // Start offset of the buffer relative to the start of a file (-1 while invalid).
+  private long bufferStartOffset;
+
+  // Offset where reading starts relative to the start of a file.
+  private long readStartOffset;
+
+  // Read stats after a seek (mostly for debugging use); reset by setData().
+  private int numSingleByteReads;
+  private int numBytesRead;
+  private int numBufferReads;
+
+  /**
+   * Constructs an instance of {@link FilePosition}.
+   * The new instance is invalid until a buffer is associated with it via {@link #setData}.
+   *
+   * @param fileSize size of the associated file.
+   * @param blockSize size of each block within the file.
+   *
+   * @throws IllegalArgumentException if fileSize is negative.
+   * @throws IllegalArgumentException if blockSize is negative, or if blockSize is zero while
+   *     fileSize is non-zero (a zero block size is accepted only for an empty file).
+   */
+  public FilePosition(long fileSize, int blockSize) {
+    Validate.checkNotNegative(fileSize, "fileSize");
+    if (fileSize == 0) {
+      Validate.checkNotNegative(blockSize, "blockSize");
+    } else {
+      Validate.checkPositiveInteger(blockSize, "blockSize");
+    }
+
+    this.blockData = new BlockData(fileSize, blockSize);
+
+    // The position is valid only when a valid buffer is associated with this file.
+    this.invalidate();
+  }
+
+  /**
+   * Associates a buffer with this file.
+   *
+   * @param bufferData the buffer associated with this file.
+   * @param startOffset Start offset of the buffer relative to the start of a file.
+   * @param readOffset Offset where reading starts relative to the start of a file.
+   *
+   * @throws IllegalArgumentException if bufferData is null.
+   * @throws IllegalArgumentException if startOffset is negative.
+   * @throws IllegalArgumentException if readOffset is negative.
+   * @throws IllegalArgumentException if readOffset is outside the range [startOffset, buffer end].
+   */
+  public void setData(BufferData bufferData, long startOffset, long readOffset) {
+    Validate.checkNotNull(bufferData, "bufferData");
+    Validate.checkNotNegative(startOffset, "startOffset");
+    Validate.checkNotNegative(readOffset, "readOffset");
+    Validate.checkWithinRange(
+        readOffset,
+        "readOffset",
+        startOffset,
+        startOffset + bufferData.getBuffer().limit() - 1);
+
+    this.data = bufferData;
+    this.buffer = bufferData.getBuffer().duplicate();
+    this.bufferStartOffset = startOffset;
+    this.readStartOffset = readOffset;
+    this.setAbsolute(readOffset);
+
+    this.resetReadStats();
+  }
+
+  /**
+   * Gets this position's view of the current buffer. The view is a duplicate, so it has its
+   * own position.
+   *
+   * @return the current buffer view; fails via Validate.checkState if there is none.
+   */
+  public ByteBuffer buffer() {
+    throwIfInvalidBuffer();
+    return this.buffer;
+  }
+
+  /**
+   * Gets the buffer data currently associated with this file.
+   *
+   * @return the current buffer data; fails via Validate.checkState if there is none.
+   */
+  public BufferData data() {
+    throwIfInvalidBuffer();
+    return this.data;
+  }
+
+  /**
+   * Gets the current absolute position within this file.
+   *
+   * @return the current absolute position within this file.
+   */
+  public long absolute() {
+    throwIfInvalidBuffer();
+    return this.bufferStartOffset + this.relative();
+  }
+
+  /**
+   * If the given {@code pos} lies within the current buffer, updates the current position to
+   * the specified value and returns true; otherwise returns false without changing the position.
+   *
+   * @param pos the absolute position to change the current position to if possible.
+   * @return true if the given current position was updated, false otherwise.
+   */
+  public boolean setAbsolute(long pos) {
+    if (this.isValid() && this.isWithinCurrentBuffer(pos)) {
+      int relativePos = (int) (pos - this.bufferStartOffset);
+      this.buffer.position(relativePos);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Gets the current position within this file relative to the start of the associated buffer.
+   *
+   * @return the current position within this file relative to the start of the associated buffer.
+   */
+  public int relative() {
+    throwIfInvalidBuffer();
+    return this.buffer.position();
+  }
+
+  /**
+   * Determines whether the given absolute position lies within the current buffer.
+   *
+   * @param pos the position to check.
+   * @return true if the given absolute position lies within the current buffer, false otherwise.
+   */
+  public boolean isWithinCurrentBuffer(long pos) {
+    throwIfInvalidBuffer();
+    long bufferEndOffset = this.bufferStartOffset + this.buffer.limit() - 1;
+    return (pos >= this.bufferStartOffset) && (pos <= bufferEndOffset);
+  }
+
+  /**
+   * Gets the id of the current block.
+   *
+   * @return the id of the current block.
+   */
+  public int blockNumber() {
+    throwIfInvalidBuffer();
+    return this.blockData.getBlockNumber(this.bufferStartOffset);
+  }
+
+  /**
+   * Determines whether the current block is the last block in this file.
+   *
+   * @return true if the current block is the last block in this file, false otherwise.
+   */
+  public boolean isLastBlock() {
+    return this.blockData.isLastBlock(this.blockNumber());
+  }
+
+  /**
+   * Determines if the current position is valid, i.e. whether a buffer is associated.
+   *
+   * @return true if the current position is valid, false otherwise.
+   */
+  public boolean isValid() {
+    return this.buffer != null;
+  }
+
+  /**
+   * Marks the current position as invalid by dropping the associated buffer.
+   */
+  public void invalidate() {
+    this.buffer = null;
+    this.bufferStartOffset = -1;
+    this.data = null;
+  }
+
+  /**
+   * Gets the absolute offset within the file at which the current buffer starts.
+   *
+   * @return the absolute start offset of the current buffer.
+   */
+  public long bufferStartOffset() {
+    throwIfInvalidBuffer();
+    return this.bufferStartOffset;
+  }
+
+  /**
+   * Determines whether the current buffer has been fully read. This requires reading to
+   * have started at the buffer start, the position to be at the limit, and the recorded
+   * byte count to equal the limit.
+   *
+   * @return true if the current buffer has been fully read, false otherwise.
+   */
+  public boolean bufferFullyRead() {
+    throwIfInvalidBuffer();
+    return (this.bufferStartOffset == this.readStartOffset)
+        && (this.relative() == this.buffer.limit())
+        && (this.numBytesRead == this.buffer.limit());
+  }
+
+  /**
+   * Records that {@code n} bytes were read. A read of exactly one byte counts as a
+   * single-byte read; any other size counts as a buffer read.
+   *
+   * @param n the number of bytes read.
+   */
+  public void incrementBytesRead(int n) {
+    this.numBytesRead += n;
+    if (n == 1) {
+      this.numSingleByteReads++;
+    } else {
+      this.numBufferReads++;
+    }
+  }
+
+  public int numBytesRead() {
+    return this.numBytesRead;
+  }
+
+  public int numSingleByteReads() {
+    return this.numSingleByteReads;
+  }
+
+  public int numBufferReads() {
+    return this.numBufferReads;
+  }
+
+  private void resetReadStats() {
+    numBytesRead = 0;
+    numSingleByteReads = 0;
+    numBufferReads = 0;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    if (this.buffer == null) {
+      sb.append("currentBuffer = null");
+    } else {
+      int pos = this.buffer.position();
+      int val;
+      if (pos >= this.buffer.limit()) {
+        val = -1;
+      } else {
+        val = this.buffer.get(pos);
+      }
+      String currentBufferState =
+          String.format("%d at pos: %d, lim: %d", val, pos, this.buffer.limit());
+      sb.append(String.format(
+          "block: %d, pos: %d (CBuf: %s)%n",
+          this.blockNumber(), this.absolute(),
+          currentBufferState));
+      sb.append("\n");
+    }
+    return sb.toString();
+  }
+
+  // Fails via Validate.checkState when no buffer is associated with this position.
+  private void throwIfInvalidBuffer() {
+    Validate.checkState(this.isValid(), "'buffer' must not be null");
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java
new file mode 100644
index 00000000000..fe11cdccf87
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Provides misc functionality related to IO.
+ */
+public final class Io {
+  private Io() {
+  }
+
+  /**
+   * Closes {@code resource} and suppresses any {@link IOException} raised while closing.
+   * A {@code null} resource is ignored.
+   *
+   * @param resource the resource to close; may be null.
+   */
+  public static void closeIgnoringIoException(Closeable resource) {
+    if (resource == null) {
+      return;
+    }
+    try {
+      resource.close();
+    } catch (IOException ignored) {
+      // Best effort: if close fails there is nothing useful the caller can do about it.
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ResourcePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ResourcePool.java
new file mode 100644
index 00000000000..11affb590f4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ResourcePool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+
+/**
+ * Manages a fixed pool of resources.
+ *
+ * Avoids creating a new resource if a previously created instance is already available.
+ *
+ * @param <T> the type of resource held in the pool.
+ */
+public abstract class ResourcePool<T> implements Closeable {
+
+  /**
+   * Acquires a resource blocking if necessary until one becomes available.
+   *
+   * @return the acquired resource instance.
+   */
+  public abstract T acquire();
+
+  /**
+   * Acquires a resource without blocking. Returns one if it is immediately available,
+   * otherwise returns null.
+   *
+   * @return the acquired resource instance (if immediately available) or null.
+   */
+  public abstract T tryAcquire();
+
+  /**
+   * Releases a previously acquired resource.
+   *
+   * @param item the resource to release.
+   */
+  public abstract void release(T item);
+
+  /**
+   * Closes this pool. The base implementation does nothing.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Derived classes may implement a way to cleanup each item.
+   *
+   * @param item the resource to close.
+   */
+  protected void close(T item) {
+    // Do nothing in this class. Allow overriding classes to take any cleanup action.
+  }
+
+  /**
+   * Derived classes must implement a way to create an instance of a resource.
+   *
+   * @return the created instance.
+   */
+  protected abstract T createNew();
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Retryer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Retryer.java
new file mode 100644
index 00000000000..5aea7897008
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Retryer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+/**
+ * Provides retry related functionality.
+ *
+ * <p>Each call to {@link #continueRetry()} sleeps for a fixed per-retry delay and adds that
+ * delay to a running total. Retrying stops once the total reaches the maximum delay.
+ */
+public class Retryer {
+  // Maximum amount of delay (in ms) before retry fails.
+  private final int maxDelay;
+
+  // Per retry delay (in ms).
+  private final int perRetryDelay;
+
+  // The time interval (in ms) at which status update would be made.
+  private final int statusUpdateInterval;
+
+  // Total delay (in ms) accumulated by continueRetry() so far.
+  private int delay;
+
+  /**
+   * Initializes a new instance of the {@code Retryer} class.
+   *
+   * @param perRetryDelay per retry delay (in ms).
+   * @param maxDelay maximum amount of delay (in ms) before retry fails.
+   * @param statusUpdateInterval time interval (in ms) at which status update would be made.
+   *
+   * @throws IllegalArgumentException if perRetryDelay is zero or negative.
+   * @throws IllegalArgumentException if maxDelay is less than or equal to perRetryDelay.
+   * @throws IllegalArgumentException if statusUpdateInterval is zero or negative.
+   */
+  public Retryer(int perRetryDelay, int maxDelay, int statusUpdateInterval) {
+    Validate.checkPositiveInteger(perRetryDelay, "perRetryDelay");
+    Validate.checkGreater(maxDelay, "maxDelay", perRetryDelay, "perRetryDelay");
+    Validate.checkPositiveInteger(statusUpdateInterval, "statusUpdateInterval");
+
+    this.perRetryDelay = perRetryDelay;
+    this.maxDelay = maxDelay;
+    this.statusUpdateInterval = statusUpdateInterval;
+  }
+
+  /**
+   * Returns true if retrying should continue, false otherwise.
+   * When retrying continues, this first sleeps for the per-retry delay. That delay counts
+   * towards the maximum even if the sleep is interrupted.
+   *
+   * @return true if the caller should retry, false otherwise.
+   */
+  public boolean continueRetry() {
+    if (this.delay >= this.maxDelay) {
+      return false;
+    }
+
+    try {
+      Thread.sleep(this.perRetryDelay);
+    } catch (InterruptedException e) {
+      // Retrying continues as before, but the interrupt status is restored so that callers
+      // can still observe it. While the flag stays set, later sleeps return immediately.
+      Thread.currentThread().interrupt();
+    }
+
+    this.delay += this.perRetryDelay;
+    return true;
+  }
+
+  /**
+   * Returns true if status update interval has been reached, i.e. the accumulated delay is
+   * a positive exact multiple of the status update interval.
+   *
+   * @return true if status update interval has been reached.
+   */
+  public boolean updateStatus() {
+    return (this.delay > 0) && this.delay % this.statusUpdateInterval == 0;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
new file mode 100644
index 00000000000..0f3d59b6cb9
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides functionality necessary for caching blocks of data read from FileSystem.
+ * Each cache block is stored on the local disk as a separate file.
+ */
+public class SingleFilePerBlockCache implements BlockCache {
+  private static final Logger LOG = LoggerFactory.getLogger(SingleFilePerBlockCache.class);
+
+  // Blocks stored in this cache, keyed by block number.
+  private Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+
+  // Number of times a block was read from this cache.
+  // Used for determining cache utilization factor.
+  // NOTE(review): incremented with a plain ++ in getEntry(); if get() can run concurrently
+  // the count may be approximate. Confirm the intended threading model.
+  private int numGets = 0;
+
+  // Set by close(); once true, get() and put() return without doing anything.
+  private boolean closed;
+
+  // One cached block: its number, the file holding its bytes, the byte count and the
+  // checksum recorded when it was written. Each block is stored as a separate file.
+  private static class Entry {
+    private final int blockNumber;
+    private final Path path;
+    private final int size;
+    private final long checksum;
+
+    Entry(int blockNumber, Path path, int size, long checksum) {
+      this.checksum = checksum;
+      this.size = size;
+      this.path = path;
+      this.blockNumber = blockNumber;
+    }
+
+    @Override
+    public String toString() {
+      final String template = "([%03d] %s: size = %d, checksum = %d)";
+      return String.format(template, blockNumber, path, size, checksum);
+    }
+  }
+
+  /**
+   * Creates an empty cache. All state is set up by the field initializers.
+   */
+  public SingleFilePerBlockCache() {
+  }
+
+  /**
+   * Indicates whether the given block is in this cache.
+   */
+  @Override
+  public boolean containsBlock(int blockNumber) {
+    return blocks.containsKey(blockNumber);
+  }
+
+  /**
+   * Gets a read-only snapshot of the block numbers currently in this cache.
+   */
+  @Override
+  public Iterable<Integer> blocks() {
+    List<Integer> snapshot = new ArrayList<>(blocks.keySet());
+    return Collections.unmodifiableList(snapshot);
+  }
+
+  /**
+   * Gets the number of blocks in this cache.
+   */
+  @Override
+  public int size() {
+    return blocks.size();
+  }
+
+  /**
+   * Reads the cached block {@code blockNumber} into {@code buffer}. Nothing happens once the
+   * cache is closed. On return the buffer has been rewound so it can be read from the start,
+   * and its contents have been checked against the size and checksum recorded by put().
+   *
+   * @throws IllegalArgumentException if buffer is null.
+   * @throws IllegalStateException if the block is not present in this cache.
+   */
+  @Override
+  public void get(int blockNumber, ByteBuffer buffer) throws IOException {
+    if (this.closed) {
+      return;
+    }
+    Validate.checkNotNull(buffer, "buffer");
+
+    final Entry cached = getEntry(blockNumber);
+    buffer.clear();
+    readFile(cached.path, buffer);
+    buffer.rewind();
+    validateEntry(cached, buffer);
+  }
+
+  /**
+   * Reads from the file at {@code path} into {@code buffer}, starting at the buffer's current
+   * position. Reading stops at end of file or when the buffer has no space left. The
+   * buffer's limit is then set to the position reached.
+   *
+   * @param path the file to read.
+   * @param buffer the destination buffer.
+   * @return the number of bytes read.
+   * @throws IOException if the file cannot be opened or read.
+   */
+  protected int readFile(Path path, ByteBuffer buffer) throws IOException {
+    int numBytesRead = 0;
+    // try-with-resources closes the channel even if a read fails part-way.
+    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
+      int numBytes;
+      while ((numBytes = channel.read(buffer)) > 0) {
+        numBytesRead += numBytes;
+      }
+    }
+    buffer.limit(buffer.position());
+    return numBytesRead;
+  }
+
+  private Entry getEntry(int blockNumber) {
+    Validate.checkNotNegative(blockNumber, "blockNumber");
+
+    Entry entry = this.blocks.get(blockNumber);
+    if (entry == null) {
+      throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
+    }
+    this.numGets++;
+    return entry;
+  }
+
+  /**
+   * Puts the given block in this cache. Nothing happens once the cache is closed.
+   * If the block is already cached, the buffer is only checked against the existing entry.
+   *
+   * @throws IllegalArgumentException if buffer is null.
+   * @throws IllegalArgumentException if buffer.limit() is zero or negative.
+   * @throws IOException if the block file cannot be written.
+   */
+  @Override
+  public void put(int blockNumber, ByteBuffer buffer) throws IOException {
+    if (this.closed) {
+      return;
+    }
+
+    Validate.checkNotNull(buffer, "buffer");
+
+    // A single lookup: ConcurrentHashMap never stores null values.
+    Entry existing = this.blocks.get(blockNumber);
+    if (existing != null) {
+      validateEntry(existing, buffer);
+      return;
+    }
+
+    Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
+
+    Path blockFilePath = getCacheFilePath();
+    long size = Files.size(blockFilePath);
+    if (size != 0) {
+      String message =
+          String.format("[%d] temp file already has data. %s (%d)",
+          blockNumber, blockFilePath, size);
+      throw new IllegalStateException(message);
+    }
+
+    try {
+      this.writeFile(blockFilePath, buffer);
+    } catch (IOException | RuntimeException e) {
+      // Do not leave a partially written block file behind on the local disk.
+      try {
+        Files.deleteIfExists(blockFilePath);
+      } catch (IOException suppressed) {
+        e.addSuppressed(suppressed);
+      }
+      throw e;
+    }
+    long checksum = BufferData.getChecksum(buffer);
+    Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
+    this.blocks.put(blockNumber, entry);
+  }
+
+  // Options used when writing a cache file: open for writing, create the file if it
+  // does not exist, and discard any existing contents.
+  private static final Set<? extends OpenOption> CREATE_OPTIONS =
+      EnumSet.of(StandardOpenOption.WRITE,
+          StandardOpenOption.CREATE,
+          StandardOpenOption.TRUNCATE_EXISTING);
+
+  /**
+   * Writes the given buffer to the given file, replacing any existing contents.
+   *
+   * The buffer is rewound first, so the bytes from index 0 up to its limit are written.
+   * On return the buffer's position equals its limit.
+   *
+   * @param path the file to write.
+   * @param buffer the data to write.
+   * @throws IOException if the file cannot be opened or written.
+   */
+  protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
+    buffer.rewind();
+    // try-with-resources so that the channel is closed even when a write fails.
+    try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS)) {
+      while (buffer.hasRemaining()) {
+        writeChannel.write(buffer);
+      }
+    }
+  }
+
+  protected Path getCacheFilePath() throws IOException {
+    return getTempFilePath();
+  }
+
+  /**
+   * Closes this cache and deletes its cache files.
+   *
+   * Deletion is best effort: a failure to delete one file is logged and does not stop
+   * the remaining files from being deleted. Calling close() more than once has no
+   * further effect.
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
+
+    this.closed = true;
+
+    LOG.info(this.getStats());
+    int numFilesDeleted = 0;
+
+    for (Entry entry : this.blocks.values()) {
+      try {
+        // Count only files that actually existed and were removed.
+        if (Files.deleteIfExists(entry.path)) {
+          numFilesDeleted++;
+        }
+      } catch (IOException e) {
+        // Keep going so that as many cache files as possible are deleted,
+        // but do not hide the failure: an undeleted file leaks disk space.
+        LOG.warn("Failed to delete cache file {}", entry.path, e);
+      }
+    }
+
+    if (numFilesDeleted > 0) {
+      LOG.info("Deleted {} cache files", numFilesDeleted);
+    }
+  }
+
+  /**
+   * Describes this cache for logging: its statistics followed by the cached block
+   * numbers, with consecutive numbers collapsed into ranges.
+   */
+  @Override
+  public String toString() {
+    return "stats: " + getStats()
+        + ", blocks:[" + getIntList(blocks()) + "]";
+  }
+
+  private void validateEntry(Entry entry, ByteBuffer buffer) {
+    if (entry.size != buffer.limit()) {
+      String message = String.format(
+          "[%d] entry.size(%d) != buffer.limit(%d)",
+          entry.blockNumber, entry.size, buffer.limit());
+      throw new IllegalStateException(message);
+    }
+
+    long checksum = BufferData.getChecksum(buffer);
+    if (entry.checksum != checksum) {
+      String message = String.format(
+          "[%d] entry.checksum(%d) != buffer checksum(%d)",
+          entry.blockNumber, entry.checksum, checksum);
+      throw new IllegalStateException(message);
+    }
+  }
+
+  /**
+   * Produces a human readable list of blocks for the purpose of logging.
+   * This method minimizes the length of returned list by converting
+   * a contiguous list of blocks into a range.
+   * for example,
+   * 1, 3, 4, 5, 6, 8 becomes 1, 3~6, 8
+   */
+  private String getIntList(Iterable<Integer> nums) {
+    List<String> numList = new ArrayList<>();
+    List<Integer> numbers = new ArrayList<Integer>();
+    for (Integer n : nums) {
+      numbers.add(n);
+    }
+    Collections.sort(numbers);
+
+    int index = 0;
+    while (index < numbers.size()) {
+      int start = numbers.get(index);
+      int prev = start;
+      int end = start;
+      while ((++index < numbers.size()) && ((end = numbers.get(index)) == prev + 1)) {
+        prev = end;
+      }
+
+      if (start == prev) {
+        numList.add(Integer.toString(start));
+      } else {
+        numList.add(String.format("%d~%d", start, prev));
+      }
+    }
+
+    return String.join(", ", numList);
+  }
+
+  /**
+   * Summarizes this cache: the number of cached entries and the number of lookups made.
+   *
+   * @return the statistics summary.
+   */
+  private String getStats() {
+    return String.format("#entries = %d, #gets = %d", this.blocks.size(), this.numGets);
+  }
+
+  private static final String CACHE_FILE_PREFIX = "fs-cache-";
+
+  /**
+   * Checks whether the temporary-file directory has more usable space than fileSize.
+   *
+   * A probe temp file is created to locate that directory and is deleted afterwards.
+   *
+   * @param fileSize the number of bytes that need to fit in the cache.
+   * @return true if the usable space strictly exceeds fileSize; false otherwise,
+   *     including when creating or deleting the probe file fails.
+   */
+  public static boolean isCacheSpaceAvailable(long fileSize) {
+    try {
+      final Path probeFile = getTempFilePath();
+      final long usableBytes = new File(probeFile.toString()).getUsableSpace();
+      LOG.info("fileSize = {}, freeSpace = {}", fileSize, usableBytes);
+      Files.deleteIfExists(probeFile);
+      return usableBytes > fileSize;
+    } catch (IOException e) {
+      LOG.error("isCacheSpaceAvailable", e);
+      return false;
+    }
+  }
+
+  // The suffix (file extension) of each cache file.
+  private static final String BINARY_FILE_SUFFIX = ".bin";
+
+  // Owner read/write permissions applied to each cache file on file systems that
+  // support POSIX permissions.
+  private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
+      PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
+          PosixFilePermission.OWNER_WRITE));
+
+  /**
+   * Creates a new, empty temporary file in the default temporary-file directory.
+   *
+   * @return the path of the newly created file.
+   * @throws IOException if the file cannot be created.
+   */
+  private static Path getTempFilePath() throws IOException {
+    if (java.nio.file.FileSystems.getDefault()
+        .supportedFileAttributeViews().contains("posix")) {
+      return Files.createTempFile(
+          CACHE_FILE_PREFIX,
+          BINARY_FILE_SUFFIX,
+          TEMP_FILE_ATTRS
+      );
+    }
+    // Non-POSIX file systems (e.g. Windows) cannot apply POSIX permissions, and
+    // Files.createTempFile would throw UnsupportedOperationException; fall back to
+    // the platform's default attributes.
+    return Files.createTempFile(CACHE_FILE_PREFIX, BINARY_FILE_SUFFIX);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Validate.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Validate.java
new file mode 100644
index 00000000000..18e77fd1d0e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Validate.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+
+/**
+ * A superset of Validate class in Apache commons lang3.
+ *
+ * It provides consistent message strings for frequently encountered checks.
+ * That simplifies callers because they have to supply only the name of the argument
+ * that failed a check instead of having to supply the entire message.
+ *
+ * Every check reports failure with an {@link IllegalArgumentException}, except
+ * {@link #checkState(boolean, String, Object...)}, which throws an
+ * {@link IllegalStateException}.
+ */
+public final class Validate {
+  private Validate() {}
+
+  /**
+   * Validates that the given reference argument is not null.
+   *
+   * @param obj the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNull(Object obj, String argName) {
+    checkArgument(obj != null, "'%s' must not be null.", argName);
+  }
+
+  /**
+   * Validates that the given integer argument is positive (greater than zero).
+   *
+   * @param value the argument value to validate
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPositiveInteger(long value, String argName) {
+    checkArgument(value > 0, "'%s' must be a positive integer.", argName);
+  }
+
+  /**
+   * Validates that the given integer argument is not negative.
+   *
+   * @param value the argument value to validate
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNegative(long value, String argName) {
+    checkArgument(value >= 0, "'%s' must not be negative.", argName);
+  }
+
+  /**
+   * Validates that the expression (that checks a required field is present) is true.
+   *
+   * @param isPresent indicates whether the given argument is present.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkRequired(boolean isPresent, String argName) {
+    checkArgument(isPresent, "'%s' is required.", argName);
+  }
+
+  /**
+   * Validates that the expression (that checks a field is valid) is true.
+   *
+   * @param isValid indicates whether the given argument is valid.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkValid(boolean isValid, String argName) {
+    checkArgument(isValid, "'%s' is invalid.", argName);
+  }
+
+  /**
+   * Validates that the expression (that checks a field is valid) is true.
+   *
+   * @param isValid indicates whether the given argument is valid.
+   * @param argName the name of the argument being validated.
+   * @param validValues the list of values that are allowed.
+   */
+  public static void checkValid(boolean isValid, String argName, String validValues) {
+    checkArgument(isValid, "'%s' is invalid. Valid values are: %s.", argName, validValues);
+  }
+
+  /**
+   * Validates that the given string is not null and has non-zero length.
+   *
+   * @param arg the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(String arg, String argName) {
+    Validate.checkNotNull(arg, argName);
+    Validate.checkArgument(
+        arg.length() > 0,
+        "'%s' must not be empty.",
+        argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param <T> the type of array's elements.
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static <T> void checkNotNullAndNotEmpty(T[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(byte[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(short[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(int[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(long[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given iterable is not null and yields at least one element.
+   *
+   * @param <T> the type of iterable's elements.
+   * @param iter the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static <T> void checkNotNullAndNotEmpty(Iterable<T> iter, String argName) {
+    Validate.checkNotNull(iter, argName);
+    int minNumElements = iter.iterator().hasNext() ? 1 : 0;
+    checkNotEmpty(minNumElements, argName);
+  }
+
+  /**
+   * Validates that the given collection is not null and has an exact number of items.
+   *
+   * @param <T> the type of collection's elements.
+   * @param collection the argument reference to validate.
+   * @param numElements the expected number of elements in the collection.
+   * @param argName the name of the argument being validated.
+   */
+  public static <T> void checkNotNullAndNumberOfElements(
+      Collection<T> collection, int numElements, String argName) {
+    Validate.checkNotNull(collection, argName);
+    checkArgument(
+        collection.size() == numElements,
+        "Number of elements in '%s' must be exactly %s, %s given.",
+        argName,
+        numElements,
+        collection.size()
+    );
+  }
+
+  /**
+   * Validates that the given two values are equal.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkValuesEqual(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 == value2,
+        "'%s' (%s) must equal '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is an integer multiple of the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check; must not be zero.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkIntegerMultiple(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    // Guard the divisor: value1 % 0 would otherwise throw ArithmeticException
+    // instead of the IllegalArgumentException reported by every other check.
+    checkArgument(value2 != 0, "'%s' must not be zero.", value2Name);
+    checkArgument(
+        (value1 % value2) == 0,
+        "'%s' (%s) must be an integer multiple of '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is greater than the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkGreater(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 > value2,
+        "'%s' (%s) must be greater than '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is greater than or equal to the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkGreaterOrEqual(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 >= value2,
+        "'%s' (%s) must be greater than or equal to '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is less than or equal to the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkLessOrEqual(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 <= value2,
+        "'%s' (%s) must be less than or equal to '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the given value is within the given range of values.
+   *
+   * @param value the value to check.
+   * @param valueName the name of the argument.
+   * @param minValueInclusive inclusive lower limit for the value.
+   * @param maxValueInclusive inclusive upper limit for the value.
+   */
+  public static void checkWithinRange(
+      long value,
+      String valueName,
+      long minValueInclusive,
+      long maxValueInclusive) {
+    checkArgument(
+        (value >= minValueInclusive) && (value <= maxValueInclusive),
+        "'%s' (%s) must be within the range [%s, %s].",
+        valueName,
+        value,
+        minValueInclusive,
+        maxValueInclusive);
+  }
+
+  /**
+   * Validates that the given value is within the given range of values.
+   *
+   * @param value the value to check.
+   * @param valueName the name of the argument.
+   * @param minValueInclusive inclusive lower limit for the value.
+   * @param maxValueInclusive inclusive upper limit for the value.
+   */
+  public static void checkWithinRange(
+      double value,
+      String valueName,
+      double minValueInclusive,
+      double maxValueInclusive) {
+    checkArgument(
+        (value >= minValueInclusive) && (value <= maxValueInclusive),
+        "'%s' (%s) must be within the range [%s, %s].",
+        valueName,
+        value,
+        minValueInclusive,
+        maxValueInclusive);
+  }
+
+  /**
+   * Validates that the given path exists.
+   *
+   * @param path the path to check.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPathExists(Path path, String argName) {
+    checkNotNull(path, argName);
+    checkArgument(Files.exists(path), "Path %s (%s) does not exist.", argName, path);
+  }
+
+  /**
+   * Validates that the given path exists and is a directory.
+   *
+   * @param path the path to check.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPathExistsAsDir(Path path, String argName) {
+    checkPathExists(path, argName);
+    checkArgument(
+        Files.isDirectory(path),
+        "Path %s (%s) must point to a directory.",
+        argName,
+        path);
+  }
+
+  /**
+   * Validates that the given path exists and is a file.
+   *
+   * @param path the path to check.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPathExistsAsFile(Path path, String argName) {
+    checkPathExists(path, argName);
+    checkArgument(Files.isRegularFile(path), "Path %s (%s) must point to a file.", argName, path);
+  }
+
+  /**
+   * Validates that the given expression about an argument is true.
+   *
+   * @param expression the condition that must hold.
+   * @param format the {@link String#format} message used when the check fails.
+   * @param args the arguments for the message.
+   * @throws IllegalArgumentException if expression is false.
+   */
+  public static void checkArgument(boolean expression, String format, Object... args) {
+    org.apache.commons.lang3.Validate.isTrue(expression, format, args);
+  }
+
+  /**
+   * Validates that the given expression about the caller's state is true.
+   *
+   * @param expression the condition that must hold.
+   * @param format the {@link String#format} message used when the check fails.
+   * @param args the arguments for the message.
+   * @throws IllegalStateException if expression is false.
+   */
+  public static void checkState(boolean expression, String format, Object... args) {
+    org.apache.commons.lang3.Validate.validState(expression, format, args);
+  }
+
+  /**
+   * Validates that a container reported as having arraySize elements is not empty.
+   *
+   * @param arraySize the number of elements (or 0/1 for an empty/non-empty iterable).
+   * @param argName the name of the argument being validated.
+   */
+  private static void checkNotEmpty(int arraySize, String argName) {
+    Validate.checkArgument(
+        arraySize > 0,
+        "'%s' must have at least one element.",
+        argName);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/package-info.java
new file mode 100644
index 00000000000..f6f7b6f9e01
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Block caching for use in object store clients.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e5369b84883..3e1833a4fc8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1108,4 +1108,24 @@ public final class Constants {
    * Require that all S3 access is made through Access Points.
    */
   public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
+
+  /**
+   * Controls whether the prefetching input stream is enabled.
+   */
+  public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";
+
+  // If the default values are used, each file opened for reading will consume
+  // 64 MB of heap space (8 blocks x 8 MB each).
+
+  /**
+   * The size of a single prefetched block in number of bytes.
+   */
+  public static final String PREFETCH_BLOCK_SIZE_KEY = "fs.s3a.prefetch.block.size";
+
+  /**
+   * Default value of {@link #PREFETCH_BLOCK_SIZE_KEY}: 8 MB.
+   */
+  public static final int PREFETCH_BLOCK_DEFAULT_SIZE = 8 * 1024 * 1024;
+
+  /**
+   * Maximum number of blocks prefetched at any given time.
+   */
+  public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count";
+
+  /**
+   * Default value of {@link #PREFETCH_BLOCK_COUNT_KEY}: 8 blocks.
+   */
+  public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8;
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 15e240f9018..c7450e57a70 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -76,6 +76,8 @@ import com.amazonaws.services.s3.transfer.model.CopyResult;
 import com.amazonaws.services.s3.transfer.model.UploadResult;
 import com.amazonaws.event.ProgressListener;
 
+import com.twitter.util.ExecutorServiceFuturePool;
+import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -165,6 +167,7 @@ import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.read.S3PrefetchingInputStream;
 import org.apache.hadoop.fs.s3a.select.SelectBinding;
 import org.apache.hadoop.fs.s3a.select.SelectConstants;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@@ -281,6 +284,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private TransferManager transfers;
   private ExecutorService boundedThreadPool;
   private ThreadPoolExecutor unboundedThreadPool;
+
+  // S3 reads are prefetched asynchronously using this future pool.
+  private FuturePool futurePool;
+
+  // If true, the prefetching input stream is used for reads.
+  private boolean prefetchEnabled;
+
+  // Size in bytes of a single prefetch block.
+  private int prefetchBlockSize;
+
+  // Size of prefetch queue (in number of blocks).
+  private int prefetchBlockCount;
+
   private int executorCapacity;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@@ -477,6 +493,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
+      this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, true);
+      this.prefetchBlockSize = intOption(
+          conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, PREFETCH_BLOCK_DEFAULT_SIZE);
+      this.prefetchBlockCount =
+          intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
+
       initThreadPools(conf);
 
       int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -581,11 +603,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
       stopAllServices();
+      if (this.futurePool != null) {
+        this.futurePool = null;
+      }
       throw translateException("initializing ", new Path(name), e);
     } catch (IOException | RuntimeException e) {
       // other exceptions: stop the services.
       cleanupWithLogger(LOG, span);
       stopAllServices();
+      if (this.futurePool != null) {
+        this.futurePool = null;
+      }
       throw e;
     }
   }
@@ -689,9 +717,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
     long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
         DEFAULT_KEEPALIVE_TIME, 0);
+    int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
+
     boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
         maxThreads,
-        maxThreads + totalTasks,
+        maxThreads + totalTasks + numPrefetchThreads,
         keepAliveTime, TimeUnit.SECONDS,
         name + "-bounded");
     unboundedThreadPool = new ThreadPoolExecutor(
@@ -703,6 +733,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     unboundedThreadPool.allowCoreThreadTimeOut(true);
     executorCapacity = intOption(conf,
         EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
+    if (this.prefetchEnabled) {
+      this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
+    }
   }
 
   /**
@@ -1442,12 +1475,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         auditSpan);
     fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
-    return new FSDataInputStream(
-        new S3AInputStream(
-            readContext.build(),
-            createObjectAttributes(path, fileStatus),
-            createInputStreamCallbacks(auditSpan),
-            inputStreamStats));
+
+    if (this.prefetchEnabled) {
+      return new FSDataInputStream(
+          new S3PrefetchingInputStream(
+              readContext.build(),
+              createObjectAttributes(path, fileStatus),
+              createInputStreamCallbacks(auditSpan)));
+    } else {
+      return new FSDataInputStream(
+          new S3AInputStream(
+              readContext.build(),
+              createObjectAttributes(path, fileStatus),
+              createInputStreamCallbacks(auditSpan),
+              inputStreamStats));
+    }
   }
 
   /**
@@ -1531,7 +1573,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         invoker,
         statistics,
         statisticsContext,
-        fileStatus)
+        fileStatus,
+        futurePool,
+        prefetchBlockSize,
+        prefetchBlockCount)
         .withAuditSpan(auditSpan);
     openFileHelper.applyDefaultOptions(roc);
     return roc.build();
@@ -3551,12 +3596,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
-  createCopyFromLocalCallbacks() throws IOException {
+      createCopyFromLocalCallbacks() throws IOException {
     LocalFileSystem local = getLocal(getConf());
     return new CopyFromLocalCallbacksImpl(local);
   }
 
-  protected class CopyFromLocalCallbacksImpl implements
+  protected final class CopyFromLocalCallbacksImpl implements
       CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
     private final LocalFileSystem local;
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index f416cf9485d..fcdf54a5516 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.twitter.util.FuturePool;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,6 +66,15 @@ public class S3AReadOpContext extends S3AOpContext {
    */
   private long asyncDrainThreshold;
 
+  // S3 reads are prefetched asynchronously using this future pool.
+  // May be null: the constructor does not null-check it, and S3AFileSystem only
+  // creates a pool when prefetching is enabled.
+  private FuturePool futurePool;
+
+  // Size in bytes of a single prefetch block (validated > 0 in the constructor).
+  private final int prefetchBlockSize;
+
+  // Size of prefetch queue (in number of blocks; validated > 0 in the constructor).
+  private final int prefetchBlockCount;
+
   /**
    * Instantiate.
    * @param path path of read
@@ -71,17 +82,30 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param stats Fileystem statistics (may be null)
    * @param instrumentation statistics context
    * @param dstFileStatus target file status
+   * @param futurePool the FuturePool instance used by async prefetches.
+   * @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
+   * @param prefetchBlockCount maximum number of prefetched blocks.
    */
   public S3AReadOpContext(
       final Path path,
       Invoker invoker,
       @Nullable FileSystem.Statistics stats,
       S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus) {
+      FileStatus dstFileStatus,
+      FuturePool futurePool,
+      int prefetchBlockSize,
+      int prefetchBlockCount) {
 
     super(invoker, stats, instrumentation,
         dstFileStatus);
     this.path = requireNonNull(path);
+    // Deliberately not null-checked: S3AFileSystem only creates a future pool
+    // when prefetching is enabled, so null is a legitimate value here.
+    this.futurePool = futurePool;
+    Preconditions.checkArgument(
+        prefetchBlockSize > 0, "invalid prefetchBlockSize %d", prefetchBlockSize);
+    this.prefetchBlockSize = prefetchBlockSize;
+    Preconditions.checkArgument(
+        prefetchBlockCount > 0, "invalid prefetchBlockCount %d", prefetchBlockCount);
+    this.prefetchBlockCount = prefetchBlockCount;
   }
 
   /**
@@ -199,6 +223,33 @@ public class S3AReadOpContext extends S3AOpContext {
     return asyncDrainThreshold;
   }
 
+  /**
+   * Returns the pool on which asynchronous prefetch reads are scheduled.
+   *
+   * @return the {@code FuturePool} passed to the constructor; may be null
+   *     (for example when the filesystem did not create one).
+   */
+  public FuturePool getFuturePool() {
+    return futurePool;
+  }
+
+  /**
+   * Returns how many bytes each prefetched block holds.
+   *
+   * @return the prefetch block size in bytes; positive, as checked by the constructor.
+   */
+  public int getPrefetchBlockSize() {
+    return prefetchBlockSize;
+  }
+
+  /**
+   * Returns the capacity of the prefetch queue, measured in blocks.
+   *
+   * @return the maximum number of prefetched blocks; positive, as checked by the constructor.
+   */
+  public int getPrefetchBlockCount() {
+    return prefetchBlockCount;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/README.md b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/README.md
new file mode 100644
index 00000000000..fcc68f5c17c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/README.md
@@ -0,0 +1,107 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# High Performance S3 InputStream
+
+## Overview
+This document describes how S3EInputStream works. There are two main goals of this document:
+1. Make it easier to understand the current implementation in this folder.
+1. Be useful in applying the underlying technique to other Hadoop file systems.
+
+This document is divided into three high level areas. The first part explains important high level concepts. The second part describes components involved in the implementation in this folder. The components are further divided into two sub sections: those independent of S3 and those that are specific to S3. Finally, the last section brings everything together by describing how the components interact with one another to support sequential and random reads.
+
+In addition, this blog post provides some performance numbers and additional high-level information: https://medium.com/pinterest-engineering/improving-efficiency-and-reducing-runtime-using-s3-read-optimization-b31da4b60fa0
+
+## Motivation
+
+The main motivation behind implementing this feature is to improve performance when reading data stored on S3. This situation arises often when big data stored on S3 is to be processed by a job. Many data processing jobs spend most of their time waiting for data to arrive. If we can increase the speed at which a job reads data, the job will finish sooner and save us considerable time and money in the process. Given that processing is costly, these savings can add up quickly to a substant [...]
+
+## Basic Concepts
+
+- **File** : A binary blob of data stored on some storage device.
+- **Split** : A contiguous portion of a file. Each file is made up of one or more splits. Each split is a contiguous blob of data processed by a single job.
+- **Block** : Similar to a split but much smaller. Each split is made up of a number of blocks. The first n-1 blocks all have the same size. The last block may be the same size or smaller.
+- **Block based reading** : The granularity of a read is one block. That is, we either read and return an entire block or nothing at all. Multiple blocks may be read in parallel.
+- **Position within a file** : The current read position in a split is a 0 based offset relative to the start of the split. Given that the first n-1 blocks of a split are guaranteed to be of the same size, the current position can also be denoted as the 0 based number of the current block and the 0 based offset within the current block.
+
+## Components
+
+This section lists the components in this implementation.
+
+### Components independent of a file system.
+
+The following components are not really tied to S3 file system. They can be used with any other underlying storage.
+
+#### Block related functionality
+
+- `BlockData.java` : Holds metadata about blocks of data in a file. For example, number of blocks in a given file, the start offset of each block, etc. Also holds the state of the block at a glance (whether it is yet to be read, already cached locally, etc).
+- `BufferData.java` : Holds the state of a `ByteBuffer` that is currently in use. Each such buffer holds exactly one block in memory. The maximum number of such buffers and the size of each buffer is fixed during runtime (but configurable at start).
+- `FilePosition.java` : Functionality related to tracking the position within a file (as relative offset from current block start).
+- `S3BlockCache.java` : Despite the S3 in its name, this is simply an interface for a cache of blocks.
+- `S3FilePerBlockCache.java` : Despite the S3 in its name, this class implements local disk based block cache.
+
+
+#### Resource management
+- `ResourcePool.java` : Abstract base class for a pool of resources of generic type T.
+- `BoundedResourcePool.java` : Abstract base class for a fixed sized pool of resources of generic type T. A fixed sized pool allows reuse of resources instead of having to create a new resource each time.
+- `BufferPool.java` : Fixed sized pool of `ByteBuffer` instances. This pool holds the prefetched blocks of data in memory.
+
+#### Misc functionality not specific to a file system
+- `Retryer.java` : Helper for implementing fixed-interval retries bounded by a maximum delay. Used by `BufferPool` and `S3CachingBlockManager` to retry certain operations.
+
+### Supporting functionality (not really a part of the main feature)
+
+- README.md : This file.
+- `BlockOperations.java` : A helper for tracking and logging block level operations on a file. Mostly used for debugging. Can also be used (in the future) to keep statistics of block accesses.
+- `Clients.java` : Provides a way to create S3 clients with predefined configuration.
+- `Io.java` : Provides misc functionality related to IO.
+- `Validate.java` : A superset of Validate class in Apache commons lang3.
+
+
+### Components specific to S3 file system.
+
+- `S3AccessRetryer.java` : Provides retry related functionality when accessing S3.
+- `S3CachingInputStream.java` : Heavily relies on `S3CachingBlockManager` to provide the desired functionality. That said, the decision on when to prefetch and when to cache is handled by this class.
+- `S3EFileSystem.java` : Very thin wrapper over `S3AFileSystem` just enough to handle `S3E` specific configuration and overriding of `initialize()` and `open()` methods.
+- `S3EInputStream.java` : Chooses between `S3InMemoryInputStream` and `S3CachingInputStream` depending upon a configurable threshold. That is, any file smaller than the threshold is fully read in-memory.
+- `S3File.java` : Encapsulates low level interactions with S3 object on AWS.
+- `S3InputStream.java` : The base class of `S3EInputStream`. It could be merged into `S3EInputStream`. It was originally created to compare many different implementations.
+- `S3Reader.java` : Provides functionality to read S3 file one block at a time.
+
+Although the following files are currently S3 specific, they can be easily made S3 independent by passing in a reader.
+
+- `S3BlockManager.java` : A naive implementation of a block manager that provides no prefetching or caching. Useful for comparing performance difference between this baseline and `S3CachingBlockManager`.
+- `S3InMemoryInputStream.java` : Reads the entire file into memory and returns an InputStream over it.
+- `S3CachingBlockManager.java` : The core functionality of this feature. This class provides prefetched + local cached access to an S3 file.
+
+## Operation
+
+This section describes how the above components interact with one another to provide improved read functionality.
+
+### Sequential reads
+
+Immediately after opening the `S3InputStream`, both the in-memory cache and the on-disk cache are empty. When a caller calls any overload of `read()` for the first time, the `ensureCurrentBuffer()` method gets invoked. This is because we always satisfy reads from a block that has been read completely into memory. This method will block only on the very first read so that the first block is fully read in.
+
+The `ensureCurrentBuffer()` method implements the bulk of the prefetch/caching decisions. At a high level, here is what it does:
+1. if the current buffer is valid (that is, already in memory) and has not been fully read; then this method does nothing because the next read can be satisfied from the current buffer.
+2. if the current buffer is valid and has been fully read, then it moves to the next block. In addition, it issues a prefetch request for the next n-1 blocks. Generally speaking, prefetch requests for n-2 of those blocks would already have been made when reads started for the current block. Therefore, in most cases this ends up requesting a prefetch of only the block after the next n-2 blocks.
+3. if the file position has changed before the current buffer has been fully read (as a result of a seek), we issue a caching request to the block manager to potentially cache the current buffer. Note the word 'potentially'. This is because any requests for caching or prefetching are only hints. The block manager is free to ignore them without affecting the overall functionality.
+
+With the above steps, as long as there are no `seek()` calls, blocks keep getting prefetched and sequential reads take place without any local caching involved.
+
+### Random access reads
+
+As described in the previous section, we cache the current block when we detect a `seek()` out of it before the block has been fully read. If another `seek()` brings the file position inside this cached block then that request can be satisfied from the locally cached block. This is typically orders of magnitude faster than a network based read.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3BlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3BlockManager.java
new file mode 100644
index 00000000000..f7c47ef7925
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3BlockManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.Validate;
+
+/**
+ * A {@code BlockManager} that reads an S3 file one block at a time.
+ *
+ * This is a deliberately naive implementation: it neither prefetches nor caches.
+ * It serves as a baseline when measuring the benefit of {@code S3CachingBlockManager}.
+ */
+public class S3BlockManager extends BlockManager {
+
+  /**
+   * Source of the bytes for every block requested from this manager.
+   */
+  private final S3Reader reader;
+
+  /**
+   * Creates a block manager backed directly by the given reader.
+   *
+   * @param reader the reader used to fetch bytes of the S3 file.
+   * @param blockData block layout information for the S3 file.
+   *
+   * @throws IllegalArgumentException if reader is null.
+   * @throws IllegalArgumentException if blockData is null.
+   */
+  public S3BlockManager(S3Reader reader, BlockData blockData) {
+    super(blockData);
+    Validate.checkNotNull(reader, "reader");
+    this.reader = reader;
+  }
+
+  /**
+   * Reads {@code size} bytes of the underlying file, beginning at {@code startOffset},
+   * into {@code buffer} by delegating to the S3 reader.
+   *
+   * @param buffer destination buffer.
+   * @param startOffset file offset where the read begins.
+   * @param size number of bytes to read.
+   * @return the number of bytes actually read.
+   * @throws IOException if the underlying read fails.
+   */
+  @Override
+  public int read(ByteBuffer buffer, long startOffset, int size) throws IOException {
+    return reader.read(buffer, startOffset, size);
+  }
+
+  /**
+   * Closes the underlying reader.
+   */
+  @Override
+  public void close() {
+    reader.close();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
new file mode 100644
index 00000000000..c4fafd56f1d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.twitter.util.FuturePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.CachingBlockManager;
+import org.apache.hadoop.fs.common.Validate;
+
+/**
+ * Provides access to S3 file one block at a time.
+ *
+ * Reads are delegated to an {@code S3Reader}; prefetching and caching behavior
+ * comes from the {@code CachingBlockManager} base class.
+ */
+public class S3CachingBlockManager extends CachingBlockManager {
+  private static final Logger LOG = LoggerFactory.getLogger(S3CachingBlockManager.class);
+
+  /**
+   * Reader that reads from S3 file.
+   */
+  private final S3Reader reader;
+
+  /**
+   * Constructs an instance of a {@code S3CachingBlockManager}.
+   *
+   * @param futurePool asynchronous tasks are performed in this pool.
+   * @param reader reader that reads from S3 file.
+   * @param blockData information about each block of the S3 file.
+   * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
+   *
+   * @throws IllegalArgumentException if reader is null.
+   */
+  public S3CachingBlockManager(
+      FuturePool futurePool,
+      S3Reader reader,
+      BlockData blockData,
+      int bufferPoolSize) {
+    super(futurePool, blockData, bufferPoolSize);
+
+    Validate.checkNotNull(reader, "reader");
+
+    this.reader = reader;
+  }
+
+  /**
+   * Gets the reader used to fetch blocks from S3.
+   *
+   * @return the reader that reads from the S3 file.
+   */
+  protected S3Reader getReader() {
+    return this.reader;
+  }
+
+  /**
+   * Reads into the given {@code buffer} {@code size} bytes from the underlying file
+   * starting at {@code startOffset}.
+   *
+   * @param buffer the buffer to read data in to.
+   * @param startOffset the offset at which reading starts.
+   * @param size the number bytes to read.
+   * @return number of bytes read.
+   * @throws IOException if the underlying read fails.
+   */
+  @Override
+  public int read(ByteBuffer buffer, long startOffset, int size) throws IOException {
+    return this.reader.read(buffer, startOffset, size);
+  }
+
+  /**
+   * Closes the reader, then lets the base class release its own resources.
+   */
+  @Override
+  public synchronized void close() {
+    try {
+      this.reader.close();
+    } finally {
+      // Run the base-class cleanup even if closing the reader fails with an
+      // unchecked exception; otherwise its resources would be leaked.
+      super.close();
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
new file mode 100644
index 00000000000..11170025268
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+
+import com.twitter.util.FuturePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Provides an {@code InputStream} that allows reading from an S3 file.
+ * Prefetched blocks are cached to local disk if a seek away from the
+ * current block is issued.
+ */
+public class S3CachingInputStream extends S3InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3CachingInputStream.class);
+
+  /**
+   * Number of blocks queued for prefching.
+   */
+  private final int numBlocksToPrefetch;
+
+  private final BlockManager blockManager;
+
+  /**
+   * Initializes a new instance of the {@code S3CachingInputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3CachingInputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+    super(context, s3Attributes, client);
+
+    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
+    int bufferPoolSize = this.numBlocksToPrefetch + 1;
+    this.blockManager = this.createBlockManager(
+        this.getContext().getFuturePool(),
+        this.getReader(),
+        this.getBlockData(),
+        bufferPoolSize);
+    int fileSize = (int) s3Attributes.getLen();
+    LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize);
+  }
+
+  /**
+   * Moves the current read position so that the next read will occur at {@code pos}.
+   *
+   * @param pos the next read will take place at this position.
+   *
+   * @throws IllegalArgumentException if pos is outside of the range [0, file size].
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.throwIfInvalidSeek(pos);
+
+    // The call to setAbsolute() returns true if the target position is valid and
+    // within the current block. Therefore, no additional work is needed when we get back true.
+    if (!this.getFilePosition().setAbsolute(pos)) {
+      LOG.info("seek({})", getOffsetStr(pos));
+      // We could be here in two cases:
+      // -- the target position is invalid:
+      //    We ignore this case here as the next read will return an error.
+      // -- it is valid but outside of the current block.
+      if (this.getFilePosition().isValid()) {
+        // There are two cases to consider:
+        // -- the seek was issued after this buffer was fully read.
+        //    In this case, it is very unlikely that this buffer will be needed again;
+        //    therefore we release the buffer without caching.
+        // -- if we are jumping out of the buffer before reading it completely then
+        //    we will likely need this buffer again (as observed empirically for Parquet)
+        //    therefore we issue an async request to cache this buffer.
+        if (!this.getFilePosition().bufferFullyRead()) {
+          this.blockManager.requestCaching(this.getFilePosition().data());
+        } else {
+          this.blockManager.release(this.getFilePosition().data());
+        }
+        this.getFilePosition().invalidate();
+        this.blockManager.cancelPrefetches();
+      }
+      this.setSeekTargetPos(pos);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    this.blockManager.close();
+    LOG.info("closed: {}", this.getName());
+  }
+
+  @Override
+  protected boolean ensureCurrentBuffer() throws IOException {
+    if (this.isClosed()) {
+      return false;
+    }
+
+    if (this.getFilePosition().isValid() && this.getFilePosition().buffer().hasRemaining()) {
+      return true;
+    }
+
+    long readPos;
+    int prefetchCount;
+
+    if (this.getFilePosition().isValid()) {
+      // A sequential read results in a prefetch.
+      readPos = this.getFilePosition().absolute();
+      prefetchCount = this.numBlocksToPrefetch;
+    } else {
+      // A seek invalidates the current position.
+      // We prefetch only 1 block immediately after a seek operation.
+      readPos = this.getSeekTargetPos();
+      prefetchCount = 1;
+    }
+
+    if (!this.getBlockData().isValidOffset(readPos)) {
+      return false;
+    }
+
+    if (this.getFilePosition().isValid()) {
+      if (this.getFilePosition().bufferFullyRead()) {
+        this.blockManager.release(this.getFilePosition().data());
+      } else {
+        this.blockManager.requestCaching(this.getFilePosition().data());
+      }
+    }
+
+    int toBlockNumber = this.getBlockData().getBlockNumber(readPos);
+    long startOffset = this.getBlockData().getStartOffset(toBlockNumber);
+
+    for (int i = 1; i <= prefetchCount; i++) {
+      int b = toBlockNumber + i;
+      if (b < this.getBlockData().getNumBlocks()) {
+        this.blockManager.requestPrefetch(b);
+      }
+    }
+
+    BufferData data = this.blockManager.get(toBlockNumber);
+    this.getFilePosition().setData(data, startOffset, readPos);
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    if (this.isClosed()) {
+      return "closed";
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(String.format("fpos = (%s)%n", this.getFilePosition()));
+    sb.append(this.blockManager.toString());
+    return sb.toString();
+  }
+
+  protected BlockManager createBlockManager(
+      FuturePool futurePool,
+      S3Reader reader,
+      BlockData blockData,
+      int bufferPoolSize) {
+    return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
new file mode 100644
index 00000000000..501186a2549
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
+import org.apache.hadoop.fs.common.Io;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+/**
+ * Encapsulates low level interactions with S3 object on AWS.
+ */
+public class S3File implements Closeable {
+
+  // Read-specific operation context.
+  private final S3AReadOpContext context;
+
+  // S3 object attributes.
+  private final S3ObjectAttributes s3Attributes;
+
+  // Callbacks used for interacting with the underlying S3 client.
+  private final S3AInputStream.InputStreamCallbacks client;
+
+  // Used for reporting input stream access statistics.
+  private final S3AInputStreamStatistics streamStatistics;
+
+  // Enforces change tracking related policies.
+  private final ChangeTracker changeTracker;
+
+  // Maps a stream returned by openForRead() to the associated S3 object.
+  // That allows us to close the object when closing the stream.
+  // Every access must synchronize on this map (IdentityHashMap is not thread-safe).
+  private final Map<InputStream, S3Object> s3Objects;
+
+  /**
+   * Initializes a new instance of the {@code S3File} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   * @param streamStatistics statistics about stream access.
+   * @param changeTracker helps enforce change tracking policy.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   * @throws IllegalArgumentException if streamStatistics is null.
+   * @throws IllegalArgumentException if changeTracker is null.
+   */
+  public S3File(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics,
+      ChangeTracker changeTracker) {
+
+    Validate.checkNotNull(context, "context");
+    Validate.checkNotNull(s3Attributes, "s3Attributes");
+    Validate.checkNotNull(client, "client");
+    Validate.checkNotNull(streamStatistics, "streamStatistics");
+    Validate.checkNotNull(changeTracker, "changeTracker");
+
+    this.context = context;
+    this.s3Attributes = s3Attributes;
+    this.client = client;
+    this.streamStatistics = streamStatistics;
+    this.changeTracker = changeTracker;
+    this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
+  }
+
+  /**
+   * Gets an instance of {@code Invoker} for interacting with S3 API.
+   *
+   * @return an instance of {@code Invoker} for interacting with S3 API.
+   */
+  public Invoker getReadInvoker() {
+    return this.context.getReadInvoker();
+  }
+
+  /**
+   * Gets an instance of {@code S3AInputStreamStatistics} used for reporting access metrics.
+   *
+   * @return an instance of {@code S3AInputStreamStatistics} used for reporting access metrics.
+   */
+  public S3AInputStreamStatistics getStatistics() {
+    return this.streamStatistics;
+  }
+
+  /**
+   * Gets the path of this file.
+   *
+   * @return the path of this file.
+   */
+  public String getPath() {
+    return getPath(this.s3Attributes);
+  }
+
+  /**
+   * Gets the path corresponding to the given s3Attributes.
+   *
+   * @param s3Attributes attributes of an S3 object.
+   * @return the path corresponding to the given s3Attributes.
+   */
+  public static String getPath(S3ObjectAttributes s3Attributes) {
+    return String.format("s3a://%s/%s", s3Attributes.getBucket(), s3Attributes.getKey());
+  }
+
+  /**
+   * Gets the size of this file.
+   * Its value is cached once obtained from AWS.
+   *
+   * @return the size of this file.
+   */
+  public long size() {
+    return this.s3Attributes.getLen();
+  }
+
+  /**
+   * Opens a section of the file for reading.
+   *
+   * @param offset Start offset (0 based) of the section to read.
+   * @param size Size of the section to read.
+   * @return an {@code InputStream} corresponding to the given section of this file.
+   *
+   * @throws IOException if there is an error opening this file section for reading.
+   * @throws IllegalArgumentException if offset is negative.
+   * @throws IllegalArgumentException if offset is greater than or equal to file size.
+   * @throws IllegalArgumentException if size is not positive.
+   * @throws IllegalArgumentException if size is greater than the remaining bytes.
+   */
+  public InputStream openForRead(long offset, int size) throws IOException {
+    Validate.checkNotNegative(offset, "offset");
+    Validate.checkLessOrEqual(offset, "offset", size(), "size()");
+    // A non-positive size would produce an invalid byte range (end < start) below.
+    // Together with the next check this also guarantees offset < size().
+    Validate.checkPositiveInteger(size, "size");
+    Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
+
+    final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
+        .withRange(offset, offset + size - 1);
+    this.changeTracker.maybeApplyConstraint(request);
+
+    String uri = this.getPath();
+    String operation = String.format(
+        "%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset);
+    DurationTracker tracker = streamStatistics.initiateGetRequest();
+    S3Object object = null;
+
+    try {
+      object = Invoker.once(operation, uri, () -> client.getObject(request));
+    } catch(IOException e) {
+      tracker.failed();
+      throw e;
+    } finally {
+      tracker.close();
+    }
+
+    changeTracker.processResponse(object, operation, offset);
+    InputStream stream = object.getObjectContent();
+    synchronized (this.s3Objects) {
+      this.s3Objects.put(stream, object);
+    }
+
+    return stream;
+  }
+
+  /**
+   * Closes this stream and releases all acquired resources.
+   */
+  @Override
+  public synchronized void close() {
+    // Snapshot the keys under the same lock used by openForRead() and
+    // close(InputStream); copying the live key set without it can race with a put.
+    List<InputStream> streams;
+    synchronized (this.s3Objects) {
+      streams = new ArrayList<InputStream>(this.s3Objects.keySet());
+    }
+    for (InputStream stream : streams) {
+      S3Object obj;
+      synchronized (this.s3Objects) {
+        obj = this.s3Objects.remove(stream);
+      }
+      // A null result means the stream was closed concurrently via close(InputStream).
+      if (obj != null) {
+        Io.closeIgnoringIoException(stream);
+        Io.closeIgnoringIoException(obj);
+      }
+    }
+  }
+
+  /**
+   * Closes a stream previously returned by {@link #openForRead(long, int)} together
+   * with its associated S3 object, ignoring IO errors while closing.
+   *
+   * @param inputStream a stream returned by openForRead().
+   * @throws IllegalArgumentException if the stream is not tracked by this file.
+   */
+  void close(InputStream inputStream) {
+    S3Object obj;
+    synchronized (this.s3Objects) {
+      obj = this.s3Objects.get(inputStream);
+      if (obj == null) {
+        throw new IllegalArgumentException("inputStream not found");
+      }
+      this.s3Objects.remove(inputStream);
+    }
+
+    Io.closeIgnoringIoException(inputStream);
+    Io.closeIgnoringIoException(obj);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java
new file mode 100644
index 00000000000..2be2eaa98f7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InMemoryInputStream.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Provides an {@code InputStream} that allows reading from an S3 file.
+ * The entire file is read into memory before reads can begin.
+ *
+ * Use of this class is recommended only for small files that can fit
+ * entirely in memory.
+ */
+public class S3InMemoryInputStream extends S3InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3InMemoryInputStream.class);
+
+  // Holds the contents of the whole file once it has been read.
+  private ByteBuffer buffer;
+
+  /**
+   * Initializes a new instance of the {@code S3InMemoryInputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   * @throws IllegalArgumentException if the file is too large for a single in-memory buffer.
+   */
+  public S3InMemoryInputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+    super(context, s3Attributes, client);
+    long fileLength = s3Attributes.getLen();
+    // ByteBuffer capacities are ints; a plain (int) cast would silently wrap
+    // larger lengths into a wrong (possibly negative) capacity.
+    if (fileLength > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(String.format(
+          "%s is too large (%d bytes) to be read into memory",
+          this.getName(), fileLength));
+    }
+    int fileSize = (int) fileLength;
+    this.buffer = ByteBuffer.allocate(fileSize);
+    LOG.debug("Created in-memory input stream for {} (size = {})", this.getName(), fileSize);
+  }
+
+  /**
+   * Ensures that a non-empty valid buffer is available for immediate reading.
+   * It returns true when at least one such buffer is available for reading.
+   * It returns false on reaching the end of the stream.
+   *
+   * On first use (or whenever the file position is invalid) the whole file is
+   * read into the in-memory buffer in a single request.
+   *
+   * @return true if at least one such buffer is available for reading, false otherwise.
+   * @throws IOException if there is an IO error while reading the file.
+   */
+  @Override
+  protected boolean ensureCurrentBuffer() throws IOException {
+    if (this.isClosed()) {
+      return false;
+    }
+
+    if (this.getBlockData().getFileSize() == 0) {
+      return false;
+    }
+
+    if (!this.getFilePosition().isValid()) {
+      // Fetch the entire file, then install it as the current data with the
+      // read position set to the most recent seek target.
+      this.buffer.clear();
+      int numBytesRead = this.getReader().read(this.buffer, 0, this.buffer.capacity());
+      if (numBytesRead <= 0) {
+        return false;
+      }
+      BufferData data = new BufferData(0, this.buffer);
+      this.getFilePosition().setData(data, 0, this.getSeekTargetPos());
+    }
+
+    return this.getFilePosition().buffer().hasRemaining();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
new file mode 100644
index 00000000000..0fa6e33200b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.FilePosition;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * Provides an {@link InputStream} that allows reading from an S3 file.
+ *
+ * Subclasses decide how file data is buffered by implementing
+ * {@link #ensureCurrentBuffer()}; this base class implements reading,
+ * seeking, statistics and close handling on top of it.
+ */
+public abstract class S3InputStream
+    extends InputStream
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
+
+  // The S3 file read by this instance.
+  private S3File s3File;
+
+  // Reading of S3 file takes place through this reader.
+  private S3Reader reader;
+
+  // Name of this stream (from S3File.getPath); used in log and error messages.
+  private final String name;
+
+  // Indicates whether the stream has been closed.
+  private volatile boolean closed;
+
+  // Current position within the file.
+  private FilePosition fpos;
+
+  // The target of the most recent seek operation.
+  private long seekTargetPos;
+
+  // Information about each block of the mapped S3 file.
+  private BlockData blockData;
+
+  // Read-specific operation context.
+  private S3AReadOpContext context;
+
+  // Attributes of the S3 object being read.
+  private S3ObjectAttributes s3Attributes;
+
+  // Callbacks used for interacting with the underlying S3 client.
+  private S3AInputStream.InputStreamCallbacks client;
+
+  // Used for reporting input stream access statistics.
+  private final S3AInputStreamStatistics streamStatistics;
+
+  private S3AInputPolicy inputPolicy;
+  private final ChangeTracker changeTracker;
+  private final IOStatistics ioStatistics;
+
+  /**
+   * Initializes a new instance of the {@code S3InputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3InputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+
+    Validate.checkNotNull(context, "context");
+    Validate.checkNotNull(s3Attributes, "s3Attributes");
+    Validate.checkNotNull(client, "client");
+
+    this.context = context;
+    this.s3Attributes = s3Attributes;
+    this.client = client;
+    this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
+    this.ioStatistics = streamStatistics.getIOStatistics();
+    this.name = S3File.getPath(s3Attributes);
+    this.changeTracker = new ChangeTracker(
+        this.name,
+        context.getChangeDetectionPolicy(),
+        this.streamStatistics.getChangeTrackerStatistics(),
+        s3Attributes);
+
+    setInputPolicy(context.getInputPolicy());
+    setReadahead(context.getReadahead());
+
+    long fileSize = s3Attributes.getLen();
+    int bufferSize = context.getPrefetchBlockSize();
+
+    this.blockData = new BlockData(fileSize, bufferSize);
+    this.fpos = new FilePosition(fileSize, bufferSize);
+    this.s3File = this.getS3File();
+    this.reader = new S3Reader(this.s3File);
+
+    this.seekTargetPos = 0;
+  }
+
+  /**
+   * Gets the internal IO statistics.
+   *
+   * @return the internal IO statistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return this.ioStatistics;
+  }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInputStreamStatistics getS3AStreamStatistics() {
+    return this.streamStatistics;
+  }
+
+  /**
+   * Sets the number of bytes to read ahead each time.
+   * The value is only validated: read-ahead is provided by prefetching.
+   *
+   * @param readahead the number of bytes to read ahead each time, or null.
+   * @throws IllegalArgumentException if readahead is negative.
+   */
+  @Override
+  public synchronized void setReadahead(Long readahead) {
+    // We support read-ahead by prefetching, therefore we ignore the supplied value.
+    if (readahead != null) {
+      Validate.checkNotNegative(readahead, "readahead");
+    }
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return capability.equalsIgnoreCase(StreamCapabilities.IOSTATISTICS)
+        || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD);
+  }
+
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
+  /**
+   * Returns the number of bytes that can be read from this stream without blocking.
+   *
+   * @return the number of bytes remaining in the current buffer, or 0 at end of stream.
+   * @throws IOException if the stream is closed or there is an IO error.
+   */
+  @Override
+  public int available() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return 0;
+    }
+
+    return this.fpos.buffer().remaining();
+  }
+
+  /**
+   * Gets the current position.
+   *
+   * @return the current position.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  public long getPos() throws IOException {
+    this.throwIfClosed();
+
+    if (this.fpos.isValid()) {
+      return this.fpos.absolute();
+    } else {
+      return this.seekTargetPos;
+    }
+  }
+
+  /**
+   * Moves the current read position so that the next read will occur at {@code pos}.
+   * If {@code pos} is not within the current buffer, the buffer is invalidated
+   * and the position is remembered as the seek target.
+   *
+   * @param pos the absolute position to seek to.
+   * @throws IOException if the stream is closed.
+   * @throws EOFException if pos is negative, or at or beyond the end of a non-empty file.
+   */
+  public void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.throwIfInvalidSeek(pos);
+
+    if (!this.fpos.setAbsolute(pos)) {
+      this.fpos.invalidate();
+      this.seekTargetPos = pos;
+    }
+  }
+
+  /**
+   * Ensures that a non-empty valid buffer is available for immediate reading.
+   * It returns true when at least one such buffer is available for reading.
+   * It returns false on reaching the end of the stream.
+   *
+   * @return true if at least one such buffer is available for reading, false otherwise.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  protected abstract boolean ensureCurrentBuffer() throws IOException;
+
+  /**
+   * Reads the next byte of data from this stream.
+   *
+   * @return the next byte in the range [0, 255], or -1 at the end of the stream.
+   * @throws IOException if the stream is closed or there is an IO error.
+   */
+  @Override
+  public int read() throws IOException {
+    this.throwIfClosed();
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    this.incrementBytesRead(1);
+
+    return this.fpos.buffer().get() & 0xff;
+  }
+
+  /**
+   * Reads bytes from this stream and copies them into
+   * the given {@code buffer} starting at the beginning (offset 0).
+   * Returns the number of bytes actually copied in to the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @return the number of bytes actually copied in to the given buffer, or -1 at end of stream.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public int read(byte[] buffer) throws IOException {
+    return this.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes from this stream and copies them into
+   * the given {@code buffer} starting at the given {@code offset}.
+   * Returns the number of bytes actually copied in to the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @param offset data is copied starting at this offset.
+   * @param len max number of bytes to copy.
+   * @return the number of bytes actually copied in to the given buffer, or -1 at end of stream.
+   * @throws IOException if there is an IO error during this operation.
+   * @throws IndexOutOfBoundsException if offset or len is negative, or
+   *         {@code len > buffer.length - offset}.
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int len) throws IOException {
+    this.throwIfClosed();
+
+    // Enforce the java.io.InputStream contract for the destination range;
+    // otherwise a negative len was silently accepted.
+    if (offset < 0 || len < 0 || len > buffer.length - offset) {
+      throw new IndexOutOfBoundsException(String.format(
+          "offset %d, len %d out of range for buffer of length %d",
+          offset, len, buffer.length));
+    }
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (!ensureCurrentBuffer()) {
+      return -1;
+    }
+
+    int numBytesRead = 0;
+    int numBytesRemaining = len;
+
+    while (numBytesRemaining > 0) {
+      if (!ensureCurrentBuffer()) {
+        break;
+      }
+
+      ByteBuffer buf = this.fpos.buffer();
+      int bytesToRead = Math.min(numBytesRemaining, buf.remaining());
+      buf.get(buffer, offset, bytesToRead);
+      this.incrementBytesRead(bytesToRead);
+      offset += bytesToRead;
+      numBytesRemaining -= bytesToRead;
+      numBytesRead += bytesToRead;
+    }
+
+    return numBytesRead;
+  }
+
+  protected S3File getFile() {
+    return this.s3File;
+  }
+
+  protected S3Reader getReader() {
+    return this.reader;
+  }
+
+  protected S3ObjectAttributes getS3ObjectAttributes() {
+    return this.s3Attributes;
+  }
+
+  protected FilePosition getFilePosition() {
+    return this.fpos;
+  }
+
+  protected String getName() {
+    return this.name;
+  }
+
+  protected boolean isClosed() {
+    return this.closed;
+  }
+
+  protected long getSeekTargetPos() {
+    return this.seekTargetPos;
+  }
+
+  protected void setSeekTargetPos(long pos) {
+    this.seekTargetPos = pos;
+  }
+
+  protected BlockData getBlockData() {
+    return this.blockData;
+  }
+
+  protected S3AReadOpContext getContext() {
+    return this.context;
+  }
+
+  // Records bytesRead in stream statistics, filesystem statistics (if any)
+  // and advances the file position.
+  private void incrementBytesRead(int bytesRead) {
+    if (bytesRead > 0) {
+      this.streamStatistics.bytesRead(bytesRead);
+      if (this.getContext().getStats() != null) {
+        this.getContext().getStats().incrementBytesRead(bytesRead);
+      }
+      this.fpos.incrementBytesRead(bytesRead);
+    }
+  }
+
+  protected S3File getS3File() {
+    return new S3File(
+        this.context,
+        this.s3Attributes,
+        this.client,
+        this.streamStatistics,
+        this.changeTracker
+    );
+  }
+
+  // Formats an offset as "blockNumber:offset"; blockNumber is -1 for invalid offsets.
+  protected String getOffsetStr(long offset) {
+    int blockNumber = -1;
+
+    if (this.blockData.isValidOffset(offset)) {
+      blockNumber = this.blockData.getBlockNumber(offset);
+    }
+
+    return String.format("%d:%d", blockNumber, offset);
+  }
+
+  /**
+   * Closes this stream and releases all acquired resources.
+   * Calling it again after the first call has no effect.
+   *
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
+    this.closed = true;
+
+    this.blockData = null;
+    this.reader.close();
+    this.reader = null;
+    this.s3File = null;
+    this.fpos.invalidate();
+    try {
+      this.client.close();
+    } finally {
+      this.streamStatistics.close();
+      // Drop the client reference even if closing it failed.
+      this.client = null;
+    }
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  protected void throwIfClosed() throws IOException {
+    if (this.closed) {
+      throw new IOException(this.name + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  // Rejects negative positions, and positions at or past EOF (except 0 for an empty file).
+  protected void throwIfInvalidSeek(long pos) throws EOFException {
+    long fileSize = this.s3File.size();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
+    } else {
+      if (fileSize == 0 && pos == 0) {
+        // Do nothing. Valid combination.
+        return;
+      }
+
+      if (pos >= fileSize) {
+        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
+      }
+    }
+  }
+
+  // Unsupported functions.
+
+  @Override
+  public void mark(int readlimit) {
+    throw new UnsupportedOperationException("mark not supported");
+  }
+
+  @Override
+  public void reset() {
+    throw new UnsupportedOperationException("reset not supported");
+  }
+
+  @Override
+  public long skip(long n) {
+    throw new UnsupportedOperationException("skip not supported");
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
new file mode 100644
index 00000000000..c874a8c37b8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * Enhanced {@code InputStream} for reading from S3.
+ *
+ * This implementation provides improved read throughput by asynchronously prefetching
+ * blocks of configurable size from the underlying S3 file.
+ */
+public class S3PrefetchingInputStream
+    extends FSInputStream
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+  private static final Logger LOG = LoggerFactory.getLogger(S3PrefetchingInputStream.class);
+
+  // Underlying input stream used for reading S3 file.
+  private S3InputStream inputStream;
+
+  /**
+   * Initializes a new instance of the {@code S3PrefetchingInputStream} class.
+   *
+   * @param context read-specific operation context.
+   * @param s3Attributes attributes of the S3 object being read.
+   * @param client callbacks used for interacting with the underlying S3 client.
+   *
+   * @throws IllegalArgumentException if context is null.
+   * @throws IllegalArgumentException if s3Attributes is null.
+   * @throws IllegalArgumentException if client is null.
+   */
+  public S3PrefetchingInputStream(
+      S3AReadOpContext context,
+      S3ObjectAttributes s3Attributes,
+      S3AInputStream.InputStreamCallbacks client) {
+
+    Validate.checkNotNull(context, "context");
+    Validate.checkNotNull(s3Attributes, "s3Attributes");
+    Validate.checkNotNullAndNotEmpty(s3Attributes.getBucket(), "s3Attributes.getBucket()");
+    Validate.checkNotNullAndNotEmpty(s3Attributes.getKey(), "s3Attributes.getKey()");
+    Validate.checkNotNegative(s3Attributes.getLen(), "s3Attributes.getLen()");
+    Validate.checkNotNull(client, "client");
+
+    long fileSize = s3Attributes.getLen();
+    if (fileSize <= context.getPrefetchBlockSize()) {
+      this.inputStream = new S3InMemoryInputStream(context, s3Attributes, client);
+    } else {
+      this.inputStream = new S3CachingInputStream(context, s3Attributes, client);
+    }
+  }
+
+  /**
+   * Returns the number of bytes available for reading without blocking.
+   *
+   * @return the number of bytes available for reading without blocking.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    this.throwIfClosed();
+    return this.inputStream.available();
+  }
+
+  /**
+   * Gets the current position.
+   *
+   * @return the current position.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    this.throwIfClosed();
+    return this.inputStream.getPos();
+  }
+
+  /**
+   * Reads and returns one byte from this stream.
+   *
+   * @return the next byte from this stream.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public synchronized int read() throws IOException {
+    this.throwIfClosed();
+    return this.inputStream.read();
+  }
+
+  /**
+   * Reads up to {@code len} bytes from this stream and copies them into
+   * the given {@code buffer} starting at the given {@code offset}.
+   * Returns the number of bytes actually copied in to the given buffer.
+   *
+   * @param buffer the buffer to copy data into.
+   * @param offset data is copied starting at this offset.
+   * @param len max number of bytes to copy.
+   * @return the number of bytes actually copied in to the given buffer.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public synchronized int read(byte[] buffer, int offset, int len) throws IOException {
+    this.throwIfClosed();
+    return this.inputStream.read(buffer, offset, len);
+  }
+
+  /**
+   * Closes this stream and releases all acquired resources.
+   *
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (this.inputStream != null) {
+      this.inputStream.close();
+      this.inputStream = null;
+      super.close();
+    }
+  }
+
+  /**
+   * Updates internal data such that the next read will take place at the given {@code pos}.
+   *
+   * @param pos new read position.
+   * @throws IOException if there is an IO error during this operation.
+   */
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    this.throwIfClosed();
+    this.inputStream.seek(pos);
+  }
+
+  /**
+   * Sets the number of bytes to read ahead each time.
+   *
+   * @param readahead the number of bytes to read ahead each time..
+   */
+  @Override
+  public synchronized void setReadahead(Long readahead) {
+    if (!this.isClosed()) {
+      this.inputStream.setReadahead(readahead);
+    }
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    if (!this.isClosed()) {
+      return this.inputStream.hasCapability(capability);
+    }
+
+    return false;
+  }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInputStreamStatistics getS3AStreamStatistics() {
+    if (this.isClosed()) {
+      return null;
+    }
+    return this.inputStream.getS3AStreamStatistics();
+  }
+
+  /**
+   * Gets the internal IO statistics.
+   *
+   * @return the internal IO statistics.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    if (this.isClosed()) {
+      return null;
+    }
+    return this.inputStream.getIOStatistics();
+  }
+
+  protected boolean isClosed() {
+    return this.inputStream == null;
+  }
+
+  protected void throwIfClosed() throws IOException {
+    if (this.isClosed()) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  // Unsupported functions.
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    this.throwIfClosed();
+    return false;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
new file mode 100644
index 00000000000..d9ed7810da6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.Invoker;
+
+/**
+ * Provides functionality to read S3 file one block at a time.
+ */
+public class S3Reader implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(S3Reader.class);
+
+  // We read from the underlying input stream in blocks of this size.
+  private static final int READ_BUFFER_SIZE = 64 * 1024;
+
+  // The S3 file to read.
+  private final S3File s3File;
+
+  // Set to true by close().
+  private volatile boolean closed;
+
+  /**
+   * Constructs an instance of {@link S3Reader}.
+   *
+   * @param s3File The S3 file to read.
+   *
+   * @throws IllegalArgumentException if s3File is null.
+   */
+  public S3Reader(S3File s3File) {
+    Validate.checkNotNull(s3File, "s3File");
+
+    this.s3File = s3File;
+  }
+
+  /**
+   * Starts reading at {@code offset} and reads up to {@code size} bytes into {@code buffer}.
+   * Fewer bytes are requested when the file ends before {@code offset + size}.
+   *
+   * @param buffer the buffer into which data is returned
+   * @param offset the absolute offset into the underlying file where reading starts.
+   * @param size the number of bytes to be read.
+   *
+   * @return number of bytes actually read, or -1 if this reader has been closed.
+   * @throws IOException if there is an error reading from the file.
+   *
+   * @throws IllegalArgumentException if buffer is null.
+   * @throws IllegalArgumentException if offset is outside of the range [0, file size].
+   * @throws IllegalArgumentException if size is zero or negative.
+   */
+  public int read(ByteBuffer buffer, long offset, int size) throws IOException {
+    Validate.checkNotNull(buffer, "buffer");
+    Validate.checkWithinRange(offset, "offset", 0, this.s3File.size());
+    Validate.checkPositiveInteger(size, "size");
+
+    if (this.closed) {
+      return -1;
+    }
+
+    int reqSize = (int) Math.min(size, this.s3File.size() - offset);
+    return readOneBlockWithRetries(buffer, offset, reqSize);
+  }
+
+  /**
+   * Marks this reader as closed. Later calls to {@link #read} return -1, and a
+   * read in progress stops fetching further chunks.
+   */
+  @Override
+  public void close() {
+    this.closed = true;
+  }
+
+  /**
+   * Reads one block using the file's read invoker, retrying on failure.
+   *
+   * @return the buffer position after reading (the byte count when the buffer
+   *         started at position 0); the buffer limit is set to that position.
+   */
+  private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
+      throws IOException {
+
+    this.s3File.getStatistics().readOperationStarted(offset, size);
+    Invoker invoker = this.s3File.getReadInvoker();
+
+    // A failed attempt may already have copied some bytes into the buffer.
+    // Rewind to where this read started before every attempt, otherwise a
+    // retry (which re-reads from offset) would append after stale data.
+    final int startPosition = buffer.position();
+
+    invoker.retry(
+        "read", this.s3File.getPath(), true,
+        () -> {
+          buffer.position(startPosition);
+          try {
+            this.readOneBlock(buffer, offset, size);
+          } catch (EOFException e) {
+            // the base implementation swallows EOFs.
+            return -1;
+          } catch (SocketTimeoutException e) {
+            this.s3File.getStatistics().readException();
+            throw e;
+          } catch (IOException e) {
+            this.s3File.getStatistics().readException();
+            throw e;
+          }
+          return 0;
+        });
+
+    int numBytesRead = buffer.position();
+    buffer.limit(numBytesRead);
+    this.s3File.getStatistics().readOperationCompleted(size, numBytesRead);
+    return numBytesRead;
+  }
+
+  /**
+   * Copies up to {@code size} bytes (bounded by {@code buffer.remaining()}),
+   * starting at file {@code offset}, into {@code buffer} in chunks of
+   * READ_BUFFER_SIZE. The opened stream is always released back to the file.
+   *
+   * @throws EOFException if the stream ends before the requested bytes arrive.
+   */
+  private void readOneBlock(ByteBuffer buffer, long offset, int size) throws IOException {
+    int readSize = Math.min(size, buffer.remaining());
+    if (readSize == 0) {
+      return;
+    }
+
+    InputStream inputStream = s3File.openForRead(offset, readSize);
+    int numRemainingBytes = readSize;
+    byte[] bytes = new byte[READ_BUFFER_SIZE];
+
+    int numBytesToRead;
+    int numBytes;
+
+    try {
+      do {
+        numBytesToRead = Math.min(READ_BUFFER_SIZE, numRemainingBytes);
+        numBytes = inputStream.read(bytes, 0, numBytesToRead);
+        if (numBytes < 0) {
+          String message = String.format(
+              "Unexpected end of stream: buffer[%d], readSize = %d, numRemainingBytes = %d",
+              buffer.capacity(), readSize, numRemainingBytes);
+          throw new EOFException(message);
+        }
+
+        if (numBytes > 0) {
+          buffer.put(bytes, 0, numBytes);
+          numRemainingBytes -= numBytes;
+        }
+      }
+      while (!this.closed && (numRemainingBytes > 0));
+    } finally {
+      s3File.close(inputStream);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/package-info.java
new file mode 100644
index 00000000000..b255537b40a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * High performance S3 input stream which reads data in
+ * blocks and can cache blocks in the local filesystem.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/ExceptionAsserts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/ExceptionAsserts.java
new file mode 100644
index 00000000000..96b7acdcdb8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/ExceptionAsserts.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Assertion helpers for verifying that code throws an expected exception.
+ */
+public final class ExceptionAsserts {
+  private ExceptionAsserts() {}
+
+  /**
+   * Asserts that the given code throws an exception of the given type
+   * and that the exception message contains the given sub-message.
+   *
+   * <p>Usage:
+   * <pre>{@code
+   * ExceptionAsserts.assertThrows(
+   *     IllegalArgumentException.class,
+   *     "'nullArg' must not be null",
+   *     () -> Validate.checkNotNull(null, "nullArg"));
+   * }</pre>
+   *
+   * <p>This delegates to {@link LambdaTestUtils#intercept}. JUnit 5 has
+   * similar functionality, but moving to that framework is not practical
+   * because of significant differences and the lack of backward
+   * compatibility for some JUnit rules.
+   *
+   * @param <E> the expected exception type.
+   * @param expectedExceptionClass class of the exception {@code code} must throw.
+   * @param partialMessage text the exception message must contain; passed
+   *     unchanged to {@code intercept}, including when it is {@code null}.
+   * @param code the code to execute.
+   * @throws Exception propagated from {@code intercept} when the expectation
+   *     is not met.
+   */
+  public static <E extends Exception> void assertThrows(
+      Class<E> expectedExceptionClass,
+      String partialMessage,
+      LambdaTestUtils.VoidCallable code) throws Exception {
+    intercept(expectedExceptionClass, partialMessage, code);
+  }
+
+  /**
+   * Asserts that the given code throws an exception of the given type.
+   * Equivalent to calling the three-argument form with a {@code null}
+   * partial message.
+   *
+   * @param <E> the expected exception type.
+   * @param expectedExceptionClass class of the exception {@code code} must throw.
+   * @param code the code to execute.
+   * @throws Exception propagated from {@code intercept} when the expectation
+   *     is not met.
+   */
+  public static <E extends Exception> void assertThrows(
+      Class<E> expectedExceptionClass,
+      LambdaTestUtils.VoidCallable code) throws Exception {
+    assertThrows(expectedExceptionClass, null, code);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/SampleDataForTests.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/SampleDataForTests.java
new file mode 100644
index 00000000000..97c2e0b7659
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/SampleDataForTests.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Frequently used test data items.
+ */
+public final class SampleDataForTests {
+  private SampleDataForTests() {}
+
+  // Array data. These arrays are shared by every test that uses this class,
+  // so tests must treat them as read-only: Java has no immutable array type.
+  public static final Object[] NULL_ARRAY      = null;
+  public static final Object[] EMPTY_ARRAY     = new Object[0];
+  public static final Object[] NON_EMPTY_ARRAY = new Object[1];
+
+  public static final byte[] NULL_BYTE_ARRAY      = null;
+  public static final byte[] EMPTY_BYTE_ARRAY     = new byte[0];
+  public static final byte[] NON_EMPTY_BYTE_ARRAY = new byte[1];
+
+  public static final short[] NULL_SHORT_ARRAY      = null;
+  public static final short[] EMPTY_SHORT_ARRAY     = new short[0];
+  public static final short[] NON_EMPTY_SHORT_ARRAY = new short[1];
+
+  public static final int[] NULL_INT_ARRAY         = null;
+  public static final int[] EMPTY_INT_ARRAY        = new int[0];
+  public static final int[] NON_EMPTY_INT_ARRAY    = new int[1];
+
+  public static final long[] NULL_LONG_ARRAY       = null;
+  public static final long[] EMPTY_LONG_ARRAY      = new long[0];
+  public static final long[] NON_EMPTY_LONG_ARRAY  = new long[1];
+
+  // List data. EMPTY_LIST is unmodifiable so that a test which accidentally
+  // adds to it cannot corrupt the fixture for tests that run after it.
+  // VALID_LIST is a fixed-size list holding a single null element.
+  public static final List<Object> NULL_LIST  = null;
+  public static final List<Object> EMPTY_LIST =
+      Collections.unmodifiableList(new ArrayList<Object>());
+  public static final List<Object> VALID_LIST = Arrays.asList(new Object[1]);
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
new file mode 100644
index 00000000000..b1344c6972c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockCache extends AbstractHadoopTestBase {
+
+  private static final int BUFFER_SIZE = 16;
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Should not throw. The cache is closed afterwards so that whatever
+    // resources it holds do not outlive the test.
+    try (BlockCache cache = new SingleFilePerBlockCache()) {
+      // Verify it throws correctly.
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'buffer' must not be null",
+          () -> cache.put(42, null));
+    }
+  }
+
+  @Test
+  public void testPutAndGet() throws Exception {
+    try (BlockCache cache = new SingleFilePerBlockCache()) {
+      ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
+      for (byte i = 0; i < BUFFER_SIZE; i++) {
+        buffer1.put(i);
+      }
+
+      assertEquals(0, cache.size());
+      assertFalse(cache.containsBlock(0));
+      cache.put(0, buffer1);
+      assertEquals(1, cache.size());
+      assertTrue(cache.containsBlock(0));
+      ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
+      cache.get(0, buffer2);
+      assertNotSame(buffer1, buffer2);
+      assertBuffersEqual(buffer1, buffer2);
+
+      assertEquals(1, cache.size());
+      assertFalse(cache.containsBlock(1));
+      cache.put(1, buffer1);
+      assertEquals(2, cache.size());
+      assertTrue(cache.containsBlock(1));
+      ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);
+      cache.get(1, buffer3);
+      assertNotSame(buffer1, buffer3);
+      assertBuffersEqual(buffer1, buffer3);
+    }
+  }
+
+  /**
+   * Asserts that both buffers are non-null, have a limit of
+   * {@code BUFFER_SIZE}, and hold the same bytes at every absolute index.
+   */
+  private void assertBuffersEqual(ByteBuffer buffer1, ByteBuffer buffer2) {
+    assertNotNull(buffer1);
+    assertNotNull(buffer2);
+    assertEquals(buffer1.limit(), buffer2.limit());
+    assertEquals(BUFFER_SIZE, buffer1.limit());
+    for (int i = 0; i < BUFFER_SIZE; i++) {
+      assertEquals("byte at index " + i, buffer1.get(i), buffer2.get(i));
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockData.java
new file mode 100644
index 00000000000..dd8c9fb3c7e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockData.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockData extends AbstractHadoopTestBase {
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Should not throw.
+    new BlockData(10, 5);
+    new BlockData(5, 10);
+    new BlockData(0, 10);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'fileSize' must not be negative",
+        () -> new BlockData(-1, 2));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockSize' must be a positive integer",
+        () -> new BlockData(10, 0));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockSize' must be a positive integer",
+        () -> new BlockData(10, -2));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' (-1) must be within the range [0, 3]",
+        () -> new BlockData(10, 3).isLastBlock(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' (11) must be within the range [0, 3]",
+        () -> new BlockData(10, 3).isLastBlock(11));
+  }
+
+  @Test
+  public void testComputedFields() throws Exception {
+    testComputedFieldsHelper(0, 10);
+    testComputedFieldsHelper(1, 10);
+    testComputedFieldsHelper(10, 1);
+    testComputedFieldsHelper(10, 2);
+    testComputedFieldsHelper(10, 3);
+  }
+
+  /**
+   * Checks the values derived by {@link BlockData} for a file of
+   * {@code fileSize} bytes split into blocks of {@code blockSize} bytes.
+   * Start offsets are computed in {@code long} arithmetic, matching the
+   * {@code long} file size, so {@code b * blockSize} cannot overflow an int.
+   */
+  private void testComputedFieldsHelper(long fileSize, int blockSize) throws Exception {
+    BlockData bd = new BlockData(fileSize, blockSize);
+
+    if (fileSize == 0) {
+      // An empty file has no valid blocks or offsets.
+      assertFalse(bd.isLastBlock(0));
+      assertFalse(bd.isLastBlock(1));
+      assertFalse(bd.isValidOffset(0));
+      assertEquals(0, bd.getSize(0));
+      assertEquals("", bd.getStateString());
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'offset' (0) must be within the range [0, -1]",
+          () -> bd.getBlockNumber(0));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'blockNumber' (0) must be within the range [0, -1]",
+          () -> bd.getStartOffset(0));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'offset' (0) must be within the range [0, -1]",
+          () -> bd.getRelativeOffset(0, 0));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'blockNumber' (0) must be within the range [0, -1]",
+          () -> bd.getState(0));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'blockNumber' (0) must be within the range [0, -1]",
+          () -> bd.setState(0, BlockData.State.READY));
+
+      return;
+    }
+
+    assertEquals(fileSize, bd.getFileSize());
+    assertEquals(blockSize, bd.getBlockSize());
+
+    // Ceiling division: a trailing partial block counts as a block.
+    int expectedNumBlocks = (int) (fileSize / blockSize);
+    if (fileSize % blockSize > 0) {
+      expectedNumBlocks++;
+    }
+    assertEquals(expectedNumBlocks, bd.getNumBlocks());
+
+    int lastBlockNumber = expectedNumBlocks - 1;
+    for (int b = 0; b < lastBlockNumber; b++) {
+      assertFalse(bd.isLastBlock(b));
+      assertEquals(blockSize, bd.getSize(b));
+    }
+    assertTrue(bd.isLastBlock(lastBlockNumber));
+    int lastBlockSize = (int) (fileSize - (long) blockSize * lastBlockNumber);
+    assertEquals(lastBlockSize, bd.getSize(lastBlockNumber));
+
+    // Offset related methods.
+    for (long offset = 0; offset < fileSize; offset++) {
+      int expectedBlockNumber = (int) (offset / blockSize);
+      assertEquals(expectedBlockNumber, bd.getBlockNumber(offset));
+
+      for (int b = 0; b < lastBlockNumber; b++) {
+        long expectedStartOffset = (long) b * blockSize;
+        assertEquals(expectedStartOffset, bd.getStartOffset(b));
+
+        int expectedRelativeOffset = (int) (offset - expectedStartOffset);
+        assertEquals(expectedRelativeOffset, bd.getRelativeOffset(b, offset));
+      }
+    }
+
+    // State methods.
+    for (int b = 0; b < expectedNumBlocks; b++) {
+      assertEquals((long) b * blockSize, bd.getStartOffset(b));
+      assertEquals(BlockData.State.NOT_READY, bd.getState(b));
+      bd.setState(b, BlockData.State.QUEUED);
+      assertEquals(BlockData.State.QUEUED, bd.getState(b));
+      bd.setState(b, BlockData.State.READY);
+      assertEquals(BlockData.State.READY, bd.getState(b));
+      bd.setState(b, BlockData.State.CACHED);
+      assertEquals(BlockData.State.CACHED, bd.getState(b));
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockOperations.java
new file mode 100644
index 00000000000..da46db1e550
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockOperations.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.lang.reflect.Method;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockOperations extends AbstractHadoopTestBase {
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Should not throw.
+    BlockOperations ops = new BlockOperations();
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> ops.getPrefetched(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> ops.getCached(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> ops.getRead(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> ops.release(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> ops.requestPrefetch(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> ops.requestCaching(-1));
+  }
+
+  @Test
+  public void testGetSummary() throws Exception {
+    verifySummary("getPrefetched", "GP");
+    verifySummary("getCached", "GC");
+    verifySummary("getRead", "GR");
+    verifySummary("release", "RL");
+    verifySummary("requestPrefetch", "RP");
+    verifySummary("prefetch", "PF");
+    verifySummary("requestCaching", "RC");
+    verifySummary("addToCache", "C+");
+
+    verifySummaryNoArg("cancelPrefetches", "CP");
+    verifySummaryNoArg("close", "CX");
+  }
+
+  /**
+   * Invokes the single-int-argument operation {@code methodName} reflectively
+   * with block number 42 and checks that the summary records it as
+   * {@code shortName(42)}.
+   */
+  private void verifySummary(String methodName, String shortName) throws Exception {
+    int blockNumber = 42;
+    BlockOperations ops = new BlockOperations();
+    Method method = ops.getClass().getDeclaredMethod(methodName, int.class);
+    BlockOperations.Operation op = (BlockOperations.Operation) method.invoke(ops, blockNumber);
+    assertSummaryStartsWith(ops, op, String.format("%s(%d)", shortName, blockNumber));
+  }
+
+  /**
+   * Invokes the no-argument operation {@code methodName} reflectively and
+   * checks that the summary records it as {@code shortName}.
+   */
+  private void verifySummaryNoArg(String methodName, String shortName) throws Exception {
+    BlockOperations ops = new BlockOperations();
+    Method method = ops.getClass().getDeclaredMethod(methodName);
+    BlockOperations.Operation op = (BlockOperations.Operation) method.invoke(ops);
+    assertSummaryStartsWith(ops, op, shortName);
+  }
+
+  /**
+   * Ends {@code op}, then asserts that the summary of {@code ops} begins with
+   * {@code opSummary;E<opSummary>;}. The failure message includes the actual
+   * summary.
+   */
+  private void assertSummaryStartsWith(
+      BlockOperations ops, BlockOperations.Operation op, String opSummary) {
+    ops.end(op);
+    String summary = ops.getSummary(false);
+    String expectedSummary = String.format("%s;E%s;", opSummary, opSummary);
+    assertTrue(
+        String.format("Expected summary to start with '%s' but was '%s'",
+            expectedSummary, summary),
+        summary.startsWith(expectedSummary));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBoundedResourcePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBoundedResourcePool.java
new file mode 100644
index 00000000000..154e8464a49
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBoundedResourcePool.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class TestBoundedResourcePool extends AbstractHadoopTestBase {
+
+  /** Pool under test; every new item is a 10-byte heap buffer. */
+  static class BufferPool extends BoundedResourcePool<ByteBuffer> {
+    BufferPool(int size) {
+      super(size);
+    }
+
+    @Override
+    protected ByteBuffer createNew() {
+      return ByteBuffer.allocate(10);
+    }
+  }
+
+  @Test
+  public void testArgChecks() throws Exception {
+
+    // Should not throw.
+    BufferPool pool = new BufferPool(5);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'size' must be a positive integer",
+        () -> new BufferPool(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'size' must be a positive integer",
+        () -> new BufferPool(0));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'item' must not be null",
+        () -> pool.release(null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "This item is not a part of this pool",
+        () -> pool.release(ByteBuffer.allocate(4)));
+  }
+
+  @Test
+  public void testAcquireReleaseSingle() {
+    final int numBuffers = 5;
+    BufferPool pool = new BufferPool(numBuffers);
+
+    assertEquals(0, pool.numCreated());
+    assertEquals(numBuffers, pool.numAvailable());
+
+    ByteBuffer buffer1 = pool.acquire();
+    assertNotNull(buffer1);
+    assertEquals(1, pool.numCreated());
+    assertEquals(numBuffers - 1, pool.numAvailable());
+
+    // Release and immediately reacquire => should not end up creating new buffer.
+    pool.release(buffer1);
+    assertEquals(1, pool.numCreated());
+
+    ByteBuffer buffer2 = pool.acquire();
+    assertNotNull(buffer2);
+    assertSame(buffer1, buffer2);
+    assertEquals(1, pool.numCreated());
+  }
+
+  @Test
+  public void testAcquireReleaseMultiple() {
+    final int numBuffers = 5;
+    BufferPool pool = new BufferPool(numBuffers);
+    // Identity-based set: buffers are tracked by reference, not by content.
+    Set<ByteBuffer> buffers =
+        Collections.newSetFromMap(new IdentityHashMap<ByteBuffer, Boolean>());
+
+    assertEquals(0, pool.numCreated());
+
+    // Acquire all one by one.
+    for (int i = 0; i < numBuffers; i++) {
+      assertEquals(numBuffers - i, pool.numAvailable());
+      ByteBuffer buffer = pool.acquire();
+      assertNotNull(buffer);
+      assertFalse(buffers.contains(buffer));
+      buffers.add(buffer);
+      assertEquals(i + 1, pool.numCreated());
+    }
+
+    assertEquals(numBuffers, pool.numCreated());
+    assertEquals(0, pool.numAvailable());
+
+    int releaseCount = 0;
+
+    // Release all one by one.
+    for (ByteBuffer buffer : buffers) {
+      assertEquals(releaseCount, pool.numAvailable());
+      releaseCount++;
+      pool.release(buffer);
+      assertEquals(releaseCount, pool.numAvailable());
+
+      // Releasing the same buffer again should not have any ill effect.
+      pool.release(buffer);
+      assertEquals(releaseCount, pool.numAvailable());
+      pool.release(buffer);
+      assertEquals(releaseCount, pool.numAvailable());
+    }
+
+    // Acquire all one by one again. Removing each from the set checks both
+    // that it is one we got earlier and that no buffer is handed out twice.
+    for (int i = 0; i < numBuffers; i++) {
+      ByteBuffer buffer = pool.acquire();
+      assertTrue(buffers.remove(buffer));
+    }
+    assertTrue(buffers.isEmpty());
+
+    assertEquals(numBuffers, pool.numCreated());
+    assertEquals(0, pool.numAvailable());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java
new file mode 100644
index 00000000000..119e90ffeba
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.twitter.util.Future;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+public class TestBufferData extends AbstractHadoopTestBase {
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Should not throw.
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    BufferData data = new BufferData(1, buffer);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> new BufferData(-1, buffer));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'buffer' must not be null",
+        () -> new BufferData(1, null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'actionFuture' must not be null",
+        () -> data.setPrefetch(null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'actionFuture' must not be null",
+        () -> data.setCaching(null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'states' must not be null",
+        () -> data.throwIfStateIncorrect((BufferData.State[]) null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "Expected buffer state to be 'READY or CACHING' but found",
+        () -> data.throwIfStateIncorrect(BufferData.State.READY, BufferData.State.CACHING));
+  }
+
+  @Test
+  public void testValidStateUpdates() {
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    BufferData data = new BufferData(1, buffer);
+
+    assertEquals(BufferData.State.BLANK, data.getState());
+
+    Future<Void> actionFuture = Future.value(null);
+    data.setPrefetch(actionFuture);
+    assertEquals(BufferData.State.PREFETCHING, data.getState());
+    assertNotNull(data.getActionFuture());
+    assertSame(actionFuture, data.getActionFuture());
+
+    Future<Void> actionFuture2 = Future.value(null);
+    data.setCaching(actionFuture2);
+    assertEquals(BufferData.State.CACHING, data.getState());
+    assertNotNull(data.getActionFuture());
+    assertSame(actionFuture2, data.getActionFuture());
+    assertNotSame(actionFuture, actionFuture2);
+
+    List<BufferData.State> states = Arrays.asList(
+        BufferData.State.BLANK,
+        BufferData.State.PREFETCHING,
+        BufferData.State.CACHING,
+        BufferData.State.READY
+    );
+
+    // Walk a fresh buffer through each consecutive pair of states.
+    BufferData data2 = new BufferData(1, buffer);
+    BufferData.State prevState = null;
+    for (BufferData.State state : states) {
+      if (prevState != null) {
+        assertEquals(prevState, data2.getState());
+        data2.updateState(state, prevState);
+        assertEquals(state, data2.getState());
+      }
+      prevState = state;
+    }
+  }
+
+  @Test
+  public void testInvalidStateUpdates() throws Exception {
+    Future<Void> actionFuture = Future.value(null);
+    testInvalidStateUpdatesHelper(
+        (d) -> d.setPrefetch(actionFuture),
+        BufferData.State.BLANK,
+        BufferData.State.READY);
+
+    testInvalidStateUpdatesHelper(
+        (d) -> d.setCaching(actionFuture),
+        BufferData.State.PREFETCHING,
+        BufferData.State.READY);
+  }
+
+  @Test
+  public void testSetReady() throws Exception {
+    byte[] bytes1 = new byte[5];
+    initBytes(bytes1);
+
+    ByteBuffer buffer = ByteBuffer.allocate(10);
+    buffer.put(bytes1);
+    buffer.limit(bytes1.length);
+    BufferData data = new BufferData(1, buffer);
+    assertNotEquals(BufferData.State.READY, data.getState());
+    assertEquals(0, data.getChecksum());
+
+    data.setReady(BufferData.State.BLANK);
+    assertEquals(BufferData.State.READY, data.getState());
+    assertNotEquals(0, data.getChecksum());
+
+    // Verify that buffer cannot be modified once in READY state.
+    ExceptionAsserts.assertThrows(
+        ReadOnlyBufferException.class,
+        () -> data.getBuffer().put(bytes1));
+
+    // Verify that buffer cannot be set to READY state more than once.
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "Checksum cannot be changed once set",
+        () -> data.setReady(BufferData.State.BLANK));
+
+    // Verify that we detect post READY buffer modification.
+    buffer.array()[2] = (byte) 42;
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "checksum changed after setReady()",
+        () -> data.setDone());
+  }
+
+  @Test
+  public void testChecksum() {
+    byte[] bytes1 = new byte[5];
+    byte[] bytes2 = new byte[10];
+
+    initBytes(bytes1);
+    initBytes(bytes2);
+
+    // buffer2 is limited to the first 5 bytes, which match bytes1.
+    ByteBuffer buffer1 = ByteBuffer.wrap(bytes1);
+    ByteBuffer buffer2 = ByteBuffer.wrap(bytes2);
+    buffer2.limit(bytes1.length);
+
+    long checksum1 = BufferData.getChecksum(buffer1);
+    long checksum2 = BufferData.getChecksum(buffer2);
+
+    assertEquals(checksum1, checksum2);
+  }
+
+  /** Fills {@code bytes} with 0, 1, 2, ... (truncated to byte). */
+  private void initBytes(byte[] bytes) {
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = (byte) i;
+    }
+  }
+
+  @FunctionalInterface
+  public interface StateChanger {
+    void run(BufferData data) throws Exception;
+  }
+
+  /**
+   * Moves a buffer through every state in {@link #ALL_STATES} that is not in
+   * {@code validFromState}. For each one, asserts that {@code changeState}
+   * fails with an {@link IllegalStateException} whose message names
+   * {@code validFromState[0]} as the expected state, and that the buffer's
+   * state is left unchanged.
+   */
+  private void testInvalidStateUpdatesHelper(
+      StateChanger changeState,
+      BufferData.State... validFromState) throws Exception {
+
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    BufferData data = new BufferData(1, buffer);
+    data.updateState(validFromState[0], BufferData.State.BLANK);
+    List<BufferData.State> states = this.getStatesExcept(validFromState);
+    BufferData.State prevState = validFromState[0];
+    String expectedMessage =
+        String.format("Expected buffer state to be '%s", validFromState[0]);
+    for (BufferData.State s : states) {
+      data.updateState(s, prevState);
+
+      ExceptionAsserts.assertThrows(
+          IllegalStateException.class,
+          expectedMessage,
+          () -> changeState.run(data));
+
+      assertEquals(s, data.getState());
+      prevState = s;
+    }
+  }
+
+  /** States exercised by the invalid-transition tests. */
+  static final List<BufferData.State> ALL_STATES = Arrays.asList(
+      BufferData.State.UNKNOWN,
+      BufferData.State.BLANK,
+      BufferData.State.PREFETCHING,
+      BufferData.State.CACHING,
+      BufferData.State.READY
+  );
+
+  /**
+   * Returns the members of {@link #ALL_STATES}, in order, that are not
+   * listed in {@code states}.
+   */
+  private List<BufferData.State> getStatesExcept(BufferData.State... states) {
+    List<BufferData.State> excluded = Arrays.asList(states);
+    List<BufferData.State> result = new ArrayList<>();
+    for (BufferData.State s : ALL_STATES) {
+      if (!excluded.contains(s)) {
+        result.add(s);
+      }
+    }
+    return result;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
new file mode 100644
index 00000000000..43be295cb38
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for {@link BufferPool}: argument validation, acquire/release
+ * accounting, and which buffer states may be released.
+ */
+public class TestBufferPool extends AbstractHadoopTestBase {
+
+  private static final int POOL_SIZE = 2;
+  private static final int BUFFER_SIZE = 10;
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Valid arguments: construction must succeed.
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+
+    // Non-positive pool sizes are rejected.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'size' must be a positive integer",
+        () -> new BufferPool(0, 10));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'size' must be a positive integer",
+        () -> new BufferPool(-1, 10));
+
+    // Non-positive buffer sizes are rejected.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'bufferSize' must be a positive integer",
+        () -> new BufferPool(10, 0));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'bufferSize' must be a positive integer",
+        () -> new BufferPool(1, -10));
+
+    // Negative block numbers are rejected by both acquire variants.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> pool.acquire(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> pool.tryAcquire(-1));
+
+    // Releasing null is rejected.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'data' must not be null",
+        () -> pool.release((BufferData) null));
+  }
+
+  @Test
+  public void testGetAndRelease() {
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+    assertInitialState(pool, POOL_SIZE);
+    assertEquals(0, countBuffers(pool));
+
+    // Use up both buffers; tryAcquire on the exhausted pool returns null.
+    BufferData first = acquire(pool, 1);
+    BufferData second = acquire(pool, 2);
+    assertNull(pool.tryAcquire(3));
+
+    assertEquals(2, countBuffers(pool));
+    assertEquals(2, pool.numCreated());
+    assertEquals(0, pool.numAvailable());
+
+    // Each release makes one more buffer available without creating new ones.
+    first.updateState(BufferData.State.READY, BufferData.State.BLANK);
+    pool.release(first);
+    assertEquals(2, pool.numCreated());
+    assertEquals(1, pool.numAvailable());
+
+    second.updateState(BufferData.State.READY, BufferData.State.BLANK);
+    pool.release(second);
+    assertEquals(2, pool.numCreated());
+    assertEquals(2, pool.numAvailable());
+  }
+
+  @Test
+  public void testRelease() throws Exception {
+    // BLANK, PREFETCHING and CACHING buffers must be refused; READY is accepted.
+    testReleaseHelper(BufferData.State.BLANK, true);
+    testReleaseHelper(BufferData.State.PREFETCHING, true);
+    testReleaseHelper(BufferData.State.CACHING, true);
+    testReleaseHelper(BufferData.State.READY, false);
+  }
+
+  /**
+   * Acquires a buffer from a fresh pool, moves it into
+   * {@code stateBeforeRelease} and releases it.
+   *
+   * @param stateBeforeRelease the state the buffer is in when released.
+   * @param expectThrow whether the release must fail.
+   * @throws Exception on unexpected failure.
+   */
+  private void testReleaseHelper(BufferData.State stateBeforeRelease, boolean expectThrow)
+      throws Exception {
+
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+    assertInitialState(pool, POOL_SIZE);
+
+    BufferData data = acquire(pool, 1);
+    data.updateState(stateBeforeRelease, BufferData.State.BLANK);
+
+    if (!expectThrow) {
+      pool.release(data);
+      return;
+    }
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "Unable to release buffer",
+        () -> pool.release(data));
+  }
+
+  /**
+   * Acquires the buffer for {@code blockNumber}, checking that it is non-null,
+   * that a repeated acquire yields the same instance, and that it carries the
+   * requested block number.
+   */
+  private BufferData acquire(BufferPool pool, int blockNumber) {
+    BufferData acquired = pool.acquire(blockNumber);
+    assertNotNull(acquired);
+    assertSame(acquired, pool.acquire(blockNumber));
+    assertEquals(blockNumber, acquired.getBlockNumber());
+    return acquired;
+  }
+
+  /** Asserts that nothing has been created yet and every slot is available. */
+  private void assertInitialState(BufferPool pool, int poolSize) {
+    assertEquals(poolSize, pool.numAvailable());
+    assertEquals(0, pool.numCreated());
+  }
+
+  /** Counts the buffers reported by {@link BufferPool#getAll()}. */
+  private static int countBuffers(BufferPool pool) {
+    int total = 0;
+    for (BufferData ignored : pool.getAll()) {
+      total++;
+    }
+    return total;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestFilePosition.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestFilePosition.java
new file mode 100644
index 00000000000..a1b4ae610a1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestFilePosition.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestFilePosition extends AbstractHadoopTestBase {
+
+  @Test
+  public void testArgChecks() throws Exception {
+    ByteBuffer buffer = ByteBuffer.allocate(10);
+    BufferData data = new BufferData(0, buffer);
+
+    // Should not throw.
+    new FilePosition(0, 0);
+    new FilePosition(0, 5);
+    new FilePosition(10, 5);
+    new FilePosition(5, 10);
+    new FilePosition(10, 5).setData(data, 3, 4);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'fileSize' must not be negative",
+        () -> new FilePosition(-1, 2));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockSize' must be a positive integer",
+        () -> new FilePosition(1, 0));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockSize' must be a positive integer",
+        () -> new FilePosition(1, -1));
+
+    FilePosition pos = new FilePosition(10, 3);
+
+    // Verify that we cannot obtain buffer properties without setting buffer.
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "'buffer' must not be null",
+        () -> pos.buffer());
+
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "'buffer' must not be null",
+        () -> pos.absolute());
+
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "'buffer' must not be null",
+        () -> pos.isWithinCurrentBuffer(2));
+
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "'buffer' must not be null",
+        () -> pos.blockNumber());
+
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "'buffer' must not be null",
+        () -> pos.isLastBlock());
+
+    ExceptionAsserts.assertThrows(
+        IllegalStateException.class,
+        "'buffer' must not be null",
+        () -> pos.bufferFullyRead());
+
+    // Verify that we cannot set invalid buffer parameters.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'bufferData' must not be null",
+        () -> pos.setData(null, 4, 4));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'startOffset' must not be negative",
+        () -> pos.setData(data, -4, 4));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'readOffset' must not be negative",
+        () -> pos.setData(data, 4, -4));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'readOffset' must not be negative",
+        () -> pos.setData(data, 4, -4));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'readOffset' (15) must be within the range [4, 13]",
+        () -> pos.setData(data, 4, 15));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'readOffset' (3) must be within the range [4, 13]",
+        () -> pos.setData(data, 4, 3));
+  }
+
+  @Test
+  public void testValidity() {
+    int bufferSize = 8;
+    long fileSize = 100;
+    long bufferStartOffset = 7;
+    long readStartOffset = 9;
+
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    BufferData data = new BufferData(0, buffer);
+    FilePosition pos = new FilePosition(fileSize, bufferSize);
+
+    assertFalse(pos.isValid());
+    pos.setData(data, bufferStartOffset, readStartOffset);
+    assertTrue(pos.isValid());
+
+    pos.invalidate();
+    assertFalse(pos.isValid());
+  }
+
+  @Test
+  public void testOffsets() {
+    int bufferSize = 8;
+    long fileSize = 100;
+    long bufferStartOffset = 7;
+    long readStartOffset = 9;
+
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    BufferData data = new BufferData(0, buffer);
+    FilePosition pos = new FilePosition(fileSize, bufferSize);
+    pos.setData(data, bufferStartOffset, readStartOffset);
+    assertTrue(pos.isValid());
+
+    assertEquals(readStartOffset, pos.absolute());
+    assertEquals(readStartOffset - bufferStartOffset, pos.relative());
+    assertTrue(pos.isWithinCurrentBuffer(8));
+    assertFalse(pos.isWithinCurrentBuffer(6));
+    assertFalse(pos.isWithinCurrentBuffer(1));
+
+    int expectedBlockNumber = (int) (bufferStartOffset / bufferSize);
+    assertEquals(expectedBlockNumber, pos.blockNumber());
+    assertFalse(pos.isLastBlock());
+
+    pos.setData(data, fileSize - 3, fileSize - 2);
+    assertTrue(pos.isLastBlock());
+  }
+
+  @Test
+  public void testBufferStats() {
+    int bufferSize = 8;
+    long fileSize = 100;
+    long bufferStartOffset = 7;
+    long readStartOffset = 9;
+
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    BufferData data = new BufferData(0, buffer);
+    FilePosition pos = new FilePosition(fileSize, bufferSize);
+    pos.setData(data, bufferStartOffset, readStartOffset);
+    assertTrue(pos.isValid());
+    assertEquals(bufferStartOffset, pos.bufferStartOffset());
+
+    assertEquals(0, pos.numBytesRead());
+    assertEquals(0, pos.numSingleByteReads());
+    assertEquals(0, pos.numBufferReads());
+
+    pos.incrementBytesRead(1);
+    pos.incrementBytesRead(1);
+    pos.incrementBytesRead(1);
+    pos.incrementBytesRead(5);
+    pos.incrementBytesRead(51);
+
+    assertEquals(59, pos.numBytesRead());
+    assertEquals(3, pos.numSingleByteReads());
+    assertEquals(2, pos.numBufferReads());
+
+    assertFalse(pos.bufferFullyRead());
+
+    pos.setData(data, bufferStartOffset, bufferStartOffset);
+    assertTrue(pos.isValid());
+
+    assertEquals(0, pos.numBytesRead());
+    assertEquals(0, pos.numSingleByteReads());
+    assertEquals(0, pos.numBufferReads());
+
+    for (int i = 0; i < bufferSize; i++) {
+      pos.buffer().get();
+      pos.incrementBytesRead(1);
+    }
+    assertTrue(pos.bufferFullyRead());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java
new file mode 100644
index 00000000000..d1238fc007c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestIoClass.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link Io}.
+ */
+public class TestIoClass extends AbstractHadoopTestBase {
+
+  /**
+   * A {@link Closeable} that records that it has been closed and then always
+   * fails with an {@link IOException} whose message is "foo".
+   */
+  private static class StubResource implements Closeable {
+    private boolean isOpen = true;
+
+    @Override
+    public void close() throws IOException {
+      this.isOpen = false;
+      throw new IOException("foo");
+    }
+
+    public boolean isOpen() {
+      return this.isOpen;
+    }
+  }
+
+  @Test
+  public void verifyCloseIgnoringIoException() throws Exception {
+    // Sanity check: the stub's own close() really does throw.
+    ExceptionAsserts.assertThrows(
+        IOException.class,
+        "foo",
+        () -> new StubResource().close());
+
+    // closeIgnoringIoException() must close the resource without
+    // propagating the IOException.
+    StubResource resource = new StubResource();
+    assertTrue(resource.isOpen());
+    Io.closeIgnoringIoException(resource);
+    assertFalse(resource.isOpen());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestRetryer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestRetryer.java
new file mode 100644
index 00000000000..7220a7ec8fc
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestRetryer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link Retryer}: constructor validation and the retry /
+ * status-update schedule.
+ */
+public class TestRetryer extends AbstractHadoopTestBase {
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Valid arguments: construction must succeed.
+    new Retryer(10, 50, 500);
+
+    // Each invalid argument is reported by name.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'perRetryDelay' must be a positive integer",
+        () -> new Retryer(-1, 50, 500));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'perRetryDelay' must be a positive integer",
+        () -> new Retryer(0, 50, 500));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'maxDelay' (5) must be greater than 'perRetryDelay' (10)",
+        () -> new Retryer(10, 5, 500));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'statusUpdateInterval' must be a positive integer",
+        () -> new Retryer(10, 50, -1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'statusUpdateInterval' must be a positive integer",
+        () -> new Retryer(10, 50, 0));
+  }
+
+  @Test
+  public void testRetry() {
+    final int perRetryDelay = 1;
+    final int statusUpdateInterval = 3;
+    final int maxDelay = 10;
+
+    Retryer retryer = new Retryer(perRetryDelay, maxDelay, statusUpdateInterval);
+
+    // With a per-retry delay of 1, exactly maxDelay retries are permitted, and
+    // a status update is due on every statusUpdateInterval-th attempt.
+    int attempt = 1;
+    while (attempt <= maxDelay) {
+      assertTrue(retryer.continueRetry());
+      boolean statusUpdateDue = attempt % statusUpdateInterval == 0;
+      if (statusUpdateDue) {
+        assertTrue(retryer.updateStatus());
+      } else {
+        assertFalse(retryer.updateStatus());
+      }
+      attempt++;
+    }
+
+    // No further retries once those attempts have been used.
+    assertFalse(retryer.continueRetry());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestValidate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestValidate.java
new file mode 100644
index 00000000000..bffbde417b5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestValidate.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+/**
+ * Unit tests for the argument checks in {@link Validate}. Each test verifies
+ * that valid input passes and that invalid input raises
+ * {@link IllegalArgumentException} with the expected message.
+ */
+public class TestValidate extends AbstractHadoopTestBase {
+  @Test
+  public void testCheckNotNull() throws Exception {
+    String nonNullArg = "nonNullArg";
+    String nullArg = null;
+
+    // Should not throw.
+    Validate.checkNotNull(nonNullArg, "nonNullArg");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'nullArg' must not be null",
+        () -> Validate.checkNotNull(nullArg, "nullArg"));
+  }
+
+  @Test
+  public void testCheckPositiveInteger() throws Exception {
+    int positiveArg = 1;
+    int zero = 0;
+    int negativeArg = -1;
+
+    // Should not throw.
+    Validate.checkPositiveInteger(positiveArg, "positiveArg");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'negativeArg' must be a positive integer",
+        () -> Validate.checkPositiveInteger(negativeArg, "negativeArg"));
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'zero' must be a positive integer",
+        () -> Validate.checkPositiveInteger(zero, "zero"));
+  }
+
+  @Test
+  public void testCheckNotNegative() throws Exception {
+    int positiveArg = 1;
+    int zero = 0;
+    int negativeArg = -1;
+
+    // Should not throw.
+    Validate.checkNotNegative(zero, "zeroArg");
+    Validate.checkNotNegative(positiveArg, "positiveArg");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'negativeArg' must not be negative",
+        () -> Validate.checkNotNegative(negativeArg, "negativeArg"));
+  }
+
+  @Test
+  public void testCheckRequired() throws Exception {
+    // Should not throw.
+    Validate.checkRequired(true, "arg");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg' is required",
+        () -> Validate.checkRequired(false, "arg"));
+  }
+
+  @Test
+  public void testCheckValid() throws Exception {
+    // Should not throw.
+    Validate.checkValid(true, "arg");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg' is invalid",
+        () -> Validate.checkValid(false, "arg"));
+  }
+
+  @Test
+  public void testCheckValidWithValues() throws Exception {
+    String validValues = "foo, bar";
+
+    // Should not throw.
+    Validate.checkValid(true, "arg", validValues);
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg' is invalid. Valid values are: foo, bar",
+        () -> Validate.checkValid(false, "arg", validValues));
+  }
+
+  @Test
+  public void testCheckNotNullAndNotEmpty() throws Exception {
+    // Should not throw.
+    Validate.checkNotNullAndNotEmpty("string", "string");
+    Validate.checkNotNullAndNotEmpty(SampleDataForTests.NON_EMPTY_ARRAY, "array");
+    Validate.checkNotNullAndNotEmpty(SampleDataForTests.NON_EMPTY_BYTE_ARRAY, "array");
+    Validate.checkNotNullAndNotEmpty(SampleDataForTests.NON_EMPTY_SHORT_ARRAY, "array");
+    Validate.checkNotNullAndNotEmpty(SampleDataForTests.NON_EMPTY_INT_ARRAY, "array");
+    Validate.checkNotNullAndNotEmpty(SampleDataForTests.NON_EMPTY_LONG_ARRAY, "array");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'string' must not be empty",
+        () -> Validate.checkNotNullAndNotEmpty("", "string"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must not be null",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.NULL_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must have at least one element",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.EMPTY_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must not be null",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.NULL_BYTE_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must have at least one element",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.EMPTY_BYTE_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must not be null",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.NULL_SHORT_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must have at least one element",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.EMPTY_SHORT_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must not be null",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.NULL_INT_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must have at least one element",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.EMPTY_INT_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must not be null",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.NULL_LONG_ARRAY, "array"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'array' must have at least one element",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.EMPTY_LONG_ARRAY, "array"));
+  }
+
+  @Test
+  public void testCheckListNotNullAndNotEmpty() throws Exception {
+    // Should not throw.
+    Validate.checkNotNullAndNotEmpty(SampleDataForTests.VALID_LIST, "list");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'list' must not be null",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.NULL_LIST, "list"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'list' must have at least one element",
+        () -> Validate.checkNotNullAndNotEmpty(SampleDataForTests.EMPTY_LIST, "list"));
+  }
+
+  @Test
+  public void testCheckNotNullAndNumberOfElements() throws Exception {
+    // Should not throw.
+    Validate.checkNotNullAndNumberOfElements(Arrays.asList(1, 2, 3), 3, "arg");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg' must not be null",
+        () -> Validate.checkNotNullAndNumberOfElements(null, 3, "arg"));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "Number of elements in 'arg' must be exactly 3, 2 given.",
+        () -> Validate.checkNotNullAndNumberOfElements(Arrays.asList(1, 2), 3, "arg"));
+  }
+
+  @Test
+  public void testCheckValuesEqual() throws Exception {
+    // Should not throw.
+    Validate.checkValuesEqual(1, "arg1", 1, "arg2");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg1' (1) must equal 'arg2' (2)",
+        () -> Validate.checkValuesEqual(1, "arg1", 2, "arg2"));
+  }
+
+  @Test
+  public void testCheckIntegerMultiple() throws Exception {
+    // Should not throw.
+    Validate.checkIntegerMultiple(10, "arg1", 5, "arg2");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg1' (10) must be an integer multiple of 'arg2' (3)",
+        () -> Validate.checkIntegerMultiple(10, "arg1", 3, "arg2"));
+  }
+
+  @Test
+  public void testCheckGreater() throws Exception {
+    // Should not throw.
+    Validate.checkGreater(10, "arg1", 5, "arg2");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg1' (5) must be greater than 'arg2' (10)",
+        () -> Validate.checkGreater(5, "arg1", 10, "arg2"));
+  }
+
+  @Test
+  public void testCheckGreaterOrEqual() throws Exception {
+    // Should not throw.
+    Validate.checkGreaterOrEqual(10, "arg1", 5, "arg2");
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg1' (5) must be greater than or equal to 'arg2' (10)",
+        () -> Validate.checkGreaterOrEqual(5, "arg1", 10, "arg2"));
+  }
+
+  @Test
+  public void testCheckWithinRange() throws Exception {
+    // Should not throw.
+    Validate.checkWithinRange(10, "arg", 5, 15);
+    Validate.checkWithinRange(10.0, "arg", 5.0, 15.0);
+
+    // Verify it throws.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg' (5) must be within the range [10, 20]",
+        () -> Validate.checkWithinRange(5, "arg", 10, 20));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'arg' (5.0) must be within the range [10.0, 20.0]",
+        () -> Validate.checkWithinRange(5.0, "arg", 10.0, 20.0));
+  }
+
+  @Test
+  public void testCheckPathExists() throws Exception {
+    Path tempFile = Files.createTempFile("foo", "bar");
+    try {
+      Path tempDir = tempFile.getParent();
+      Path notFound = Paths.get("<not-found>");
+
+      // Should not throw.
+      Validate.checkPathExists(tempFile, "tempFile");
+      Validate.checkPathExists(tempDir, "tempDir");
+
+      // Verify it throws.
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "'nullArg' must not be null",
+          () -> Validate.checkPathExists(null, "nullArg"));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "Path notFound (<not-found>) does not exist",
+          () -> Validate.checkPathExists(notFound, "notFound"));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "must point to a directory",
+          () -> Validate.checkPathExistsAsDir(tempFile, "tempFile"));
+
+      ExceptionAsserts.assertThrows(
+          IllegalArgumentException.class,
+          "must point to a file",
+          () -> Validate.checkPathExistsAsFile(tempDir, "tempDir"));
+    } finally {
+      // Remove the temp file so that repeated runs do not leave files behind.
+      Files.deleteIfExists(tempFile);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
index 204f1aa0939..0e105c25c3a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -34,8 +34,8 @@ import static org.junit.Assert.assertEquals;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -74,6 +74,6 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
     stream.unbuffer();
 
     // Verify that unbuffer closed the object stream
-    verify(objectStream, times(1)).close();
+    verify(objectStream, atLeast(1)).close();
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
new file mode 100644
index 00000000000..5c2f7eb2241
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.twitter.util.FuturePool;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.common.BlockCache;
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.SingleFilePerBlockCache;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+/**
+ * Provides 'fake' implementations of S3InputStream variants.
+ *
+ * These implementations avoid accessing the following real resources:
+ * -- S3 store
+ * -- local filesystem
+ *
+ * This arrangement allows thorough multi-threaded testing of those
+ * implementations without accessing external resources. It also helps
+ * avoid test flakiness introduced by external factors.
+ */
+public final class Fakes {
+
+  public static final String E_TAG = "eTag";
+  public static final String OWNER = "owner";
+  public static final String VERSION_ID = "v1";
+  public static final long MODIFICATION_TIME = 0L;
+  public static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.None,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  /** Shared source of the random delays injected by the fakes in this class. */
+  private static final Random RANDOM = new Random();
+
+  private Fakes() {}
+
+  /**
+   * Creates a file status for a fake object whose block size equals its length.
+   *
+   * @param key object key, also used as the file path.
+   * @param fileSize length of the fake object in bytes.
+   * @return the fake file status.
+   */
+  public static S3AFileStatus createFileStatus(String key, long fileSize) {
+    org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
+    long blockSize = fileSize;
+    return new S3AFileStatus(
+        fileSize, MODIFICATION_TIME, path, blockSize, OWNER, E_TAG, VERSION_ID);
+  }
+
+  /**
+   * Creates unencrypted object attributes carrying the fake eTag and version ID.
+   *
+   * @param bucket bucket name.
+   * @param key object key, also used as the path.
+   * @param fileSize length of the fake object in bytes.
+   * @return the fake object attributes.
+   */
+  public static S3ObjectAttributes createObjectAttributes(
+      String bucket,
+      String key,
+      long fileSize) {
+
+    org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
+    String encryptionKey = "";
+
+    return new S3ObjectAttributes(
+        bucket,
+        path,
+        key,
+        S3AEncryptionMethods.NONE,
+        encryptionKey,
+        E_TAG,
+        VERSION_ID,
+        fileSize);
+  }
+
+  /**
+   * Creates a read context with empty statistics, a no-op audit span and a
+   * retry policy of at most 3 retries with a fixed 10 ms sleep.
+   *
+   * @param futurePool pool used for asynchronous prefetching (may be null
+   *     when the caller never prefetches).
+   * @param key object key, also used as the path.
+   * @param fileSize length of the fake object in bytes.
+   * @param prefetchBlockSize prefetch block size.
+   * @param prefetchBlockCount number of blocks to prefetch.
+   * @return the fake read context.
+   */
+  public static S3AReadOpContext createReadContext(
+      FuturePool futurePool,
+      String key,
+      int fileSize,
+      int prefetchBlockSize,
+      int prefetchBlockCount) {
+
+    S3AFileStatus fileStatus = createFileStatus(key, fileSize);
+    org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
+    FileSystem.Statistics statistics = new FileSystem.Statistics("s3a");
+    S3AStatisticsContext statisticsContext = new EmptyS3AStatisticsContext();
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(3, 10, TimeUnit.MILLISECONDS);
+
+    return new S3AReadOpContext(
+        path,
+        new Invoker(retryPolicy, Invoker.LOG_EVENT),
+        statistics,
+        statisticsContext,
+        fileStatus,
+        S3AInputPolicy.Random,  // seekPolicy
+        CHANGE_POLICY,
+        1L,                     // readAheadRange
+        NoopSpan.INSTANCE,      // auditSpan
+        futurePool,
+        prefetchBlockSize,
+        prefetchBlockCount);
+  }
+
+  /**
+   * @return the {@code s3a://bucket/key} URI of the given object.
+   */
+  public static URI createUri(String bucket, String key) {
+    return URI.create(String.format("s3a://%s/%s", bucket, key));
+  }
+
+  /**
+   * Creates a change tracker that uses {@link #CHANGE_POLICY} (no change detection).
+   */
+  public static ChangeTracker createChangeTracker(
+      String bucket,
+      String key,
+      long fileSize) {
+
+    return new ChangeTracker(
+        createUri(bucket, key).toString(),
+        CHANGE_POLICY,
+        new CountingChangeTracker(),
+        createObjectAttributes(bucket, key, fileSize));
+  }
+
+  /**
+   * Wraps the given bytes in an S3 object stream that has no underlying HTTP request.
+   */
+  public static S3ObjectInputStream createS3ObjectInputStream(byte[] buffer) {
+    return new S3ObjectInputStream(new ByteArrayInputStream(buffer), null);
+  }
+
+  /**
+   * Creates stream callbacks whose {@code getObject()} always returns the same
+   * fake object, regardless of the request. That object's content is 8 zero
+   * bytes and its metadata carries only the {@link #E_TAG} header.
+   *
+   * @param bucket bucket used when building GET requests.
+   * @param key object key (not used by the returned callbacks).
+   * @return the fake callbacks.
+   */
+  public static S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
+      String bucket,
+      String key) {
+
+    S3Object object = new S3Object() {
+      @Override
+      public S3ObjectInputStream getObjectContent() {
+        return createS3ObjectInputStream(new byte[8]);
+      }
+
+      @Override
+      public ObjectMetadata getObjectMetadata() {
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setHeader("ETag", E_TAG);
+        return metadata;
+      }
+    };
+
+    return new S3AInputStream.InputStreamCallbacks() {
+      @Override
+      public S3Object getObject(GetObjectRequest request) {
+        return object;
+      }
+
+      @Override
+      public GetObjectRequest newGetRequest(String key) {
+        return new GetObjectRequest(bucket, key);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+
+  /**
+   * Creates one of the test input stream variants declared in this class.
+   *
+   * @param clazz either {@link TestS3InMemoryInputStream} or {@link TestS3CachingInputStream}.
+   * @return the new stream.
+   * @throws IllegalArgumentException if {@code clazz} is not a supported variant.
+   */
+  public static S3InputStream createInputStream(
+      Class<? extends S3InputStream> clazz,
+      FuturePool futurePool,
+      String bucket,
+      String key,
+      int fileSize,
+      int prefetchBlockSize,
+      int prefetchBlockCount) {
+
+    S3ObjectAttributes s3ObjectAttributes = createObjectAttributes(bucket, key, fileSize);
+    S3AReadOpContext s3AReadOpContext = createReadContext(
+        futurePool,
+        key,
+        fileSize,
+        prefetchBlockSize,
+        prefetchBlockCount);
+
+    S3AInputStream.InputStreamCallbacks callbacks = createInputStreamCallbacks(bucket, key);
+
+    if (clazz == TestS3InMemoryInputStream.class) {
+      return new TestS3InMemoryInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks);
+    } else if (clazz == TestS3CachingInputStream.class) {
+      return new TestS3CachingInputStream(s3AReadOpContext, s3ObjectAttributes, callbacks);
+    }
+
+    throw new IllegalArgumentException("Unsupported class: " + clazz);
+  }
+
+  /**
+   * Creates an in-memory stream with a prefetch block size and count of 1.
+   */
+  public static TestS3InMemoryInputStream createS3InMemoryInputStream(
+      FuturePool futurePool,
+      String bucket,
+      String key,
+      int fileSize) {
+
+    return (TestS3InMemoryInputStream) createInputStream(
+        TestS3InMemoryInputStream.class, futurePool, bucket, key, fileSize, 1, 1);
+  }
+
+  /**
+   * Creates a caching stream backed by {@link TestS3CachingBlockManager}.
+   */
+  public static TestS3CachingInputStream createS3CachingInputStream(
+      FuturePool futurePool,
+      String bucket,
+      String key,
+      int fileSize,
+      int prefetchBlockSize,
+      int prefetchBlockCount) {
+
+    return (TestS3CachingInputStream) createInputStream(
+        TestS3CachingInputStream.class,
+        futurePool,
+        bucket,
+        key,
+        fileSize,
+        prefetchBlockSize,
+        prefetchBlockCount);
+  }
+
+  /**
+   * An in-memory input stream that reads from a {@link MockS3File} after a random delay.
+   */
+  public static class TestS3InMemoryInputStream extends S3InMemoryInputStream {
+    public TestS3InMemoryInputStream(
+        S3AReadOpContext context,
+        S3ObjectAttributes s3Attributes,
+        S3AInputStream.InputStreamCallbacks client) {
+      super(context, s3Attributes, client);
+    }
+
+    @Override
+    protected S3File getS3File() {
+      randomDelay(200);
+      return new MockS3File((int) this.getS3ObjectAttributes().getLen(), false);
+    }
+  }
+
+  /**
+   * A block cache that keeps each "cache file" in an in-memory map instead of
+   * on the local filesystem, with random delays injected on reads and writes.
+   */
+  public static class TestS3FilePerBlockCache extends SingleFilePerBlockCache {
+    private final Map<Path, byte[]> files;
+    private final int readDelay;
+    private final int writeDelay;
+
+    // Cache files may be created from several threads at once; an atomic
+    // counter keeps the generated file names unique.
+    private final java.util.concurrent.atomic.AtomicLong fileCount =
+        new java.util.concurrent.atomic.AtomicLong();
+
+    public TestS3FilePerBlockCache(int readDelay, int writeDelay) {
+      this.files = new ConcurrentHashMap<>();
+      this.readDelay = readDelay;
+      this.writeDelay = writeDelay;
+    }
+
+    @Override
+    protected int readFile(Path path, ByteBuffer buffer) {
+      byte[] source = this.files.get(path);
+      if (source == null) {
+        throw new IllegalStateException("No cached data for " + path);
+      }
+      randomDelay(this.readDelay);
+      buffer.put(source);
+      return source.length;
+    }
+
+    @Override
+    protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
+      Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
+      byte[] dest = new byte[buffer.limit()];
+      randomDelay(this.writeDelay);
+      buffer.rewind();
+      buffer.get(dest);
+      this.files.put(path, dest);
+    }
+
+    @Override
+    protected Path getCacheFilePath() throws IOException {
+      return Paths.get(Long.toString(fileCount.incrementAndGet()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.files.clear();
+    }
+  }
+
+  /**
+   * Sleeps for a random duration in {@code [0, delay)} milliseconds.
+   * A non-positive delay means no sleep at all.
+   */
+  private static void randomDelay(int delay) {
+    if (delay <= 0) {
+      // Random.nextInt() rejects a non-positive bound.
+      return;
+    }
+    try {
+      Thread.sleep(RANDOM.nextInt(delay));
+    } catch (InterruptedException e) {
+      // Preserve the interrupt so the calling (possibly pool) thread can observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * A caching block manager that delays each read randomly and uses an
+   * in-memory {@link TestS3FilePerBlockCache}.
+   */
+  public static class TestS3CachingBlockManager extends S3CachingBlockManager {
+    public TestS3CachingBlockManager(
+        FuturePool futurePool,
+        S3Reader reader,
+        BlockData blockData,
+        int bufferPoolSize) {
+      super(futurePool, reader, blockData, bufferPoolSize);
+    }
+
+    @Override
+    public int read(ByteBuffer buffer, long offset, int size) throws IOException {
+      randomDelay(100);
+      return this.getReader().read(buffer, offset, size);
+    }
+
+    @Override
+    protected BlockCache createCache() {
+      final int readDelayMs = 50;
+      final int writeDelayMs = 200;
+      return new TestS3FilePerBlockCache(readDelayMs, writeDelayMs);
+    }
+  }
+
+  /**
+   * A caching input stream that reads from a {@link MockS3File} and manages
+   * blocks with a {@link TestS3CachingBlockManager}.
+   */
+  public static class TestS3CachingInputStream extends S3CachingInputStream {
+    public TestS3CachingInputStream(
+        S3AReadOpContext context,
+        S3ObjectAttributes s3Attributes,
+        S3AInputStream.InputStreamCallbacks client) {
+      super(context, s3Attributes, client);
+    }
+
+    @Override
+    protected S3File getS3File() {
+      randomDelay(200);
+      return new MockS3File((int) this.getS3ObjectAttributes().getLen(), false);
+    }
+
+    @Override
+    protected S3CachingBlockManager createBlockManager(
+        FuturePool futurePool,
+        S3Reader reader,
+        BlockData blockData,
+        int bufferPoolSize) {
+      return new TestS3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java
new file mode 100644
index 00000000000..91523026e9a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/MockS3File.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+
+/**
+ * A mock s3 file with some fault injection.
+ * <p>
+ * Its contents are generated in memory: the byte at offset {@code i} equals
+ * {@link #byteAtOffset(int) byteAtOffset(i)}, so tests can verify data read
+ * through the stream and block-manager layers.
+ */
+class MockS3File extends S3File {
+  private static final String BUCKET = "bucket";
+  private static final String KEY = "key";
+
+  // Generated contents of the mock file; never modified after construction.
+  private final byte[] contents;
+
+  // If true, throws IOException on open request just once.
+  // That allows test code to validate behavior related to retries.
+  private boolean throwExceptionOnOpen;
+
+  /**
+   * Creates a mock file of the given size that never fails to open.
+   */
+  MockS3File(int size) {
+    this(size, false);
+  }
+
+  /**
+   * @param size number of bytes in the mock file.
+   * @param throwExceptionOnOpen if true, the first call to
+   *     {@link #openForRead(long, int)} throws an {@link IOException}.
+   */
+  MockS3File(int size, boolean throwExceptionOnOpen) {
+    super(
+        Fakes.createReadContext(null, KEY, size, 1, 1),
+        Fakes.createObjectAttributes(BUCKET, KEY, size),
+        Fakes.createInputStreamCallbacks(BUCKET, KEY),
+        // Static constant: no context instance needs to be created to read it.
+        EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS,
+        Fakes.createChangeTracker(BUCKET, KEY, size)
+    );
+
+    this.throwExceptionOnOpen = throwExceptionOnOpen;
+    this.contents = new byte[size];
+    for (int b = 0; b < size; b++) {
+      this.contents[b] = byteAtOffset(b);
+    }
+  }
+
+  /**
+   * Opens an in-memory stream over {@code size} bytes starting at {@code offset}.
+   *
+   * @throws IllegalArgumentException if the offset or size is negative or the
+   *     requested range extends past the end of the file.
+   * @throws IOException once, on the first call, if fault injection was requested.
+   */
+  @Override
+  public InputStream openForRead(long offset, int size) throws IOException {
+    Validate.checkNotNegative(offset, "offset");
+    Validate.checkNotNegative(size, "size");
+    Validate.checkLessOrEqual(offset, "offset", size(), "size()");
+    Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
+
+    if (this.throwExceptionOnOpen) {
+      this.throwExceptionOnOpen = false;
+      throw new IOException("Throwing because throwExceptionOnOpen is true ");
+    }
+    int bufSize = (int) Math.min(size, size() - offset);
+    return new ByteArrayInputStream(contents, (int) offset, bufSize);
+  }
+
+  @Override
+  public void close(InputStream inputStream) {
+    // do nothing since we do not use a real S3 stream.
+  }
+
+  /**
+   * @return the byte stored at {@code offset} in every mock file.
+   */
+  public static byte byteAtOffset(int offset) {
+    return (byte) (offset % 128);
+  }
+
+  /**
+   * Creates callbacks whose {@code getObject()} returns null and whose GET
+   * requests target {@code bucketName}.
+   */
+  public static S3AInputStream.InputStreamCallbacks createClient(String bucketName) {
+    return new S3AInputStream.InputStreamCallbacks() {
+      @Override
+      public S3Object getObject(GetObjectRequest request) {
+        return null;
+      }
+
+      @Override
+      public GetObjectRequest newGetRequest(String key) {
+        return new GetObjectRequest(bucketName, key);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3BlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3BlockManager.java
new file mode 100644
index 00000000000..eb3b700f280
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3BlockManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link S3BlockManager} backed by a {@link MockS3File}.
+ */
+public class TestS3BlockManager extends AbstractHadoopTestBase {
+
+  static final int FILE_SIZE = 12;
+  static final int BLOCK_SIZE = 3;
+
+  @Test
+  public void testArgChecks() throws Exception {
+    BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
+    MockS3File s3File = new MockS3File(FILE_SIZE, false);
+    S3Reader reader = new S3Reader(s3File);
+
+    // Should not throw.
+    new S3BlockManager(reader, blockData);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'reader' must not be null",
+        () -> new S3BlockManager(null, blockData));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockData' must not be null",
+        () -> new S3BlockManager(reader, null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> new S3BlockManager(reader, blockData).get(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'data' must not be null",
+        () -> new S3BlockManager(reader, blockData).release(null));
+  }
+
+  /**
+   * Reads every block and checks each byte against the mock file's contents.
+   */
+  @Test
+  public void testGet() throws IOException {
+    BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
+    MockS3File s3File = new MockS3File(FILE_SIZE, false);
+    S3Reader reader = new S3Reader(s3File);
+    S3BlockManager blockManager = new S3BlockManager(reader, blockData);
+
+    for (int b = 0; b < blockData.getNumBlocks(); b++) {
+      BufferData data = blockManager.get(b);
+      ByteBuffer buffer = data.getBuffer();
+      long startOffset = blockData.getStartOffset(b);
+      // Use the actual block size so a short trailing block is handled too.
+      for (int i = 0; i < blockData.getSize(b); i++) {
+        assertEquals(MockS3File.byteAtOffset((int) (startOffset + i)), buffer.get());
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
new file mode 100644
index 00000000000..99836793dec
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.twitter.util.ExecutorServiceFuturePool;
+import com.twitter.util.FuturePool;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
+  static final int FILE_SIZE = 15;
+  static final int BLOCK_SIZE = 2;
+  static final int POOL_SIZE = 3;
+
+  private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
+  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+
+  private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
+
+  @Test
+  public void testArgChecks() throws Exception {
+    MockS3File s3File = new MockS3File(FILE_SIZE, false);
+    S3Reader reader = new S3Reader(s3File);
+
+    // Should not throw.
+    S3CachingBlockManager blockManager =
+        new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'futurePool' must not be null",
+        () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'reader' must not be null",
+        () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockData' must not be null",
+        () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'bufferPoolSize' must be a positive integer",
+        () -> new S3CachingBlockManager(futurePool, reader, blockData, 0));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'bufferPoolSize' must be a positive integer",
+        () -> new S3CachingBlockManager(futurePool, reader, blockData, -1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> blockManager.get(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'data' must not be null",
+        () -> blockManager.release(null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'blockNumber' must not be negative",
+        () -> blockManager.requestPrefetch(-1));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'data' must not be null",
+        () -> blockManager.requestCaching(null));
+  }
+
+  /**
+   * Extends S3CachingBlockManager so that we can inject asynchronous failures.
+   */
+  static class TestBlockManager extends S3CachingBlockManager {
+    TestBlockManager(
+        FuturePool futurePool,
+        S3Reader reader,
+        BlockData blockData,
+        int bufferPoolSize) {
+      super(futurePool, reader, blockData, bufferPoolSize);
+    }
+
+    // If true, forces the next read operation to fail.
+    // Resets itself to false after one failure.
+    private boolean forceNextReadToFail;
+
+    @Override
+    public int read(ByteBuffer buffer, long offset, int size) throws IOException {
+      if (forceNextReadToFail) {
+        forceNextReadToFail = false;
+        throw new RuntimeException("foo");
+      } else {
+        return super.read(buffer, offset, size);
+      }
+    }
+
+    // If true, forces the next cache-put operation to fail.
+    // Resets itself to false after one failure.
+    private boolean forceNextCachePutToFail;
+
+    @Override
+    protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
+      if (forceNextCachePutToFail) {
+        forceNextCachePutToFail = false;
+        throw new RuntimeException("bar");
+      } else {
+        super.cachePut(blockNumber, buffer);
+      }
+    }
+  }
+
+  // @Ignore
+  @Test
+  public void testGet() throws Exception {
+    testGetHelper(false);
+  }
+
+  // @Ignore
+  @Test
+  public void testGetFailure() throws Exception {
+    testGetHelper(true);
+  }
+
+  private void testGetHelper(boolean forceReadFailure) throws Exception {
+    MockS3File s3File = new MockS3File(FILE_SIZE, true);
+    S3Reader reader = new S3Reader(s3File);
+    TestBlockManager blockManager =
+        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+
+    for (int b = 0; b < blockData.getNumBlocks(); b++) {
+      // We simulate caching failure for all even numbered blocks.
+      boolean forceFailure = forceReadFailure && (b % 2 == 0);
+
+      BufferData data = null;
+
+      if (forceFailure) {
+        blockManager.forceNextReadToFail = true;
+
+        ExceptionAsserts.assertThrows(
+            RuntimeException.class,
+            "foo",
+            () -> blockManager.get(3));
+      } else {
+        data = blockManager.get(b);
+
+        long startOffset = blockData.getStartOffset(b);
+        for (int i = 0; i < blockData.getSize(b); i++) {
+          assertEquals(startOffset + i, data.getBuffer().get());
+        }
+
+        blockManager.release(data);
+      }
+
+      assertEquals(POOL_SIZE, blockManager.numAvailable());
+    }
+  }
+
+  // @Ignore
+  @Test
+  public void testPrefetch() throws IOException, InterruptedException {
+    testPrefetchHelper(false);
+  }
+
+  // @Ignore
+  @Test
+  public void testPrefetchFailure() throws IOException, InterruptedException {
+    testPrefetchHelper(true);
+  }
+
+  private void testPrefetchHelper(boolean forcePrefetchFailure)
+      throws IOException, InterruptedException {
+    MockS3File s3File = new MockS3File(FILE_SIZE, false);
+    S3Reader reader = new S3Reader(s3File);
+    TestBlockManager blockManager =
+        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+    assertInitialState(blockManager);
+
+    int expectedNumErrors = 0;
+    int expectedNumSuccesses = 0;
+
+    for (int b = 0; b < POOL_SIZE; b++) {
+      // We simulate caching failure for all odd numbered blocks.
+      boolean forceFailure = forcePrefetchFailure && (b % 2 == 1);
+      if (forceFailure) {
+        expectedNumErrors++;
+        blockManager.forceNextReadToFail = true;
+      } else {
+        expectedNumSuccesses++;
+      }
+      blockManager.requestPrefetch(b);
+    }
+
+    assertEquals(0, blockManager.numCached());
+
+    blockManager.cancelPrefetches();
+    waitForCaching(blockManager, expectedNumSuccesses);
+    assertEquals(expectedNumErrors, this.totalErrors(blockManager));
+    assertEquals(expectedNumSuccesses, blockManager.numCached());
+  }
+
+  // @Ignore
+  @Test
+  public void testCachingOfPrefetched() throws IOException, InterruptedException {
+    MockS3File s3File = new MockS3File(FILE_SIZE, false);
+    S3Reader reader = new S3Reader(s3File);
+    S3CachingBlockManager blockManager =
+        new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE);
+    assertInitialState(blockManager);
+
+    for (int b = 0; b < blockData.getNumBlocks(); b++) {
+      blockManager.requestPrefetch(b);
+      BufferData data = blockManager.get(b);
+      blockManager.requestCaching(data);
+    }
+
+    waitForCaching(blockManager, blockData.getNumBlocks());
+    assertEquals(blockData.getNumBlocks(), blockManager.numCached());
+    assertEquals(0, this.totalErrors(blockManager));
+  }
+
+  // @Ignore
+  @Test
+  public void testCachingOfGet() throws IOException, InterruptedException {
+    testCachingOfGetHelper(false);
+  }
+
+  // @Ignore
+  @Test
+  public void testCachingFailureOfGet() throws IOException, InterruptedException {
+    testCachingOfGetHelper(true);
+  }
+
+  public void testCachingOfGetHelper(boolean forceCachingFailure)
+      throws IOException, InterruptedException {
+    MockS3File s3File = new MockS3File(FILE_SIZE, false);
+    S3Reader reader = new S3Reader(s3File);
+    TestBlockManager blockManager =
+        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+    assertInitialState(blockManager);
+
+    int expectedNumErrors = 0;
+    int expectedNumSuccesses = 0;
+
+    for (int b = 0; b < blockData.getNumBlocks(); b++) {
+      // We simulate caching failure for all odd numbered blocks.
+      boolean forceFailure = forceCachingFailure && (b % 2 == 1);
+      if (forceFailure) {
+        expectedNumErrors++;
+      } else {
+        expectedNumSuccesses++;
+      }
+
+      BufferData data = blockManager.get(b);
+      if (forceFailure) {
+        blockManager.forceNextCachePutToFail = true;
+      }
+
+      blockManager.requestCaching(data);
+      waitForCaching(blockManager, expectedNumSuccesses);
+      assertEquals(expectedNumSuccesses, blockManager.numCached());
+
+      if (forceCachingFailure) {
+        assertEquals(expectedNumErrors, this.totalErrors(blockManager));
+      } else {
+        assertEquals(0, this.totalErrors(blockManager));
+      }
+    }
+  }
+
+  private void waitForCaching(
+      S3CachingBlockManager blockManager,
+      int expectedCount)
+        throws InterruptedException {
+    // Wait for async cache operation to be over.
+    int numTrys = 0;
+    int count;
+    do {
+      Thread.sleep(100);
+      count = blockManager.numCached();
+      numTrys++;
+      if (numTrys > 600) {
+        String message = String.format(
+            "waitForCaching: expected: %d, actual: %d, read errors: %d, caching errors: %d",
+            expectedCount, count, blockManager.numReadErrors(), blockManager.numCachingErrors());
+        throw new IllegalStateException(message);
+      }
+    }
+    while (count < expectedCount);
+  }
+
+  private int totalErrors(S3CachingBlockManager blockManager) {
+    return blockManager.numCachingErrors() + blockManager.numReadErrors();
+  }
+
+  private void assertInitialState(S3CachingBlockManager blockManager) {
+    assertEquals(0, blockManager.numCached());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java
new file mode 100644
index 00000000000..2f555d2b62c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.twitter.util.ExecutorServiceFuturePool;
+import com.twitter.util.FuturePool;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+/**
+ * Tests for the argument validation performed by the {@link S3File} constructor.
+ */
+public class TestS3File extends AbstractHadoopTestBase {
+  private final ExecutorService threadPool = Executors.newFixedThreadPool(1);
+  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket");
+
+  /**
+   * Shuts down the per-test thread pool backing {@code futurePool} so it is not
+   * leaked once the test completes.
+   */
+  @org.junit.After
+  public void shutdownThreadPool() {
+    threadPool.shutdown();
+  }
+
+  /**
+   * Verifies that a fully populated constructor call succeeds and that each null
+   * argument is rejected with an IllegalArgumentException naming that argument.
+   */
+  @Test
+  public void testArgChecks() throws Exception {
+    S3AReadOpContext readContext = Fakes.createReadContext(futurePool, "key", 10, 10, 1);
+    S3ObjectAttributes attrs = Fakes.createObjectAttributes("bucket", "key", 10);
+    S3AInputStreamStatistics stats =
+        readContext.getS3AStatisticsContext().newInputStreamStatistics();
+    ChangeTracker changeTracker = Fakes.createChangeTracker("bucket", "key", 10);
+
+    // Should not throw.
+    new S3File(readContext, attrs, client, stats, changeTracker);
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'context' must not be null",
+        () -> new S3File(null, attrs, client, stats, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'s3Attributes' must not be null",
+        () -> new S3File(readContext, null, client, stats, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'client' must not be null",
+        () -> new S3File(readContext, attrs, null, stats, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'streamStatistics' must not be null",
+        () -> new S3File(readContext, attrs, client, null, changeTracker));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'changeTracker' must not be null",
+        () -> new S3File(readContext, attrs, client, stats, null));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
new file mode 100644
index 00000000000..503cd699002
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.twitter.util.ExecutorServiceFuturePool;
+import com.twitter.util.FuturePool;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Applies the same set of tests to both S3CachingInputStream and S3InMemoryInputStream.
+ * Constructor argument checks are exercised against S3CachingInputStream only.
+ */
+public class TestS3InputStream extends AbstractHadoopTestBase {
+
+  private static final int FILE_SIZE = 10;
+
+  private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
+  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket");
+
+  /**
+   * Shuts down the per-test thread pool. The streams under test are given it via
+   * {@code futurePool}, and without this its worker threads would never be released.
+   */
+  @org.junit.After
+  public void shutdownThreadPool() {
+    threadPool.shutdown();
+  }
+
+  @Test
+  public void testArgChecks() throws Exception {
+    S3AReadOpContext readContext = Fakes.createReadContext(futurePool, "key", 10, 10, 1);
+    S3ObjectAttributes attrs = Fakes.createObjectAttributes("bucket", "key", 10);
+
+    // Should not throw.
+    new S3CachingInputStream(readContext, attrs, client);
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'context' must not be null",
+        () -> new S3CachingInputStream(null, attrs, client));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'s3Attributes' must not be null",
+        () -> new S3CachingInputStream(readContext, null, client));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'client' must not be null",
+        () -> new S3CachingInputStream(readContext, attrs, null));
+  }
+
+  @Test
+  public void testRead0SizedFile() throws Exception {
+    S3InputStream inputStream =
+        Fakes.createS3InMemoryInputStream(futurePool, "bucket", "key", 0);
+    testRead0SizedFileHelper(inputStream);
+
+    inputStream = Fakes.createS3CachingInputStream(futurePool, "bucket", "key", 0, 5, 2);
+    testRead0SizedFileHelper(inputStream);
+  }
+
+  /**
+   * Verifies that an empty stream reports nothing available and EOF on every read.
+   *
+   * @param inputStream stream over a zero-length file.
+   */
+  private void testRead0SizedFileHelper(S3InputStream inputStream)
+      throws Exception {
+    assertEquals(0, inputStream.available());
+    assertEquals(-1, inputStream.read());
+    assertEquals(-1, inputStream.read());
+
+    byte[] buffer = new byte[2];
+    assertEquals(-1, inputStream.read(buffer));
+    assertEquals(-1, inputStream.read());
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    S3InputStream inputStream =
+        Fakes.createS3InMemoryInputStream(futurePool, "bucket", "key", FILE_SIZE);
+    testReadHelper(inputStream, FILE_SIZE);
+
+    inputStream =
+        Fakes.createS3CachingInputStream(futurePool, "bucket", "key", FILE_SIZE, 5, 2);
+    testReadHelper(inputStream, 5);
+  }
+
+  /**
+   * Reads a FILE_SIZE-byte stream with single-byte and bulk reads, then verifies EOF.
+   *
+   * @param inputStream stream whose byte at offset k is expected to have value k.
+   * @param bufferSize value expected from the initial {@code available()} call.
+   */
+  private void testReadHelper(S3InputStream inputStream, int bufferSize) throws Exception {
+    assertEquals(bufferSize, inputStream.available());
+    assertEquals(0, inputStream.read());
+    assertEquals(1, inputStream.read());
+
+    byte[] buffer = new byte[2];
+    assertEquals(2, inputStream.read(buffer));
+    assertEquals(2, buffer[0]);
+    assertEquals(3, buffer[1]);
+
+    assertEquals(4, inputStream.read());
+
+    buffer = new byte[10];
+    int curPos = (int) inputStream.getPos();
+    int expectedRemainingBytes = FILE_SIZE - curPos;
+    int readStartOffset = 2;
+    assertEquals(
+        expectedRemainingBytes,
+        inputStream.read(buffer, readStartOffset, expectedRemainingBytes));
+
+    for (int i = 0; i < expectedRemainingBytes; i++) {
+      assertEquals(curPos + i, buffer[readStartOffset + i]);
+    }
+
+    assertEquals(-1, inputStream.read());
+    Thread.sleep(100);
+    assertEquals(-1, inputStream.read());
+    assertEquals(-1, inputStream.read());
+    assertEquals(-1, inputStream.read(buffer));
+    assertEquals(-1, inputStream.read(buffer, 1, 3));
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    S3InputStream inputStream;
+    inputStream = Fakes.createS3InMemoryInputStream(futurePool, "bucket", "key", 9);
+    testSeekHelper(inputStream, 9, 9);
+
+    inputStream = Fakes.createS3CachingInputStream(futurePool, "bucket", "key", 9, 5, 1);
+    testSeekHelper(inputStream, 5, 9);
+  }
+
+  /**
+   * Seeks to every position and reads to the end, then checks that seeking before
+   * the start or past EOF fails with the standard messages.
+   *
+   * @param inputStream stream whose byte at offset k is expected to have value k.
+   * @param bufferSize value expected from {@code available()} after seeking to 0.
+   * @param fileSize length of the underlying file.
+   */
+  private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileSize)
+      throws Exception {
+    assertEquals(0, inputStream.getPos());
+    inputStream.seek(7);
+    assertEquals(7, inputStream.getPos());
+    inputStream.seek(0);
+
+    assertEquals(bufferSize, inputStream.available());
+    for (int i = 0; i < fileSize; i++) {
+      assertEquals(i, inputStream.read());
+    }
+
+    for (int i = 0; i < fileSize; i++) {
+      inputStream.seek(i);
+      for (int j = i; j < fileSize; j++) {
+        assertEquals(j, inputStream.read());
+      }
+    }
+
+    // Test invalid seeks.
+    ExceptionAsserts.assertThrows(
+        EOFException.class,
+        FSExceptionMessages.NEGATIVE_SEEK,
+        () -> inputStream.seek(-1));
+
+    ExceptionAsserts.assertThrows(
+        EOFException.class,
+        FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+        () -> inputStream.seek(fileSize + 1));
+  }
+
+  @Test
+  public void testRandomSeek() throws Exception {
+    S3InputStream inputStream;
+    inputStream = Fakes.createS3InMemoryInputStream(futurePool, "bucket", "key", 9);
+    testRandomSeekHelper(inputStream, 9, 9);
+
+    inputStream = Fakes.createS3CachingInputStream(futurePool, "bucket", "key", 9, 5, 1);
+    testRandomSeekHelper(inputStream, 5, 9);
+  }
+
+  /**
+   * Alternates forward seeks with seeks measured back from the end of the file,
+   * reading to EOF after each one.
+   *
+   * @param inputStream stream whose byte at offset k is expected to have value k.
+   * @param bufferSize value expected from {@code available()} after seeking to 0.
+   * @param fileSize length of the underlying file.
+   */
+  private void testRandomSeekHelper(S3InputStream inputStream, int bufferSize, int fileSize)
+      throws Exception {
+    assertEquals(0, inputStream.getPos());
+    inputStream.seek(7);
+    assertEquals(7, inputStream.getPos());
+    inputStream.seek(0);
+
+    assertEquals(bufferSize, inputStream.available());
+    for (int i = 0; i < fileSize; i++) {
+      assertEquals(i, inputStream.read());
+    }
+
+    for (int i = 0; i < fileSize; i++) {
+      inputStream.seek(i);
+      for (int j = i; j < fileSize; j++) {
+        assertEquals(j, inputStream.read());
+      }
+
+      int seekFromEndPos = fileSize - i - 1;
+      inputStream.seek(seekFromEndPos);
+      for (int j = seekFromEndPos; j < fileSize; j++) {
+        assertEquals(j, inputStream.read());
+      }
+    }
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    S3InputStream inputStream =
+        Fakes.createS3InMemoryInputStream(futurePool, "bucket", "key", 9);
+    testCloseHelper(inputStream, 9);
+
+    inputStream =
+        Fakes.createS3CachingInputStream(futurePool, "bucket", "key", 9, 5, 3);
+    testCloseHelper(inputStream, 5);
+  }
+
+  /**
+   * Verifies that operations after {@code close()} fail with STREAM_IS_CLOSED and
+   * that closing twice is allowed.
+   *
+   * @param inputStream open stream whose first two bytes are expected to be 0 and 1.
+   * @param bufferSize value expected from the initial {@code available()} call.
+   */
+  private void testCloseHelper(S3InputStream inputStream, int bufferSize) throws Exception {
+    assertEquals(bufferSize, inputStream.available());
+    assertEquals(0, inputStream.read());
+    assertEquals(1, inputStream.read());
+
+    inputStream.close();
+
+    ExceptionAsserts.assertThrows(
+        IOException.class,
+        FSExceptionMessages.STREAM_IS_CLOSED,
+        () -> inputStream.available());
+
+    ExceptionAsserts.assertThrows(
+        IOException.class,
+        FSExceptionMessages.STREAM_IS_CLOSED,
+        () -> inputStream.read());
+
+    byte[] buffer = new byte[10];
+    ExceptionAsserts.assertThrows(
+        IOException.class,
+        FSExceptionMessages.STREAM_IS_CLOSED,
+        () -> inputStream.read(buffer));
+
+    // Verify a second close() does not throw.
+    inputStream.close();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3Reader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3Reader.java
new file mode 100644
index 00000000000..10e5e29da2d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3Reader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link S3Reader}: argument validation, and reads from every start offset
+ * across a range of buffer and read sizes, with the mock file's retry mode off and on.
+ */
+public class TestS3Reader extends AbstractHadoopTestBase {
+
+  private static final int FILE_SIZE = 9;
+  private static final int BUFFER_SIZE = 2;
+  private final S3File s3File = new MockS3File(FILE_SIZE, false);
+
+  @Test
+  public void testArgChecks() throws Exception {
+    // Should not throw.
+    S3Reader reader = new S3Reader(s3File);
+
+    // Verify it throws correctly.
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'s3File' must not be null",
+        () -> new S3Reader(null));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'buffer' must not be null",
+        () -> reader.read(null, 10, 2));
+
+    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'offset' (-1) must be within the range [0, 9]",
+        () -> reader.read(buffer, -1, 2));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'offset' (11) must be within the range [0, 9]",
+        () -> reader.read(buffer, 11, 2));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'size' must be a positive integer",
+        () -> reader.read(buffer, 1, 0));
+
+    ExceptionAsserts.assertThrows(
+        IllegalArgumentException.class,
+        "'size' must be a positive integer",
+        () -> reader.read(buffer, 1, -1));
+  }
+
+  @Test
+  public void testGetWithOffset() throws Exception {
+    for (int i = 0; i < FILE_SIZE; i++) {
+      testGetHelper(false, i);  // no retry
+      testGetHelper(true, i);   // with retry
+    }
+  }
+
+  /**
+   * Reads from {@code startOffset} with every combination of buffer capacity
+   * (0 to FILE_SIZE + 1) and requested size (1 to FILE_SIZE), checking the byte
+   * count and that byte k of the file has value k.
+   *
+   * @param testWithRetry passed to MockS3File to enable its retry mode.
+   * @param startOffset file offset to read from.
+   */
+  private void testGetHelper(boolean testWithRetry, long startOffset)
+      throws Exception {
+    S3Reader reader = new S3Reader(new MockS3File(FILE_SIZE, testWithRetry));
+    int remainingSize = FILE_SIZE - (int) startOffset;
+    for (int bufferSize = 0; bufferSize <= FILE_SIZE + 1; bufferSize++) {
+      ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+      for (int readSize = 1; readSize <= FILE_SIZE; readSize++) {
+        buffer.clear();
+        int numBytesRead = reader.read(buffer, startOffset, readSize);
+        // A read is bounded by the requested size, the bytes left in the file,
+        // and the buffer's capacity.
+        int expectedNumBytesRead = Math.min(bufferSize, Math.min(readSize, remainingSize));
+        assertEquals(expectedNumBytesRead, numBytesRead);
+
+        byte[] bytes = buffer.array();
+        for (int i = 0; i < expectedNumBytesRead; i++) {
+          assertEquals(startOffset + i, bytes[i]);
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index d95b46b10d7..6092abb77c6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -25,8 +25,8 @@ import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.read.S3PrefetchingInputStream;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,7 +162,15 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
    */
   protected S3AInputStreamStatistics getInputStreamStatistics(
       FSDataInputStream in) {
-    return getS3AInputStream(in).getS3AStreamStatistics();
+
+    // The wrapped stream may be either an S3AInputStream or an S3PrefetchingInputStream;
+    // both expose getS3AStreamStatistics(). Any other type fails the test with an AssertionError.
+    InputStream inner = in.getWrappedStream();
+    if (inner instanceof S3AInputStream) {
+      return ((S3AInputStream) inner).getS3AStreamStatistics();
+    } else if (inner instanceof S3PrefetchingInputStream) {
+      return ((S3PrefetchingInputStream) inner).getS3AStreamStatistics();
+    } else {
+      throw new AssertionError("Not an S3AInputStream or S3PrefetchingInputStream: " + inner);
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org