You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/04/24 16:36:19 UTC

[hadoop] 03/04: HADOOP-16202. Enhanced openFile(): hadoop-aws changes. (#2584/3)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e0cd0a82e032b926774dcea69edd4fa20aae2603
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Sun Apr 24 17:23:19 2022 +0100

    HADOOP-16202. Enhanced openFile(): hadoop-aws changes. (#2584/3)
    
    S3A input stream support for the few fs.option.openfile settings.
    As well as supporting the read policy option and values,
    if the file length is declared in fs.option.openfile.length
    then no HEAD request will be issued when opening a file.
    This can cut a few tens of milliseconds off the operation.
    
    The patch adds a new openfile parameter/FS configuration option
    fs.s3a.input.async.drain.threshold (default: 16000).
    It declares the number of bytes remaining in the http input stream
    above which any operation to read and discard the rest of the stream,
    "draining", is executed asynchronously.
    This asynchronous draining offers some performance benefit on seek-heavy
    file IO.
    
    Contributed by Steve Loughran.
    
    Change-Id: I9b0626bbe635e9fd97ac0f463f5e7167e0111e39
---
 .../hadoop-aws/dev-support/findbugs-exclude.xml    |   5 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  45 +-
 .../java/org/apache/hadoop/fs/s3a/Invoker.java     |  28 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 283 +++++-----
 .../org/apache/hadoop/fs/s3a/S3AInputPolicy.java   |  93 +++-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 263 ++++++---
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |  12 +-
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java | 101 +++-
 .../apache/hadoop/fs/s3a/S3ObjectAttributes.java   |  15 +-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |  17 +
 .../hadoop/fs/s3a/commit/CommitOperations.java     |   8 +-
 .../hadoop/fs/s3a/commit/files/PendingSet.java     |  26 +-
 .../fs/s3a/commit/files/SinglePendingCommit.java   |  20 +-
 .../hadoop/fs/s3a/impl/AbstractStoreOperation.java |  26 +-
 .../hadoop/fs/s3a/impl/CallableSupplier.java       |   2 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |  21 +-
 .../apache/hadoop/fs/s3a/impl/OpenFileSupport.java | 600 +++++++++++++++++++++
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |   4 +-
 .../fs/s3a/select/InternalSelectConstants.java     |   2 +-
 .../apache/hadoop/fs/s3a/select/SelectTool.java    |   4 +-
 .../s3a/statistics/S3AInputStreamStatistics.java   |   7 +
 .../statistics/impl/EmptyS3AStatisticsContext.java |   4 +
 .../src/site/markdown/tools/hadoop-aws/index.md    |   8 +
 .../fs/contract/s3a/ITestS3AContractOpen.java      |  67 +++
 .../fs/contract/s3a/ITestS3AContractSeek.java      |  15 +-
 .../apache/hadoop/fs/s3a/AbstractS3AMockTest.java  |   4 +
 .../hadoop/fs/s3a/ITestS3AConfiguration.java       |  11 -
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |   4 +-
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java     |  17 +-
 .../org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java  |   5 +-
 .../hadoop/fs/s3a/TestStreamChangeTracker.java     |   2 +-
 .../hadoop/fs/s3a/impl/TestOpenFileSupport.java    | 429 +++++++++++++++
 .../fs/s3a/performance/ITestS3AOpenCost.java       | 209 +++++++
 .../hadoop/fs/s3a/performance/OperationCost.java   |   6 +
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |  58 +-
 .../hadoop/fs/s3a/select/AbstractS3SelectTest.java |   2 +-
 .../apache/hadoop/fs/s3a/select/ITestS3Select.java |   7 +-
 .../hadoop/fs/s3a/select/ITestS3SelectMRJob.java   |   4 +-
 38 files changed, 2062 insertions(+), 372 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index b4568b69de4..324369076b8 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -28,6 +28,11 @@
     <Method name="s3Exists" />
     <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
   </Match>
+  <!-- we are using completable futures, so ignore the Future which submit() returns -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+  </Match>
 
   <!--
    findbugs gets confused by lambda expressions in synchronized methods
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index cb3d72e5668..e5369b84883 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
 import java.util.concurrent.TimeUnit;
@@ -602,37 +603,69 @@ public final class Constants {
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
   public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
 
+  /**
+   * The threshold, in bytes remaining in the stream, above which
+   * drain operations are executed asynchronously, accepting the
+   * schedule/wait overhead, rather than synchronously.
+   * Value: {@value}
+   */
+  public static final String ASYNC_DRAIN_THRESHOLD = "fs.s3a.input.async.drain.threshold";
+
+  /**
+   * This is a number based purely on experimentation in
+   * {@code ITestS3AInputStreamPerformance}.
+   * Value: {@value}
+   */
+  public static final int DEFAULT_ASYNC_DRAIN_THRESHOLD = 16_000;
+
   /**
    * Which input strategy to use for buffering, seeking and similar when
    * reading data.
    * Value: {@value}
    */
-  @InterfaceStability.Unstable
   public static final String INPUT_FADVISE =
       "fs.s3a.experimental.input.fadvise";
 
+  /**
+   * The default value for this FS.
+   * Which for S3A, is adaptive.
+   * Value: {@value}
+   * @deprecated use the {@link Options.OpenFileOptions} value
+   * in code which only needs to be compiled against newer hadoop
+   * releases.
+   */
+  public static final String INPUT_FADV_DEFAULT =
+      Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+
   /**
    * General input. Some seeks, some reads.
+   * The policy name "default" is standard across different stores,
+   * and should be preferred.
    * Value: {@value}
    */
-  @InterfaceStability.Unstable
   public static final String INPUT_FADV_NORMAL = "normal";
 
   /**
    * Optimized for sequential access.
    * Value: {@value}
+   * @deprecated use the {@link Options.OpenFileOptions} value
+   * in code which only needs to be compiled against newer hadoop
+   * releases.
    */
-  @InterfaceStability.Unstable
-  public static final String INPUT_FADV_SEQUENTIAL = "sequential";
+  public static final String INPUT_FADV_SEQUENTIAL =
+      Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 
   /**
    * Optimized purely for random seek+read/positionedRead operations;
    * The performance of sequential IO may be reduced in exchange for
    * more efficient {@code seek()} operations.
    * Value: {@value}
+   * @deprecated use the {@link Options.OpenFileOptions} value
+   * in code which only needs to be compiled against newer hadoop
+   * releases.
    */
-  @InterfaceStability.Unstable
-  public static final String INPUT_FADV_RANDOM = "random";
+  public static final String INPUT_FADV_RANDOM =
+      Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
 
   /**
    * Gauge name for the input policy : {@value}.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
index 0663fe935db..279bfeba987 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -38,6 +39,8 @@ import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.hadoop.util.functional.InvocationRaisingIOE;
 import org.apache.hadoop.util.Preconditions;
 
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+
 /**
  * Class to provide lambda expression invocation of AWS operations.
  *
@@ -122,6 +125,31 @@ public class Invoker {
     }
   }
 
+  /**
+   * Execute a function, translating any exception into an IOException.
+   * The supplied duration tracker instance is updated with success/failure.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param tracker tracker to update
+   * @param operation operation to execute
+   * @param <T> type of return value
+   * @return the result of the function call
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.OnceTranslated
+  public static <T> T onceTrackingDuration(
+      final String action,
+      final String path,
+      final DurationTracker tracker,
+      final CallableRaisingIOE<T> operation)
+      throws IOException {
+    try {
+      return invokeTrackingDuration(tracker, operation);
+    } catch (AmazonClientException e) {
+      throw S3AUtils.translateException(action, path, e);
+    }
+  }
+
   /**
    * Execute an operation with no result.
    * @param action action to execute (used in error messages)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 83c3a74f5b3..15e240f9018 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Globber;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
@@ -109,6 +110,7 @@ import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.MkdirOperation;
+import org.apache.hadoop.fs.s3a.impl.OpenFileSupport;
 import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.RenameOperation;
 import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
@@ -116,7 +118,6 @@ import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
-import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
 import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -169,6 +170,7 @@ import org.apache.hadoop.fs.s3a.select.SelectConstants;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@@ -187,7 +189,8 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
-import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
@@ -298,7 +301,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Storage Statistics Bonded to the instrumentation. */
   private S3AStorageStatistics storageStatistics;
 
-  private long readAhead;
+  /**
+   * Default input policy; may be overridden in
+   * {@code openFile()}.
+   */
   private S3AInputPolicy inputPolicy;
   private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -327,6 +333,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   private final ListingOperationCallbacks listingOperationCallbacks =
           new ListingOperationCallbacksImpl();
+
+  /**
+   * Helper for the openFile() method.
+   */
+  private OpenFileSupport openFileHelper;
+
   /**
    * Directory policy.
    */
@@ -465,9 +477,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
-      readAhead = longBytesOption(conf, READAHEAD_RANGE,
-          DEFAULT_READAHEAD_RANGE, 0);
-
       initThreadPools(conf);
 
       int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -508,7 +517,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       doBucketProbing();
 
       inputPolicy = S3AInputPolicy.getPolicy(
-          conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+          conf.getTrimmed(INPUT_FADVISE,
+              Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
+          S3AInputPolicy.Normal);
       LOG.debug("Input fadvise policy = {}", inputPolicy);
       changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
       LOG.debug("Change detection policy = {}", changeDetectionPolicy);
@@ -555,6 +566,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
               "page size out of range: %s", pageSize);
       listing = new Listing(listingOperationCallbacks, createStoreContext());
+      // now the open file logic
+      openFileHelper = new OpenFileSupport(
+          changeDetectionPolicy,
+          longBytesOption(conf, READAHEAD_RANGE,
+              DEFAULT_READAHEAD_RANGE, 0),
+          username,
+          intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
+              IO_FILE_BUFFER_SIZE_DEFAULT, 0),
+          longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
+                        DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
+          inputPolicy);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -1178,15 +1200,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     return fixBucketRegion(region);
   }
 
-  /**
-   * Returns the read ahead range value used by this filesystem.
-   * @return the readahead range
-   */
-  @VisibleForTesting
-  long getReadAheadRange() {
-    return readAhead;
-  }
-
   /**
    * Get the input policy for this FS instance.
    * @return the input policy
@@ -1268,13 +1281,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   /**
    * Change the input policy for this FS.
+   * This is now a no-op, retained in case some application
+   * or external test invokes it.
+   *
+   * @deprecated use openFile() options
    * @param inputPolicy new policy
    */
   @InterfaceStability.Unstable
+  @Deprecated
   public void setInputPolicy(S3AInputPolicy inputPolicy) {
-    Objects.requireNonNull(inputPolicy, "Null inputStrategy");
-    LOG.debug("Setting input strategy: {}", inputPolicy);
-    this.inputPolicy = inputPolicy;
+    LOG.warn("setInputPolicy is no longer supported");
   }
 
   /**
@@ -1392,64 +1408,46 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryTranslated
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
-    return open(f, Optional.empty(), Optional.empty());
+    return executeOpen(qualify(f),
+        openFileHelper.openSimpleFile(bufferSize));
   }
 
   /**
    * Opens an FSDataInputStream at the indicated Path.
-   * if status contains an S3AFileStatus reference, it is used
-   * and so a HEAD request to the store is avoided.
-   *
-   * @param file the file to open
-   * @param options configuration options if opened with the builder API.
-   * @param providedStatus optional file status.
+   * The {@code fileInformation} parameter controls how the file
+   * is opened: whether it is a normal open or an S3 Select call,
+   * whether the HEAD request can be skipped, etc.
+   * @param path the file to open
+   * @param fileInformation information about the file to open
    * @throws IOException IO failure.
    */
-  @Retries.RetryTranslated
   @AuditEntryPoint
-  private FSDataInputStream open(
-      final Path file,
-      final Optional<Configuration> options,
-      final Optional<S3AFileStatus> providedStatus)
+  @Retries.RetryTranslated
+  private FSDataInputStream executeOpen(
+      final Path path,
+      final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
-
-    final Path path = qualify(file);
+    // create the input stream statistics before opening
+    // the file so that the time to prepare to open the file is included.
+    S3AInputStreamStatistics inputStreamStats =
+        statisticsContext.newInputStreamStatistics();
     // this span is passed into the stream.
     final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path);
-    S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
-        providedStatus);
-
-    S3AReadOpContext readContext;
-    if (options.isPresent()) {
-      Configuration o = options.get();
-      // normal path. Open the file with the chosen seek policy, if different
-      // from the normal one.
-      // and readahead.
-      S3AInputPolicy policy = S3AInputPolicy.getPolicy(
-          o.get(INPUT_FADVISE, inputPolicy.toString()));
-      long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
-      // TODO support change detection policy from options?
-      readContext = createReadContext(
-          fileStatus,
-          policy,
-          changeDetectionPolicy,
-          readAheadRange2,
-          auditSpan);
-    } else {
-      readContext = createReadContext(
-          fileStatus,
-          inputPolicy,
-          changeDetectionPolicy,
-          readAhead,
-          auditSpan);
-    }
+    final S3AFileStatus fileStatus =
+        trackDuration(inputStreamStats,
+            ACTION_FILE_OPENED.getSymbol(), () ->
+            extractOrFetchSimpleFileStatus(path, fileInformation));
+    S3AReadOpContext readContext = createReadContext(
+        fileStatus,
+        auditSpan);
+    fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
-
     return new FSDataInputStream(
         new S3AInputStream(
-            readContext,
-            createObjectAttributes(fileStatus),
-            createInputStreamCallbacks(auditSpan)));
+            readContext.build(),
+            createObjectAttributes(path, fileStatus),
+            createInputStreamCallbacks(auditSpan),
+            inputStreamStats));
   }
 
   /**
@@ -1503,34 +1501,40 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         return s3.getObject(request);
       }
     }
+
+    @Override
+    public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
+      CompletableFuture<T> result = new CompletableFuture<>();
+      unboundedThreadPool.submit(() ->
+          LambdaUtils.eval(result, () -> {
+            try (AuditSpan span = auditSpan.activate()) {
+              return operation.apply();
+            }
+          }));
+      return result;
+    }
   }
 
   /**
    * Create the read context for reading from the referenced file,
    * using FS state as well as the status.
    * @param fileStatus file status.
-   * @param seekPolicy input policy for this operation
-   * @param changePolicy change policy for this operation.
-   * @param readAheadRange readahead value.
    * @param auditSpan audit span.
    * @return a context for read and select operations.
    */
   @VisibleForTesting
   protected S3AReadOpContext createReadContext(
       final FileStatus fileStatus,
-      final S3AInputPolicy seekPolicy,
-      final ChangeDetectionPolicy changePolicy,
-      final long readAheadRange,
       final AuditSpan auditSpan) {
-    return new S3AReadOpContext(fileStatus.getPath(),
+    final S3AReadOpContext roc = new S3AReadOpContext(
+        fileStatus.getPath(),
         invoker,
         statistics,
         statisticsContext,
-        fileStatus,
-        seekPolicy,
-        changePolicy,
-        readAheadRange,
-        auditSpan);
+        fileStatus)
+        .withAuditSpan(auditSpan);
+    openFileHelper.applyDefaultOptions(roc);
+    return roc.build();
   }
 
   /**
@@ -1558,13 +1562,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   /**
    * Create the attributes of an object for subsequent use.
+   * @param path path - this is used in preference to the file status path.
    * @param fileStatus file status to build from.
    * @return attributes to use when building the query.
    */
   private S3ObjectAttributes createObjectAttributes(
+      final Path path,
       final S3AFileStatus fileStatus) {
     return createObjectAttributes(
-        fileStatus.getPath(),
+        path,
         fileStatus.getEtag(),
         fileStatus.getVersionId(),
         fileStatus.getLen());
@@ -1981,14 +1987,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     @Override
     public S3ObjectAttributes createObjectAttributes(
         final S3AFileStatus fileStatus) {
-      return S3AFileSystem.this.createObjectAttributes(fileStatus);
+      return S3AFileSystem.this.createObjectAttributes(
+          fileStatus.getPath(),
+          fileStatus);
     }
 
     @Override
     public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
       return S3AFileSystem.this.createReadContext(fileStatus,
-          inputPolicy,
-          changeDetectionPolicy, readAhead,
           auditSpan);
     }
 
@@ -4085,9 +4091,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /**
    * Return the number of bytes that large input files should be optimally
    * be split into to minimize I/O time.
-   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
    */
-  @Deprecated
   public long getDefaultBlockSize() {
     return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
@@ -4106,14 +4110,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         "S3AFileSystem{");
     sb.append("uri=").append(uri);
     sb.append(", workingDir=").append(workingDir);
-    sb.append(", inputPolicy=").append(inputPolicy);
     sb.append(", partSize=").append(partSize);
     sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
     sb.append(", maxKeys=").append(maxKeys);
     if (cannedACL != null) {
-      sb.append(", cannedACL=").append(cannedACL.toString());
+      sb.append(", cannedACL=").append(cannedACL);
+    }
+    if (openFileHelper != null) {
+      sb.append(", ").append(openFileHelper);
     }
-    sb.append(", readAhead=").append(readAhead);
     if (getConf() != null) {
       sb.append(", blockSize=").append(getDefaultBlockSize());
     }
@@ -4799,23 +4804,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryTranslated
   @AuditEntryPoint
   private FSDataInputStream select(final Path source,
-      final String expression,
       final Configuration options,
-      final Optional<S3AFileStatus> providedStatus)
+      final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
-    final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
     requireSelectSupport(source);
+    final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
     final Path path = makeQualified(source);
+    String expression = fileInformation.getSql();
     final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
-        providedStatus);
+        fileInformation);
 
     // readahead range can be dynamically set
-    long ra = options.getLong(READAHEAD_RANGE, readAhead);
-    S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus);
-    S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy,
-        changeDetectionPolicy, ra, auditSpan);
+    S3ObjectAttributes objectAttributes = createObjectAttributes(
+        path, fileStatus);
+    ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
+    S3AReadOpContext readContext = createReadContext(
+        fileStatus,
+        auditSpan);
+    fileInformation.applyOptions(readContext);
 
-    if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None
+    if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
         && fileStatus.getEtag() != null) {
       // if there is change detection, and the status includes at least an
       // etag,
@@ -4827,7 +4835,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // version in the final read; nor can we check the etag match)
       ChangeTracker changeTracker =
           new ChangeTracker(uri.toString(),
-              changeDetectionPolicy,
+              changePolicy,
               readContext.getS3AStatisticsContext()
                   .newInputStreamStatistics()
                   .getChangeTrackerStatistics(),
@@ -4865,38 +4873,36 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
-   * Extract the status from the optional parameter, querying
-   * S3 if it is absent.
-   * @param path path of the status
-   * @param optStatus optional status
+   * Get the file status of the source file.
+   * If the status is in the fileInformation parameter, return that;
+   * if it is not, issue a HEAD request, looking for a
+   * file only.
+   * @param path path of the file to open
+   * @param fileInformation information on the file to open
    * @return a file status
-   * @throws FileNotFoundException if there is no normal file at that path
+   * @throws FileNotFoundException if a HEAD request found no file
    * @throws IOException IO failure
    */
   private S3AFileStatus extractOrFetchSimpleFileStatus(
-      final Path path, final Optional<S3AFileStatus> optStatus)
+      final Path path,
+      final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
-    S3AFileStatus fileStatus;
-    if (optStatus.isPresent()) {
-      fileStatus = optStatus.get();
+    S3AFileStatus fileStatus = fileInformation.getStatus();
+    if (fileStatus == null) {
       // we check here for the passed in status
       // being a directory
-      if (fileStatus.isDirectory()) {
-        throw new FileNotFoundException(path.toString() + " is a directory");
-      }
-    } else {
-      // Executes a HEAD only.
-      // therefore: if there is is a dir marker, this
-      // will raise a FileNotFoundException
       fileStatus = innerGetFileStatus(path, false,
           StatusProbeEnum.HEAD_ONLY);
     }
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException(path.toString() + " is a directory");
+    }
 
     return fileStatus;
   }
 
   /**
-   * Initiate the open or select operation.
+   * Initiate the open() or select() operation.
    * This is invoked from both the FileSystem and FileContext APIs.
    * It's declared as an audit entry point but the span creation is pushed
    * down into the open/select methods it ultimately calls.
@@ -4915,54 +4921,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       final Path rawPath,
       final OpenFileParameters parameters) throws IOException {
     final Path path = qualify(rawPath);
-    Configuration options = parameters.getOptions();
-    Set<String> mandatoryKeys = parameters.getMandatoryKeys();
-    String sql = options.get(SelectConstants.SELECT_SQL, null);
-    boolean isSelect = sql != null;
-    // choice of keys depends on open type
-    if (isSelect) {
-      rejectUnknownMandatoryKeys(
-          mandatoryKeys,
-          InternalSelectConstants.SELECT_OPTIONS,
-          "for " + path + " in S3 Select operation");
-    } else {
-      rejectUnknownMandatoryKeys(
-          mandatoryKeys,
-          InternalConstants.STANDARD_OPENFILE_KEYS,
-          "for " + path + " in non-select file I/O");
-    }
-    FileStatus providedStatus = parameters.getStatus();
-    S3AFileStatus fileStatus;
-    if (providedStatus != null) {
-      Preconditions.checkArgument(path.equals(providedStatus.getPath()),
-          "FileStatus parameter is not for the path %s: %s",
-          path, providedStatus);
-      if (providedStatus instanceof S3AFileStatus) {
-        // can use this status to skip our own probes,
-        // including etag and version.
-        LOG.debug("File was opened with a supplied S3AFileStatus;"
-            + " skipping getFileStatus call in open() operation: {}",
-            providedStatus);
-        fileStatus = (S3AFileStatus) providedStatus;
-      } else if (providedStatus instanceof S3ALocatedFileStatus) {
-        LOG.debug("File was opened with a supplied S3ALocatedFileStatus;"
-            + " skipping getFileStatus call in open() operation: {}",
-            providedStatus);
-        fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus();
-      } else {
-        LOG.debug("Ignoring file status {}", providedStatus);
-        fileStatus = null;
-      }
-    } else {
-      fileStatus = null;
-    }
-    Optional<S3AFileStatus> ost = Optional.ofNullable(fileStatus);
+    OpenFileSupport.OpenFileInformation fileInformation =
+        openFileHelper.prepareToOpenFile(
+            path,
+            parameters,
+            getDefaultBlockSize());
     CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
-    if (!isSelect) {
+    if (!fileInformation.isS3Select()) {
       // normal path.
       unboundedThreadPool.submit(() ->
           LambdaUtils.eval(result,
-              () -> open(path, Optional.of(options), ost)));
+              () -> executeOpen(path, fileInformation)));
     } else {
       // it is a select statement.
       // fail fast if the operation is not available
@@ -4970,7 +4939,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // submit the query
       unboundedThreadPool.submit(() ->
           LambdaUtils.eval(result,
-              () -> select(path, sql, options, ost)));
+              () -> select(path, parameters.getOptions(), fileInformation)));
     }
     return result;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java
index c018410e117..b90d0f2a616 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java
@@ -18,32 +18,46 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Locale;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Locale;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 
 /**
- * Filesystem input policy.
+ * Stream input policy.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public enum S3AInputPolicy {
 
-  Normal(INPUT_FADV_NORMAL),
-  Sequential(INPUT_FADV_SEQUENTIAL),
-  Random(INPUT_FADV_RANDOM);
+  Normal(FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, false, true),
+  Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, true, false),
+  Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, false, false);
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(S3AInputPolicy.class);
+  /** Policy name. */
   private final String policy;
 
-  S3AInputPolicy(String policy) {
+  /** Is this random IO? */
+  private final boolean randomIO;
+
+  /** Is this an adaptive policy? */
+  private final boolean adaptive;
+
+  S3AInputPolicy(String policy,
+      boolean randomIO,
+      boolean adaptive) {
     this.policy = policy;
+    this.randomIO = randomIO;
+    this.adaptive = adaptive;
   }
 
   @Override
@@ -51,26 +65,63 @@ public enum S3AInputPolicy {
     return policy;
   }
 
+  String getPolicy() {
+    return policy;
+  }
+
+  boolean isRandomIO() {
+    return randomIO;
+  }
+
+  boolean isAdaptive() {
+    return adaptive;
+  }
+
   /**
-   * Choose an FS access policy.
-   * Always returns something,
-   * primarily by downgrading to "normal" if there is no other match.
+   * Choose an access policy.
    * @param name strategy name from a configuration option, etc.
+   * @param defaultPolicy default policy to fall back to.
    * @return the chosen strategy
    */
-  public static S3AInputPolicy getPolicy(String name) {
+  public static S3AInputPolicy getPolicy(
+      String name,
+      @Nullable S3AInputPolicy defaultPolicy) {
     String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
     switch (trimmed) {
-    case INPUT_FADV_NORMAL:
+    case FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE:
+    case FS_OPTION_OPENFILE_READ_POLICY_DEFAULT:
+    case Constants.INPUT_FADV_NORMAL:
       return Normal;
-    case INPUT_FADV_RANDOM:
+
+    // all these options currently map to random IO.
+    case FS_OPTION_OPENFILE_READ_POLICY_RANDOM:
+    case FS_OPTION_OPENFILE_READ_POLICY_VECTOR:
       return Random;
-    case INPUT_FADV_SEQUENTIAL:
+
+    case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL:
+    case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE:
       return Sequential;
     default:
-      LOG.warn("Unrecognized " + INPUT_FADVISE + " value: \"{}\"", trimmed);
-      return Normal;
+      return defaultPolicy;
+    }
+  }
+
+  /**
+   * Scan the list of input policies, returning the first one supported.
+   * @param policies list of policies.
+   * @param defaultPolicy fallback
+   * @return a policy or the defaultPolicy, which may be null
+   */
+  public static S3AInputPolicy getFirstSupportedPolicy(
+      Collection<String> policies,
+      @Nullable S3AInputPolicy defaultPolicy) {
+    for (String s : policies) {
+      S3AInputPolicy nextPolicy = S3AInputPolicy.getPolicy(s, null);
+      if (nextPolicy != null) {
+        return nextPolicy;
+      }
     }
+    return defaultPolicy;
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 79a65acb438..6beeb2891ee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,9 +47,14 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.CompletableFuture;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * The input stream for an S3A object.
@@ -78,6 +84,11 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   public static final String OPERATION_OPEN = "open";
   public static final String OPERATION_REOPEN = "re-open";
 
+  /**
+   * size of a buffer to create when draining the stream.
+   */
+  private static final int DRAIN_BUFFER_SIZE = 16384;
+
   /**
    * This is the public position; the one set in {@link #seek(long)}
    * and returned in {@link #getPos()}.
@@ -136,6 +147,12 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   private final IOStatistics ioStatistics;
 
+  /**
+   * Threshold for stream reads to switch to
+   * asynchronous draining.
+   */
+  private long asyncDrainThreshold;
+
   /**
    * Create the stream.
    * This does not attempt to open it; that is only done on the first
@@ -143,10 +160,12 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * @param ctx operation context
    * @param s3Attributes object attributes
    * @param client S3 client to use
+   * @param streamStatistics statistics for this stream
    */
   public S3AInputStream(S3AReadOpContext ctx,
       S3ObjectAttributes s3Attributes,
-      InputStreamCallbacks client) {
+      InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
         "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -155,12 +174,11 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     this.context = ctx;
     this.bucket = s3Attributes.getBucket();
     this.key = s3Attributes.getKey();
-    this.pathStr = ctx.dstFileStatus.getPath().toString();
+    this.pathStr = s3Attributes.getPath().toString();
     this.contentLength = l;
     this.client = client;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
-    this.streamStatistics = ctx.getS3AStatisticsContext()
-        .newInputStreamStatistics();
+    this.streamStatistics = streamStatistics;
     this.ioStatistics = streamStatistics.getIOStatistics();
     this.changeTracker = new ChangeTracker(uri,
         ctx.getChangeDetectionPolicy(),
@@ -168,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
         s3Attributes);
     setInputPolicy(ctx.getInputPolicy());
     setReadahead(ctx.getReadahead());
+    this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
   }
 
   /**
@@ -193,7 +212,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
           boolean forceAbort) throws IOException {
 
     if (isObjectStreamOpen()) {
-      closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
+      closeStream("reopen(" + reason + ")", forceAbort, false);
     }
 
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -211,21 +230,10 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
         operation, uri, targetPos);
     changeTracker.maybeApplyConstraint(request);
 
-    DurationTracker tracker = streamStatistics.initiateGetRequest();
-    try {
-      object = Invoker.once(text, uri,
-          () -> client.getObject(request));
-    } catch(IOException e) {
-      // input function failed: note it
-      tracker.failed();
-      // and rethrow
-      throw e;
-    } finally {
-      // update the tracker.
-      // this is called after any catch() call will have
-      // set the failed flag.
-      tracker.close();
-    }
+    object = onceTrackingDuration(text, uri,
+        streamStatistics.initiateGetRequest(), () ->
+            client.getObject(request));
+
 
     changeTracker.processResponse(object, operation,
         targetPos);
@@ -333,7 +341,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       streamStatistics.seekBackwards(diff);
       // if the stream is in "Normal" mode, switch to random IO at this
       // point, as it is indicative of columnar format IO
-      if (inputPolicy.equals(S3AInputPolicy.Normal)) {
+      if (inputPolicy.isAdaptive()) {
         LOG.info("Switching to Random IO seek policy");
         setInputPolicy(S3AInputPolicy.Random);
       }
@@ -348,7 +356,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
 
     // if the code reaches here, the stream needs to be reopened.
     // close the stream; if read the object will be opened at the new pos
-    closeStream("seekInStream()", this.contentRangeFinish, false);
+    closeStream("seekInStream()", false, false);
     pos = targetPos;
   }
 
@@ -458,7 +466,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
           uri, client, object);
     }
     streamStatistics.readException();
-    closeStream("failure recovery", contentRangeFinish, forceAbort);
+    closeStream("failure recovery", forceAbort, false);
   }
 
   /**
@@ -551,8 +559,8 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     if (!closed) {
       closed = true;
       try {
-        // close or abort the stream
-        closeStream("close() operation", this.contentRangeFinish, false);
+        // close or abort the stream; blocking
+        awaitFuture(closeStream("close() operation", false, true));
         LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
         // end the client+audit span.
         client.close();
@@ -571,16 +579,26 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * If a close() is attempted and fails, the operation escalates to
    * an abort.
    *
+   * The close is potentially asynchronous; a future is returned.
+   * It's the draining of a stream which is time consuming, so it is
+   * worth scheduling on a separate thread.
+   * If the call is blocking, an abort is issued, or the data left to
+   * drain is within the async drain threshold, the work is done inline.
    * This does not set the {@link #closed} flag.
    * @param reason reason for stream being closed; used in messages
-   * @param length length of the stream.
    * @param forceAbort force an abort; used if explicitly requested.
+   * @param blocking should the call block for completion, or is async IO allowed
+   * @return a future for the async operation
    */
   @Retries.OnceRaw
-  private void closeStream(String reason, long length, boolean forceAbort) {
+  private CompletableFuture<Boolean> closeStream(
+      final String reason,
+      final boolean forceAbort,
+      final boolean blocking) {
+
     if (!isObjectStreamOpen()) {
       // steam is already closed
-      return;
+      return CompletableFuture.completedFuture(false);
     }
 
     // if the amount of data remaining in the current request is greater
@@ -589,56 +607,137 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     LOG.debug("Closing stream {}: {}", reason,
         forceAbort ? "abort" : "soft");
     boolean shouldAbort = forceAbort || remaining > readahead;
+    CompletableFuture<Boolean> operation;
+
+    if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
+      // don't bother with async io.
+      operation = CompletableFuture.completedFuture(
+          drain(shouldAbort, reason, remaining, object, wrappedStream));
+
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", remaining);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      operation = client.submit(
+          () -> drain(false, reason, remaining, object, wrappedStream));
+    }
+
+    // either the stream is closed in the blocking call or the async call is
+    // submitted with its own copy of the references
+    wrappedStream = null;
+    object = null;
+    return operation;
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param reason reason for stream being closed; used in messages
+   * @param remaining remaining bytes
+   * @param requestObject http request object; needed to avoid GC issues.
+   * @param inner stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(
+      final boolean shouldAbort,
+      final String reason,
+      final long remaining,
+      final S3Object requestObject,
+      final S3ObjectInputStream inner) {
 
     try {
-      if (!shouldAbort) {
-        try {
-          // clean close. This will read to the end of the stream,
-          // so, while cleaner, can be pathological on a multi-GB object
-
-          // explicitly drain the stream
-          long drained = 0;
-          while (wrappedStream.read() >= 0) {
-            drained++;
+      return invokeTrackingDuration(
+          streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(
+              shouldAbort,
+              reason,
+              remaining,
+              requestObject,
+              inner));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * This does not set the {@link #closed} flag.
+   *
+   * A reference to the stream is passed in so that the instance
+   * {@link #wrappedStream} field can be reused as soon as this
+   * method is submitted.
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param reason reason for stream being closed; used in messages
+   * @param remaining remaining bytes
+   * @param requestObject http request object; needed to avoid GC issues.
+   * @param inner stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(
+      boolean shouldAbort,
+      final String reason,
+      final long remaining,
+      final S3Object requestObject,
+      final S3ObjectInputStream inner) {
+    // force a use of the request object so IDEs don't warn of
+    // lack of use.
+    requireNonNull(requestObject);
+
+    if (!shouldAbort) {
+      try {
+        // clean close. This will read to the end of the stream,
+        // so, while cleaner, can be pathological on a multi-GB object
+
+        // explicitly drain the stream
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inner.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
           }
-          LOG.debug("Drained stream of {} bytes", drained);
-
-          // now close it
-          wrappedStream.close();
-          // this MUST come after the close, so that if the IO operations fail
-          // and an abort is triggered, the initial attempt's statistics
-          // aren't collected.
-          streamStatistics.streamClose(false, drained);
-        } catch (Exception e) {
-          // exception escalates to an abort
-          LOG.debug("When closing {} stream for {}, will abort the stream",
-              uri, reason, e);
-          shouldAbort = true;
+          drained += count;
         }
+        LOG.debug("Drained stream of {} bytes", drained);
+
+        // now close it
+        inner.close();
+        // this MUST come after the close, so that if the IO operations fail
+        // and an abort is triggered, the initial attempt's statistics
+        // aren't collected.
+        streamStatistics.streamClose(false, drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream",
+            uri, reason, e);
+        shouldAbort = true;
       }
-      if (shouldAbort) {
-        // Abort, rather than just close, the underlying stream.  Otherwise, the
-        // remaining object payload is read from S3 while closing the stream.
-        LOG.debug("Aborting stream {}", uri);
-        try {
-          wrappedStream.abort();
-        } catch (Exception e) {
-          LOG.warn("When aborting {} stream after failing to close it for {}",
-              uri, reason, e);
-        }
-        streamStatistics.streamClose(true, remaining);
+    }
+    if (shouldAbort) {
+      // Abort, rather than just close, the underlying stream.  Otherwise, the
+      // remaining object payload is read from S3 while closing the stream.
+      LOG.debug("Aborting stream {}", uri);
+      try {
+        inner.abort();
+      } catch (Exception e) {
+        LOG.warn("When aborting {} stream after failing to close it for {}",
+            uri, reason, e);
       }
-      LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
-              + " nextReadPos={}," +
-              " request range {}-{} length={}",
-          uri, (shouldAbort ? "aborted" : "closed"), reason,
-          remaining, pos, nextReadPos,
-          contentRangeStart, contentRangeFinish,
-          length);
-    } finally {
-      wrappedStream = null;
-      object = null;
+      streamStatistics.streamClose(true, remaining);
     }
+    LOG.debug("Stream {} {}: {}; remaining={}",
+        uri, (shouldAbort ? "aborted" : "closed"), reason,
+        remaining);
+    return shouldAbort;
   }
 
   /**
@@ -648,17 +747,17 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    *
    * This is potentially very inefficient, and should only be invoked
    * in extreme circumstances. It logs at info for this reason.
+   *
+   * Blocks until the abort is completed.
+   *
    * @return true if the connection was actually reset.
    * @throws IOException if invoked on a closed stream.
    */
   @InterfaceStability.Unstable
   public synchronized boolean resetConnection() throws IOException {
     checkNotClosed();
-    if (isObjectStreamOpen()) {
-      LOG.info("Forced reset of connection to {}", uri);
-      closeStream("reset()", contentRangeFinish, true);
-    }
-    return isObjectStreamOpen();
+    LOG.info("Forcing reset of connection to {}", uri);
+    return awaitFuture(closeStream("reset()", true, true));
   }
 
   @Override
@@ -870,7 +969,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   @Override
   public synchronized void unbuffer() {
     try {
-      closeStream("unbuffer()", contentRangeFinish, false);
+      closeStream("unbuffer()", false, false);
     } finally {
       streamStatistics.unbuffered();
     }
@@ -918,6 +1017,14 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     @Retries.OnceRaw
     S3Object getObject(GetObjectRequest request);
 
+    /**
+     * Submit some asynchronous work, for example, draining a stream.
+     * @param operation operation to invoke
+     * @param <T> return type
+     * @return a future.
+     */
+    <T> CompletableFuture<T> submit(CallableRaisingIOE<T> operation);
+
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 099d6442ca0..eec63667201 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
@@ -837,7 +838,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               StreamStatisticNames.STREAM_READ_UNBUFFERED,
               StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
-          .withDurationTracking(ACTION_HTTP_GET_REQUEST)
+          .withDurationTracking(ACTION_HTTP_GET_REQUEST,
+              StoreStatisticNames.ACTION_FILE_OPENED,
+              StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
+              StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED)
           .build();
       setIOStatistics(st);
       aborted = st.getCounterReference(
@@ -1271,6 +1275,12 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       return trackDuration(ACTION_HTTP_GET_REQUEST);
     }
 
+    @Override
+    public DurationTracker initiateInnerStreamClose(final boolean abort) {
+      return trackDuration(abort
+          ? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
+          : StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 5ce3d96c435..f416cf9485d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.util.Preconditions;
 
-import static org.apache.hadoop.util.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 /**
  * Read-specific operation context struct.
@@ -44,19 +44,25 @@ public class S3AReadOpContext extends S3AOpContext {
   /**
    * Initial input policy of the stream.
    */
-  private final S3AInputPolicy inputPolicy;
+  private S3AInputPolicy inputPolicy;
 
   /**
    * How to detect and deal with the object being updated during read.
    */
-  private final ChangeDetectionPolicy changeDetectionPolicy;
+  private ChangeDetectionPolicy changeDetectionPolicy;
 
   /**
    * Readahead for GET operations/skip, etc.
    */
-  private final long readahead;
+  private long readahead;
 
-  private final AuditSpan auditSpan;
+  private AuditSpan auditSpan;
+
+  /**
+   * Threshold for stream reads to switch to
+   * asynchronous draining.
+   */
+  private long asyncDrainThreshold;
 
   /**
    * Instantiate.
@@ -65,31 +71,33 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param stats Fileystem statistics (may be null)
    * @param instrumentation statistics context
    * @param dstFileStatus target file status
-   * @param inputPolicy the input policy
-   * @param changeDetectionPolicy change detection policy.
-   * @param readahead readahead for GET operations/skip, etc.
-   * @param auditSpan active audit
    */
   public S3AReadOpContext(
       final Path path,
       Invoker invoker,
       @Nullable FileSystem.Statistics stats,
       S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus,
-      S3AInputPolicy inputPolicy,
-      ChangeDetectionPolicy changeDetectionPolicy,
-      final long readahead,
-      final AuditSpan auditSpan) {
+      FileStatus dstFileStatus) {
 
     super(invoker, stats, instrumentation,
         dstFileStatus);
-    this.path = checkNotNull(path);
-    this.auditSpan = auditSpan;
+    this.path = requireNonNull(path);
+  }
+
+  /**
+   * Validate the context.
+   * @return a read operation context ready for use.
+   */
+  public S3AReadOpContext build() {
+    requireNonNull(inputPolicy, "inputPolicy");
+    requireNonNull(changeDetectionPolicy, "changeDetectionPolicy");
+    requireNonNull(auditSpan, "auditSpan");
+    requireNonNull(inputPolicy, "inputPolicy");
     Preconditions.checkArgument(readahead >= 0,
         "invalid readahead %d", readahead);
-    this.inputPolicy = checkNotNull(inputPolicy);
-    this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
-    this.readahead = readahead;
+    Preconditions.checkArgument(asyncDrainThreshold >= 0,
+        "invalid drainThreshold %d", asyncDrainThreshold);
+    return this;
   }
 
   /**
@@ -136,6 +144,61 @@ public class S3AReadOpContext extends S3AOpContext {
     return auditSpan;
   }
 
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public S3AReadOpContext withInputPolicy(final S3AInputPolicy value) {
+    inputPolicy = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public S3AReadOpContext withChangeDetectionPolicy(
+      final ChangeDetectionPolicy value) {
+    changeDetectionPolicy = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public S3AReadOpContext withReadahead(final long value) {
+    readahead = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public S3AReadOpContext withAuditSpan(final AuditSpan value) {
+    auditSpan = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public S3AReadOpContext withAsyncDrainThreshold(final long value) {
+    asyncDrainThreshold = value;
+    return this;
+  }
+
+  public long getAsyncDrainThreshold() {
+    return asyncDrainThreshold;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
index 275b207cd6d..0a0454854b2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 
 /**
- * This class holds attributed of an object independent of the
+ * This class holds attributes of an object independent of the
  * file status type.
  * It is used in {@link S3AInputStream} and the select equivalent.
  * as a way to reduce parameters being passed
@@ -44,6 +44,17 @@ public class S3ObjectAttributes {
   private final String versionId;
   private final long len;
 
+  /**
+   * Constructor.
+   * @param bucket s3 bucket
+   * @param path path
+   * @param key object key
+   * @param serverSideEncryptionAlgorithm current encryption algorithm
+   * @param serverSideEncryptionKey any server side encryption key?
+   * @param len object length
+   * @param eTag optional etag
+   * @param versionId optional version id
+   */
   public S3ObjectAttributes(
       String bucket,
       Path path,
@@ -70,7 +81,7 @@ public class S3ObjectAttributes {
    * @param copyResult copy result.
    * @param serverSideEncryptionAlgorithm current encryption algorithm
    * @param serverSideEncryptionKey any server side encryption key?
-   * @param len
+   * @param len object length
    */
   public S3ObjectAttributes(
       final Path path,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index ed16a7c4fd8..86cb18076cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -55,6 +55,10 @@ public enum Statistic {
       StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
       "HEAD request.",
       TYPE_DURATION),
+  ACTION_FILE_OPENED(
+      StoreStatisticNames.ACTION_FILE_OPENED,
+      "File opened.",
+      TYPE_DURATION),
   ACTION_HTTP_GET_REQUEST(
       StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
       "GET request.",
@@ -175,6 +179,10 @@ public enum Statistic {
       StoreStatisticNames.OP_OPEN,
       "Calls of open()",
       TYPE_COUNTER),
+  INVOCATION_OPENFILE(
+      StoreStatisticNames.OP_OPENFILE,
+      "Calls of openFile()",
+      TYPE_COUNTER),
   INVOCATION_RENAME(
       StoreStatisticNames.OP_RENAME,
       "Calls of rename()",
@@ -296,6 +304,15 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_OPERATIONS,
       "Count of read() operations in an input stream",
       TYPE_COUNTER),
+  STREAM_READ_REMOTE_STREAM_ABORTED(
+      StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
+      "Duration of aborting a remote stream during stream IO",
+      TYPE_DURATION),
+  STREAM_READ_REMOTE_STREAM_CLOSED(
+      StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
+      "Duration of closing a remote stream during stream IO",
+      TYPE_DURATION),
+
   STREAM_READ_OPERATIONS_INCOMPLETE(
       StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE,
       "Count of incomplete read() operations in an input stream",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 0ca71f21aae..840cf8e0f23 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -267,7 +267,7 @@ public class CommitOperations extends AbstractStoreOperation
     List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1);
     for (LocatedFileStatus status : statusList) {
       try {
-        commits.add(SinglePendingCommit.load(fs, status.getPath()));
+        commits.add(SinglePendingCommit.load(fs, status.getPath(), status));
       } catch (IOException e) {
         LOG.warn("Failed to load commit file {}", status.getPath(), e);
         failures.add(Pair.of(status, e));
@@ -350,10 +350,12 @@ public class CommitOperations extends AbstractStoreOperation
       LOG.debug("No files to abort under {}", pendingDir);
     }
     while (pendingFiles.hasNext()) {
-      Path pendingFile = pendingFiles.next().getPath();
+      LocatedFileStatus status = pendingFiles.next();
+      Path pendingFile = status.getPath();
       if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) {
         try {
-          abortSingleCommit(SinglePendingCommit.load(fs, pendingFile));
+          abortSingleCommit(SinglePendingCommit.load(fs, pendingFile,
+              status));
         } catch (FileNotFoundException e) {
           LOG.debug("listed file already deleted: {}", pendingFile);
         } catch (IOException | IllegalArgumentException e) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
index fd734102566..318896e2361 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.commit.files;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
@@ -128,10 +129,7 @@ public class PendingSet extends PersistentCommitData
    */
   public static PendingSet load(FileSystem fs, Path path)
       throws IOException {
-    LOG.debug("Reading pending commits in file {}", path);
-    PendingSet instance = serializer().load(fs, path);
-    instance.validate();
-    return instance;
+    return load(fs, path, null);
   }
 
   /**
@@ -144,7 +142,25 @@ public class PendingSet extends PersistentCommitData
    */
   public static PendingSet load(FileSystem fs, FileStatus status)
       throws IOException {
-    return load(fs, status.getPath());
+    return load(fs, status.getPath(), status);
+  }
+
+  /**
+   * Deserialize a pending set from a file and validate it before use.
+   * @param fs filesystem to read from
+   * @param path path of the file
+   * @param status status of the file to load; may be null
+   * @return the loaded and validated instance
+   * @throws IOException IO failure
+   * @throws ValidationFailure if the data is invalid
+   */
+  public static PendingSet load(FileSystem fs, Path path,
+      @Nullable FileStatus status)
+      throws IOException {
+    LOG.debug("Reading pending commits in file {}", path);
+    final PendingSet pendingSet = serializer().load(fs, path, status);
+    pendingSet.validate();
+    return pendingSet;
+  }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
index 9bd37f19f2e..b53ef75d823 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.commit.files;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
@@ -38,6 +39,7 @@ import org.apache.hadoop.util.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
@@ -152,7 +154,23 @@ public class SinglePendingCommit extends PersistentCommitData
    */
   public static SinglePendingCommit load(FileSystem fs, Path path)
       throws IOException {
-    SinglePendingCommit instance = serializer().load(fs, path);
+    return load(fs, path, null);
+  }
+
+  /**
+   * Load an instance from a file, then validate it.
+   * @param fs filesystem
+   * @param path path
+   * @param status status of file to load or null
+   * @return the loaded instance
+   * @throws IOException IO failure
+   * @throws ValidationFailure if the data is invalid
+   */
+  public static SinglePendingCommit load(FileSystem fs,
+      Path path,
+      @Nullable FileStatus status)
+      throws IOException {
+    SinglePendingCommit instance = serializer().load(fs, path, status);
     instance.filename = path.toString();
     instance.validate();
     return instance;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AbstractStoreOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AbstractStoreOperation.java
index 75d06e78729..3537755e790 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AbstractStoreOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AbstractStoreOperation.java
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
-import org.apache.hadoop.fs.store.audit.AuditSpan;
+import javax.annotation.Nullable;
 
-import static org.apache.hadoop.util.Preconditions.checkNotNull;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
 
 /**
  * Base class of operations in the store.
@@ -37,7 +37,7 @@ public abstract class AbstractStoreOperation {
   /**
    * Audit Span.
    */
-  private AuditSpan auditSpan;
+  private final AuditSpan auditSpan;
 
   /**
    * Constructor.
@@ -45,8 +45,11 @@ public abstract class AbstractStoreOperation {
    * stores it for later.
    * @param storeContext store context.
    */
-  protected AbstractStoreOperation(final StoreContext storeContext) {
-    this(storeContext, storeContext.getActiveAuditSpan());
+  protected AbstractStoreOperation(final @Nullable StoreContext storeContext) {
+    this(storeContext,
+        storeContext != null
+        ? storeContext.getActiveAuditSpan()
+        : null);
   }
 
   /**
@@ -54,10 +57,11 @@ public abstract class AbstractStoreOperation {
    * @param storeContext store context.
    * @param auditSpan active span
    */
-  protected AbstractStoreOperation(final StoreContext storeContext,
+  protected AbstractStoreOperation(
+      final @Nullable StoreContext storeContext,
       final AuditSpan auditSpan) {
-    this.storeContext = checkNotNull(storeContext);
-    this.auditSpan = checkNotNull(auditSpan);
+    this.storeContext = storeContext;
+    this.auditSpan = auditSpan;
   }
 
   /**
@@ -70,7 +74,7 @@ public abstract class AbstractStoreOperation {
 
   /**
    * Get the audit span this object was created with.
-   * @return the current span
+   * @return the current span or null
    */
   public AuditSpan getAuditSpan() {
     return auditSpan;
@@ -80,6 +84,8 @@ public abstract class AbstractStoreOperation {
    * Activate the audit span.
    */
   public void activateAuditSpan() {
-    auditSpan.activate();
+    if (auditSpan != null) {
+      auditSpan.activate();
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java
index 259738f9989..01562074192 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause;
 
 /**
  * A bridge from Callable to Supplier; catching exceptions
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 1e4a1262764..49b9feeb6f1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -18,16 +18,18 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.s3a.Constants;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
+
 /**
  * Internal constants private only to the S3A codebase.
  * Please don't refer to these outside of this module &amp; its tests.
@@ -89,11 +91,16 @@ public final class InternalConstants {
    * used becomes that of the select operation.
    */
   @InterfaceStability.Unstable
-  public static final Set<String> STANDARD_OPENFILE_KEYS =
-      Collections.unmodifiableSet(
-          new HashSet<>(
-              Arrays.asList(Constants.INPUT_FADVISE,
-                  Constants.READAHEAD_RANGE)));
+  public static final Set<String> S3A_OPENFILE_KEYS;
+
+  static {
+    // one pipeline: Collectors.toSet() does not promise a mutable result.
+    S3A_OPENFILE_KEYS = Collections.unmodifiableSet(
+        Stream.concat(
+            Stream.of(Constants.INPUT_FADVISE, Constants.READAHEAD_RANGE),
+            FS_OPTION_OPENFILE_STANDARD_OPTIONS.stream())
+        .collect(Collectors.toSet()));
+  }
 
   /** 403 error code. */
   public static final int SC_403 = 403;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
new file mode 100644
index 00000000000..70a99d5318c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
+import org.apache.hadoop.fs.s3a.select.SelectConstants;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
+import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Helper class for openFile() logic, especially processing file status
+ * args and length/etag/versionID.
+ * <p>
+ *  This got complex enough it merited removal from S3AFileSystem -which
+ *  also permits unit testing.
+ * </p>
+ * <p>
+ *   The default values are those from the FileSystem configuration.
+ *   In openFile(), they can all be changed by specific options;
+ *   in FileSystem.open(path, buffersize) only the buffer size is
+ *   set.
+ * </p>
+ */
+public class OpenFileSupport {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OpenFileSupport.class);
+
+  /**
+   * For use when the value of a split/file length is unknown.
+   */
+  private static final int LENGTH_UNKNOWN = -1;
+
+  /**  Default change detection policy. */
+  private final ChangeDetectionPolicy changePolicy;
+
+  /** Default read ahead range. */
+  private final long defaultReadAhead;
+
+  /** Username. */
+  private final String username;
+
+  /** Default buffer size. */
+  private final int defaultBufferSize;
+
+  /**
+   * Threshold for stream reads to switch to
+   * asynchronous draining.
+   */
+  private final long defaultAsyncDrainThreshold;
+
+  /**
+   * Default input policy; may be overridden in
+   * {@code openFile()}.
+   */
+  private final S3AInputPolicy defaultInputPolicy;
+
+  /**
+   * Instantiate with the default options from the filesystem.
+   * @param changePolicy change detection policy applied to every open
+   * @param defaultReadAhead read ahead range used unless overridden
+   * @param username username set in the file status objects built here
+   * @param defaultBufferSize buffer size used unless overridden
+   * @param defaultAsyncDrainThreshold drain threshold used by default
+   * @param defaultInputPolicy input policy used unless overridden
+   */
+  public OpenFileSupport(
+      final ChangeDetectionPolicy changePolicy,
+      final long defaultReadAhead,
+      final String username,
+      final int defaultBufferSize,
+      final long defaultAsyncDrainThreshold,
+      final S3AInputPolicy defaultInputPolicy) {
+    this.changePolicy = changePolicy;
+    this.defaultReadAhead = defaultReadAhead;
+    this.username = username;
+    this.defaultBufferSize = defaultBufferSize;
+    this.defaultAsyncDrainThreshold = defaultAsyncDrainThreshold;
+    this.defaultInputPolicy = defaultInputPolicy;
+  }
+
+  public ChangeDetectionPolicy getChangePolicy() {
+    return this.changePolicy;
+  }
+
+  public long getDefaultReadAhead() {
+    return this.defaultReadAhead;
+  }
+
+  public int getDefaultBufferSize() {
+    return this.defaultBufferSize;
+  }
+
+  public long getDefaultAsyncDrainThreshold() {
+    return this.defaultAsyncDrainThreshold;
+  }
+
+  /**
+   * Apply the filesystem default read options to an operation context.
+   * @param roc context to configure
+   * @return the context returned by the last of the chained setters
+   */
+  public S3AReadOpContext applyDefaultOptions(S3AReadOpContext roc) {
+    final S3AReadOpContext configured = roc
+        .withInputPolicy(defaultInputPolicy)
+        .withChangeDetectionPolicy(changePolicy)
+        .withAsyncDrainThreshold(defaultAsyncDrainThreshold)
+        .withReadahead(defaultReadAhead);
+    return configured;
+  }
+
+  /**
+   * Prepare to open a file from the openFile parameters.
+   * @param path path to the file
+   * @param parameters open file parameters from the builder.
+   * @param blockSize for fileStatus
+   * @return open file options
+   * @throws IOException if the supplied status references a directory.
+   * @throws IllegalArgumentException unknown mandatory key
+   */
+  @SuppressWarnings("ChainOfInstanceofChecks")
+  public OpenFileInformation prepareToOpenFile(
+      final Path path,
+      final OpenFileParameters parameters,
+      final long blockSize) throws IOException {
+    Configuration options = parameters.getOptions();
+    Set<String> mandatoryKeys = parameters.getMandatoryKeys();
+    String sql = options.get(SelectConstants.SELECT_SQL, null);
+    boolean isSelect = sql != null;
+    // choice of keys depends on open type
+    if (isSelect) {
+      // S3 Select call adds a large set of supported mandatory keys
+      rejectUnknownMandatoryKeys(
+          mandatoryKeys,
+          InternalSelectConstants.SELECT_OPTIONS,
+          "for " + path + " in S3 Select operation");
+    } else {
+      rejectUnknownMandatoryKeys(
+          mandatoryKeys,
+          InternalConstants.S3A_OPENFILE_KEYS,
+          "for " + path + " in non-select file I/O");
+    }
+
+    // where does a read end?
+    long fileLength = LENGTH_UNKNOWN;
+
+    // was a status passed in via a withStatus() invocation in
+    // the builder API?
+    FileStatus providedStatus = parameters.getStatus();
+    S3AFileStatus fileStatus = null;
+    if (providedStatus != null) {
+      // there's a file status
+
+      // make sure the file name matches -the rest of the path
+      // MUST NOT be checked.
+      Path providedStatusPath = providedStatus.getPath();
+      checkArgument(path.getName().equals(providedStatusPath.getName()),
+          "Filename mismatch between file being opened %s and"
+              + " supplied filestatus %s",
+          path, providedStatusPath);
+
+      // make sure the status references a file
+      if (providedStatus.isDirectory()) {
+        throw new FileNotFoundException(
+            "Supplied status references a directory " + providedStatus);
+      }
+      // build up the values
+      long len = providedStatus.getLen();
+      long modTime = providedStatus.getModificationTime();
+      String versionId;
+      String eTag;
+      // can use this status to skip our own probes,
+      LOG.debug("File was opened with a supplied FileStatus;"
+              + " skipping getFileStatus call in open() operation: {}",
+          providedStatus);
+
+      // what type is the status (and hence: what information does it contain?)
+      if (providedStatus instanceof S3AFileStatus) {
+        // is it an S3AFileSystem status?
+        S3AFileStatus st = (S3AFileStatus) providedStatus;
+        versionId = st.getVersionId();
+        eTag = st.getEtag();
+      } else if (providedStatus instanceof S3ALocatedFileStatus) {
+
+        //  S3ALocatedFileStatus instance may supply etag and version.
+        S3ALocatedFileStatus st = (S3ALocatedFileStatus) providedStatus;
+        versionId = st.getVersionId();
+        eTag = st.getEtag();
+      } else {
+        // it is another type.
+        // build a status struct without etag or version.
+        LOG.debug("Converting file status {}", providedStatus);
+        versionId = null;
+        eTag = null;
+      }
+      // Construct a new file status with the real path of the file.
+      fileStatus = new S3AFileStatus(
+          len,
+          modTime,
+          path,
+          blockSize,
+          username,
+          eTag,
+          versionId);
+      // set the end of the read to the file length
+      fileLength = fileStatus.getLen();
+    }
+    // determine start and end of file.
+    long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
+
+    // split end
+    long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
+        LENGTH_UNKNOWN);
+    if (splitStart > 0 && splitStart > splitEnd) {
+      LOG.warn("Split start {} is greater than split end {}, resetting",
+          splitStart, splitEnd);
+      splitStart = 0;
+    }
+
+    // read end is the open file value
+    fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
+
+    // if the read end has come from options, use that
+    // in creating a file status
+    if (fileLength >= 0 && fileStatus == null) {
+      fileStatus = createStatus(path, fileLength, blockSize);
+    }
+
+    // Build up the input policy.
+    // seek policy from default, s3a opt or standard option
+    // read from the FS standard option.
+    Collection<String> policies =
+        options.getStringCollection(FS_OPTION_OPENFILE_READ_POLICY);
+    if (policies.isEmpty()) {
+      // fall back to looking at the S3A-specific option.
+      policies = options.getStringCollection(INPUT_FADVISE);
+    }
+
+    return new OpenFileInformation()
+        .withS3Select(isSelect)
+        .withSql(sql)
+        // drain threshold falls back to the FS default, not the readahead
+        .withAsyncDrainThreshold(
+            options.getLong(ASYNC_DRAIN_THRESHOLD,
+                defaultAsyncDrainThreshold))
+        .withBufferSize(
+            options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
+        .withChangePolicy(changePolicy)
+        .withFileLength(fileLength)
+        .withInputPolicy(
+            S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
+        .withReadAheadRange(
+            options.getLong(READAHEAD_RANGE, defaultReadAhead))
+        .withSplitStart(splitStart)
+        .withSplitEnd(splitEnd)
+        .withStatus(fileStatus)
+        .build();
+  }
+
+  /**
+   * Create a minimal file status.
+   * @param path path
+   * @param length file length/read end
+   * @param blockSize block size
+   * @return a new status
+   */
+  private S3AFileStatus createStatus(Path path, long length, long blockSize) {
+    return new S3AFileStatus(
+        length,
+        0,
+        path,
+        blockSize,
+        username,
+        null,
+        null);
+  }
+
+  /**
+   * Build the information for a plain {@code open(path, bufferSize)}
+   * call: all options but the buffer size are the defaults, and
+   * the file length and split end are unknown.
+   * @param bufferSize buffer size
+   * @return the parameters needed to open the file.
+   */
+  public OpenFileInformation openSimpleFile(final int bufferSize) {
+    final OpenFileInformation info = new OpenFileInformation();
+    info.withS3Select(false);
+    info.withAsyncDrainThreshold(defaultAsyncDrainThreshold);
+    info.withBufferSize(bufferSize);
+    info.withChangePolicy(changePolicy);
+    info.withFileLength(LENGTH_UNKNOWN);
+    info.withInputPolicy(defaultInputPolicy);
+    info.withReadAheadRange(defaultReadAhead);
+    info.withSplitStart(0);
+    info.withSplitEnd(LENGTH_UNKNOWN);
+    return info.build();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("OpenFileSupport{")
+        .append("changePolicy=").append(changePolicy)
+        .append(", defaultReadAhead=").append(defaultReadAhead)
+        .append(", defaultBufferSize=").append(defaultBufferSize)
+        .append(", defaultAsyncDrainThreshold=").append(defaultAsyncDrainThreshold)
+        .append(", defaultInputPolicy=").append(defaultInputPolicy)
+        .append('}').toString();
+  }
+
+  /**
+   * The information on a file needed to open it.
+   */
+  public static final class OpenFileInformation {
+
+    /** Is this SQL? */
+    private boolean isS3Select;
+
+    /** File status; may be null. */
+    private S3AFileStatus status;
+
+    /** SQL string if this is a SQL select file. */
+    private String sql;
+
+    /** Active input policy. */
+    private S3AInputPolicy inputPolicy;
+
+    /** Change detection policy. */
+    private ChangeDetectionPolicy changePolicy;
+
+    /** Read ahead range. */
+    private long readAheadRange;
+
+    /** Buffer size. Currently ignored. */
+    private int bufferSize;
+
+    /**
+     * Where does the read start from. 0 unless known.
+     */
+    private long splitStart;
+
+    /**
+     * What is the split end?
+     * Negative if not known.
+     */
+    private long splitEnd = -1;
+
+    /**
+     * What is the file length?
+     * Negative if not known.
+     */
+    private long fileLength = -1;
+
+    /**
+     * Threshold for stream reads to switch to
+     * asynchronous draining.
+     */
+    private long asyncDrainThreshold;
+
+    /**
+     * Constructor.
+     */
+    public OpenFileInformation() {
+    }
+
+    /**
+     * Build.
+     * @return this object
+     */
+    public OpenFileInformation build() {
+      return this;
+    }
+
+    public boolean isS3Select() {
+      return isS3Select;
+    }
+
+    public S3AFileStatus getStatus() {
+      return status;
+    }
+
+    public String getSql() {
+      return sql;
+    }
+
+    public S3AInputPolicy getInputPolicy() {
+      return inputPolicy;
+    }
+
+    public ChangeDetectionPolicy getChangePolicy() {
+      return changePolicy;
+    }
+
+    public long getReadAheadRange() {
+      return readAheadRange;
+    }
+
+    public int getBufferSize() {
+      return bufferSize;
+    }
+
+    public long getSplitStart() {
+      return splitStart;
+    }
+
+    public long getSplitEnd() {
+      return splitEnd;
+    }
+
+    @Override
+    public String toString() {
+      return "OpenFileInformation{" +
+          "isSql=" + isS3Select +
+          ", status=" + status +
+          ", sql='" + sql + '\'' +
+          ", inputPolicy=" + inputPolicy +
+          ", changePolicy=" + changePolicy +
+          ", readAheadRange=" + readAheadRange +
+          ", splitStart=" + splitStart +
+          ", splitEnd=" + splitEnd +
+          ", bufferSize=" + bufferSize +
+          ", drainThreshold=" + asyncDrainThreshold +
+          '}';
+    }
+
+    /**
+     * Get the file length.
+     * @return the file length; -1 if not known.
+     */
+    public long getFileLength() {
+      return fileLength;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withS3Select(final boolean value) {
+      isS3Select = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withStatus(final S3AFileStatus value) {
+      status = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withSql(final String value) {
+      sql = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withInputPolicy(final S3AInputPolicy value) {
+      inputPolicy = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withChangePolicy(final ChangeDetectionPolicy value) {
+      changePolicy = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withReadAheadRange(final long value) {
+      readAheadRange = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withBufferSize(final int value) {
+      bufferSize = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withSplitStart(final long value) {
+      splitStart = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withSplitEnd(final long value) {
+      splitEnd = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withFileLength(final long value) {
+      fileLength = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public OpenFileInformation withAsyncDrainThreshold(final long value) {
+      asyncDrainThreshold = value;
+      return this;
+    }
+
+    /**
+     * Propagate the options to the operation context
+     * being built up.
+     * @param roc context
+     * @return the context
+     */
+    public S3AReadOpContext applyOptions(S3AReadOpContext roc) {
+      return roc
+          .withInputPolicy(inputPolicy)
+          .withChangeDetectionPolicy(changePolicy)
+          .withAsyncDrainThreshold(asyncDrainThreshold)
+          .withReadahead(readAheadRange);
+    }
+
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 1f8be586750..1d52b0a34ea 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.MultipartUtils;
@@ -426,7 +427,8 @@ public abstract class S3GuardTool extends Configured implements Tool,
       String encryption =
           printOption(out, "\tEncryption", Constants.S3_ENCRYPTION_ALGORITHM,
               "none");
-      printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_NORMAL);
+      printOption(out, "\tInput seek policy", INPUT_FADVISE,
+          Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT);
       printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE,
           CHANGE_DETECT_SOURCE_DEFAULT);
       printOption(out, "\tChange Detection Mode", CHANGE_DETECT_MODE,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
index 4f387d8ccfa..fbf5226afb8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/InternalSelectConstants.java
@@ -71,7 +71,7 @@ public final class InternalSelectConstants {
         CSV_OUTPUT_QUOTE_FIELDS,
         CSV_OUTPUT_RECORD_DELIMITER
     ));
-    options.addAll(InternalConstants.STANDARD_OPENFILE_KEYS);
+    options.addAll(InternalConstants.S3A_OPENFILE_KEYS);
     SELECT_OPTIONS = Collections.unmodifiableSet(options);
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java
index 73a08750053..7a6c1afdc1f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/select/SelectTool.java
@@ -39,12 +39,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
 import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.OperationDuration;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -261,7 +261,7 @@ public class SelectTool extends S3GuardTool {
     FSDataInputStream stream;
     try(DurationInfo ignored =
             new DurationInfo(LOG, "Selecting stream")) {
-      stream = FutureIOSupport.awaitFuture(builder.build());
+      stream = FutureIO.awaitFuture(builder.build());
     } catch (FileNotFoundException e) {
       // the source file is missing.
       throw notFound(e);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 328d9f7c4ce..539af2bde36 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -188,4 +188,11 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
    */
   DurationTracker initiateGetRequest();
 
+  /**
+   * Initiate a stream close/abort.
+   * @param abort was the stream aborted?
+   * @return duration tracker;
+   */
+  DurationTracker initiateInnerStreamClose(boolean abort);
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 58bf60ec3ab..f618270798e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -337,6 +337,10 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
       return stubDurationTracker();
     }
 
+    @Override
+    public DurationTracker initiateInnerStreamClose(final boolean abort) {
+      return stubDurationTracker();
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index df08a969e95..f4a2f036ce3 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1017,6 +1017,14 @@ options are covered in [Testing](./testing.md).
   any call to setReadahead() is made to an open stream.</description>
 </property>
 
+<property>
+  <name>fs.s3a.input.async.drain.threshold</name>
+  <value>16000</value>
+  <description>Number of bytes remaining in the HTTP input stream above
+  which any operation to read and discard the rest of the stream
+  ("draining") is executed asynchronously.</description>
+</property>
+
 <property>
   <name>fs.s3a.list.version</name>
   <value>2</value>
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
index 4765fa8e8d7..82a7a3c63b3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
@@ -18,10 +18,22 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.FileNotFoundException;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
 /**
  * S3A contract tests opening files.
  */
@@ -40,4 +52,59 @@ public class ITestS3AContractOpen extends AbstractContractOpenTest {
   protected boolean areZeroByteFilesEncrypted() {
     return true;
   }
+
+  @Test
+  public void testOpenFileApplyReadBadName() throws Throwable {
+    describe("use the apply sequence to read a whole file");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    touch(fs, path);
+    FileStatus st = fs.getFileStatus(path);
+    // The final element of the path is different, so
+    // openFile must fail
+    FileStatus st2 = new FileStatus(
+        0, false,
+        st.getReplication(),
+        st.getBlockSize(),
+        st.getModificationTime(),
+        st.getAccessTime(),
+        st.getPermission(),
+        st.getOwner(),
+        st.getGroup(),
+        new Path("gopher:///localhost/something.txt"));
+    intercept(IllegalArgumentException.class, () ->
+        fs.openFile(path)
+            .withFileStatus(st2)
+            .build());
+  }
+
+  /**
+   * Pass in a directory reference and expect the openFile call
+   * to fail.
+   */
+  @Test
+  public void testOpenFileDirectory() throws Throwable {
+    describe("Change the status to a directory");
+    Path path = methodPath();
+    FileSystem fs = getFileSystem();
+    int len = 4096;
+    createFile(fs, path, true,
+        dataset(len, 0x40, 0x80));
+    FileStatus st = fs.getFileStatus(path);
+    FileStatus st2 = new FileStatus(
+        len, true,
+        st.getReplication(),
+        st.getBlockSize(),
+        st.getModificationTime(),
+        st.getAccessTime(),
+        st.getPermission(),
+        st.getOwner(),
+        st.getGroup(),
+        path);
+    intercept(FileNotFoundException.class, () ->
+        fs.openFile(path)
+            .withFileStatus(st2)
+            .build());
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
index ba07ab24002..dd41583de3f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
@@ -44,11 +44,11 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
 import static org.apache.hadoop.fs.s3a.Constants.SSL_CHANNEL_MODE;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
@@ -87,9 +87,9 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
   @Parameterized.Parameters
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
-        {INPUT_FADV_SEQUENTIAL, Default_JSSE},
-        {INPUT_FADV_RANDOM, OpenSSL},
-        {INPUT_FADV_NORMAL, Default_JSSE_with_GCM},
+        {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Default_JSSE},
+        {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, OpenSSL},
+        {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, Default_JSSE_with_GCM},
     });
   }
 
@@ -215,7 +215,8 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
   public void testReadPolicyInFS() throws Throwable {
     describe("Verify the read policy is being consistently set");
     S3AFileSystem fs = getFileSystem();
-    assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy());
+    assertEquals(S3AInputPolicy.getPolicy(seekPolicy, S3AInputPolicy.Normal),
+        fs.getInputPolicy());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 7cd60cdd4da..a80a24881fa 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -70,6 +70,10 @@ public abstract class AbstractS3AMockTest {
     // use minimum multipart size for faster triggering
     conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
+    // this is so stream draining is always blocking, allowing
+    // assertions to be safely made without worrying
+    // about any race conditions
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index c2b3aab0782..61d12747c0a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -456,17 +456,6 @@ public class ITestS3AConfiguration {
         tmp1.getParent(), tmp2.getParent());
   }
 
-  @Test
-  public void testReadAheadRange() throws Exception {
-    conf = new Configuration();
-    conf.set(Constants.READAHEAD_RANGE, "300K");
-    fs = S3ATestUtils.createTestFileSystem(conf);
-    assertNotNull(fs);
-    long readAheadRange = fs.getReadAheadRange();
-    assertNotNull(readAheadRange);
-    assertEquals("Read Ahead Range Incorrect.", 300 * 1024, readAheadRange);
-  }
-
   @Test
   public void testUsernameFromUGI() throws Throwable {
     final String alice = "alice";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index d965e6e57a2..e20de5edf06 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import org.assertj.core.api.Assertions;
@@ -91,7 +92,6 @@ import static org.apache.hadoop.util.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
@@ -1257,7 +1257,7 @@ public final class S3ATestUtils {
             .withFileStatus(status)
             .build();
 
-    try (FSDataInputStream in = awaitFuture(future)) {
+    try (FSDataInputStream in = FutureIO.awaitFuture(future)) {
       byte[] buf = new byte[(int) status.getLen()];
       in.readFully(0, buf);
       return new String(buf);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index db5b5b56851..62f5bff35c4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -22,6 +22,7 @@ import javax.net.ssl.SSLException;
 import java.io.IOException;
 import java.net.SocketException;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
 
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.GetObjectRequest;
@@ -34,9 +35,10 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
-import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
 import static java.lang.Math.min;
+import static org.apache.hadoop.util.functional.FutureIO.eval;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -54,7 +56,6 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
   @Test
   public void testInputStreamReadRetryForException() throws IOException {
     S3AInputStream s3AInputStream = getMockedS3AInputStream();
-
     assertEquals("'a' from the test input stream 'ab' should be the first " +
         "character being read", INPUT.charAt(0), s3AInputStream.read());
     assertEquals("'b' from the test input stream 'ab' should be the second " +
@@ -103,13 +104,14 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
         INPUT.length());
 
     S3AReadOpContext s3AReadOpContext = fs.createReadContext(
-        s3AFileStatus, S3AInputPolicy.Normal,
-        ChangeDetectionPolicy.getPolicy(fs.getConf()), 100, NoopSpan.INSTANCE);
+        s3AFileStatus,
+        NoopSpan.INSTANCE);
 
     return new S3AInputStream(
         s3AReadOpContext,
         s3ObjectAttributes,
-        getMockedInputStreamCallback());
+        getMockedInputStreamCallback(),
+        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics());
   }
 
   /**
@@ -151,6 +153,11 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
         return new GetObjectRequest(fs.getBucket(), key);
       }
 
+      @Override
+      public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
+        return eval(operation);
+      }
+
       @Override
       public void close() {
       }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
index c858c9933fc..204f1aa0939 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -33,6 +33,7 @@ import java.util.Date;
 import static org.junit.Assert.assertEquals;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -59,6 +60,8 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
     // Create mock S3ObjectInputStream and S3Object for open()
     S3ObjectInputStream objectStream = mock(S3ObjectInputStream.class);
     when(objectStream.read()).thenReturn(-1);
+    when(objectStream.read(any(byte[].class))).thenReturn(-1);
+    when(objectStream.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
 
     S3Object s3Object = mock(S3Object.class);
     when(s3Object.getObjectContent()).thenReturn(objectStream);
@@ -67,7 +70,7 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
 
     // Call read and then unbuffer
     FSDataInputStream stream = fs.open(path);
-    assertEquals(0, stream.read(new byte[8])); // mocks read 0 bytes
+    assertEquals(-1, stream.read(new byte[8])); // mocks return -1: end of stream
     stream.unbuffer();
 
     // Verify that unbuffer closed the object stream
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java
index 4d3930fbc3c..42de7cdffc8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java
@@ -79,7 +79,7 @@ public class TestStreamChangeTracker extends HadoopTestBase {
   public void testVersionCheckingHandlingNoVersionsVersionRequired()
       throws Throwable {
     LOG.info("If an endpoint doesn't return versions but we are configured to"
-        + "require them");
+        + " require them");
     ChangeTracker tracker = newTracker(
         ChangeDetectionPolicy.Mode.Client,
         ChangeDetectionPolicy.Source.VersionId,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java
new file mode 100644
index 00000000000..17f210dd586
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information built from a
+   * configuration with the given key/value option; the key is mandatory.
+   * @param key key to set.
+   * @param option option value.
+   * @return an assertion on the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to the default
+   * policy passed to the OpenFileSupport constructor, {@link #INPUT_POLICY}.
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the default seek policy, which here is Sequential.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the second seek policy if the first is unknown
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unknown policy")
+        .isNull();
+  }
+
+  /**
+   * Test the mapping of the standard option names.
+   */
+  @Test
+  public void testInputPolicyMapping() throws Throwable {
+    Object[][] policyMapping = {
+        {"normal", S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
+        {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
+    };
+    for (Object[] mapping : policyMapping) {
+      String name = (String) mapping[0];
+      Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
+          .describedAs("Policy %s", name)
+          .isEqualTo(mapping[1]);
+    }
+  }
+
+  /**
+   * Verify readahead range is picked up.
+   */
+  @Test
+  public void testReadahead() throws Throwable {
+    // readahead range option
+    assertOpenFile(READAHEAD_RANGE, "4096")
+        .extracting(f -> f.getReadAheadRange())
+        .isEqualTo(4096L);
+  }
+
+  /**
+   * Verify the buffer size option is picked up.
+   */
+  @Test
+  public void testBufferSize() throws Throwable {
+    // buffer size option
+    assertOpenFile(FS_OPTION_OPENFILE_BUFFER_SIZE, "4096")
+        .extracting(f -> f.getBufferSize())
+        .isEqualTo(4096);
+  }
+
+  @Test
+  public void testStatusWithValidFilename() throws Throwable {
+    Path p = new Path("file:///tmp/" + TESTPATH.getName());
+    ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
+        assertFileInfo(prepareToOpenFile(
+            params(FS_OPTION_OPENFILE_LENGTH, "32")
+                .withStatus(status(p, 4096))));
+    asst.extracting(f -> f.getStatus().getVersionId())
+        .isEqualTo("version");
+    asst.extracting(f -> f.getStatus().getEtag())
+        .isEqualTo("etag");
+    asst.extracting(f -> f.getStatus().getLen())
+        .isEqualTo(4096L);
+  }
+
+  /**
+   * Verify S3ALocatedFileStatus is handled.
+   */
+  @Test
+  public void testLocatedStatus() throws Throwable {
+    Path p = new Path("file:///tmp/" + TESTPATH.getName());
+    ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
+        assertFileInfo(
+            prepareToOpenFile(
+                params(FS_OPTION_OPENFILE_LENGTH, "32")
+                    .withStatus(
+                        new S3ALocatedFileStatus(
+                            status(p, 4096), null))));
+    asst.extracting(f -> f.getStatus().getVersionId())
+        .isEqualTo("version");
+    asst.extracting(f -> f.getStatus().getEtag())
+        .isEqualTo("etag");
+    asst.extracting(f -> f.getStatus().getLen())
+        .isEqualTo(4096L);
+  }
+
+  /**
+   * Callers cannot supply a directory status when opening a file.
+   */
+  @Test
+  public void testDirectoryStatus() throws Throwable {
+    intercept(FileNotFoundException.class, TESTFILE, () ->
+        prepareToOpenFile(
+            params(INPUT_FADVISE, "normal")
+                .withStatus(new S3AFileStatus(true, TESTPATH, USERNAME))));
+  }
+
+  /**
+   * File name must match the path argument to openFile().
+   */
+  @Test
+  public void testStatusWithInconsistentFilename() throws Throwable {
+    intercept(IllegalArgumentException.class, TESTFILE, () ->
+        prepareToOpenFile(params(INPUT_FADVISE, "normal")
+            .withStatus(new S3AFileStatus(true,
+                new Path(TESTFILE + "-"), USERNAME))));
+  }
+
+  /**
+   * Prepare to open {@link #TESTPATH} with the given set of parameters.
+   * @param parameters parameters of the openFile call.
+   * @return the information returned by {@code PREPARE.prepareToOpenFile()}.
+   * @throws IOException raised by the prepareToOpenFile call.
+   */
+  public OpenFileSupport.OpenFileInformation prepareToOpenFile(
+      final OpenFileParameters parameters)
+      throws IOException {
+    return PREPARE.prepareToOpenFile(TESTPATH,
+        parameters,
+        IO_FILE_BUFFER_SIZE_DEFAULT
+    );
+  }
+
+  /**
+   * If a file length option is set, a file status
+   * is created.
+   */
+  @Test
+  public void testFileLength() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
+        assertFileInfo(prepareToOpenFile(
+            params(FS_OPTION_OPENFILE_LENGTH, "8192")
+                .withStatus(null)));
+    asst.extracting(f -> f.getStatus())
+        .isNotNull();
+    asst.extracting(f -> f.getStatus().getPath())
+        .isEqualTo(TESTPATH);
+    asst.extracting(f -> f.getStatus().getLen())
+        .isEqualTo(8192L);
+  }
+
+  /**
+   * Verify that a split end is recorded without setting the file length
+   * or creating a status. The value is 2^34 (a shift: '^' is XOR in Java),
+   * beyond the int range, to verify that the long is passed everywhere.
+   */
+  @Test
+  public void testSplitEndSetsLength() throws Throwable {
+    long bigFile = 1L << 34;
+    assertOpenFile(FS_OPTION_OPENFILE_SPLIT_END, Long.toString(bigFile))
+        .matches(p -> p.getSplitEnd() == bigFile, "split end")
+        .matches(p -> p.getFileLength() == -1, "file length")
+        .matches(p -> p.getStatus() == null, "status");
+  }
+
+  /**
+   * Semantics of split and length. Split end can only be safely treated
+   * as a hint unless the codec is known (how?) that it will never
+   * read past it.
+   */
+  @Test
+  public void testSplitEndAndLength() throws Throwable {
+    long splitEnd = 256;
+    long len = 8192;
+    Configuration conf = conf(FS_OPTION_OPENFILE_LENGTH,
+        Long.toString(len));
+    conf.setLong(FS_OPTION_OPENFILE_SPLIT_END, splitEnd);
+    conf.setLong(FS_OPTION_OPENFILE_SPLIT_START, 1024);
+    Set<String> s = new HashSet<>();
+    Collections.addAll(s,
+        FS_OPTION_OPENFILE_SPLIT_START,
+        FS_OPTION_OPENFILE_SPLIT_END,
+        FS_OPTION_OPENFILE_LENGTH);
+    assertFileInfo(prepareToOpenFile(
+        new OpenFileParameters()
+            .withMandatoryKeys(s)
+            .withOptions(conf)))
+        .matches(p -> p.getSplitStart() == 0, "split start")
+        .matches(p -> p.getSplitEnd() == splitEnd, "split end")
+        .matches(p -> p.getStatus().getLen() == len, "file length");
+  }
+
+  /**
+   * Create an S3A status entry with stub etag and versions, timestamp of 0.
+   * @param path status path
+   * @param length file length
+   * @return a status instance.
+   */
+  private S3AFileStatus status(final Path path, final int length) {
+    return new S3AFileStatus(length, 0,
+        path, 0, "", "etag", "version");
+  }
+
+  /**
+   * Create an instance of {@link OpenFileParameters} with
+   * the key as a mandatory parameter.
+   * @param key mandatory key
+   * @param val value
+   * @return the instance.
+   */
+  private OpenFileParameters params(final String key, final String val) {
+    return new OpenFileParameters()
+        .withMandatoryKeys(singleton(key))
+        .withOptions(conf(key, val));
+  }
+
+  /**
+   * Create a configuration with a single entry.
+   * @param key entry key
+   * @param val entry value
+   * @return a configuration
+   */
+  private Configuration conf(String key, Object val) {
+    Configuration c = new Configuration(false);
+    c.set(key, val.toString());
+    return c;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
new file mode 100644
index 00000000000..b0ee531112b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+  private Path testFile;
+
+  private FileStatus testFileStatus;
+
+  private long fileLength;
+
+  public ITestS3AOpenCost() {
+    super(true);
+  }
+
+  /**
+   * Setup creates a test file, saves its status and length
+   * to fields.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    testFile = methodPath();
+
+    writeTextFile(fs, testFile, "openfile", true);
+    testFileStatus = fs.getFileStatus(testFile);
+    fileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Test when openFile() performs GET requests when file status
+   * and length options are passed down.
+   * Note that the input streams only update the FS statistics
+   * in close(), so metrics cannot be verified until all operations
+   * on a stream are complete.
+   * This is slightly less than ideal.
+   */
+  @Test
+  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+    describe("Test cost of openFile with/without status; raw only");
+    S3AFileSystem fs = getFileSystem();
+
+    // now read that file back in using the openFile call.
+    // with a new FileStatus and a different path.
+    // this verifies that any FileStatus class/subclass is used
+    // as a source of the file length.
+    FileStatus st2 = new FileStatus(
+        fileLength, false,
+        testFileStatus.getReplication(),
+        testFileStatus.getBlockSize(),
+        testFileStatus.getModificationTime(),
+        testFileStatus.getAccessTime(),
+        testFileStatus.getPermission(),
+        testFileStatus.getOwner(),
+        testFileStatus.getGroup(),
+        new Path("gopher:///localhost/" + testFile.getName()));
+
+    // no IO in open
+    FSDataInputStream in = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .withFileStatus(st2)
+                .build()
+                .get(),
+        always(NO_HEAD_OR_LIST),
+        with(STREAM_READ_OPENED, 0));
+
+    // the stream gets opened during read
+    long readLen = verifyMetrics(() ->
+            readStream(in),
+        always(NO_HEAD_OR_LIST),
+        with(STREAM_READ_OPENED, 1));
+    assertEquals("bytes read from file", fileLength, readLen);
+  }
+
+  @Test
+  public void testOpenFileShorterLength() throws Throwable {
+    // do a second read with the length declared as short.
+    // we now expect the bytes read to be shorter.
+    S3AFileSystem fs = getFileSystem();
+
+    S3ATestUtils.MetricDiff bytesDiscarded =
+        new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
+    int offset = 2;
+    long shortLen = fileLength - offset;
+    // open the file
+    FSDataInputStream in2 = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .must(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+                .build()
+                .get(),
+        always(NO_HEAD_OR_LIST),
+        with(STREAM_READ_OPENED, 0));
+
+    // verify that the statistics are in range
+    IOStatistics ioStatistics = extractStatistics(in2);
+    Object statsString = demandStringifyIOStatistics(ioStatistics);
+    LOG.info("Statistics of open stream {}", statsString);
+    verifyStatisticCounterValue(ioStatistics, ACTION_FILE_OPENED, 1);
+    // no network IO happened, so the duration is 0. There's a very small
+    // risk of scheduling delays making it positive.
+    assertDurationRange(ioStatistics, ACTION_FILE_OPENED, 0, 0);
+    // now read it
+    long r2 = verifyMetrics(() ->
+            readStream(in2),
+        always(NO_HEAD_OR_LIST),
+        with(STREAM_READ_OPENED, 1),
+        with(STREAM_READ_BYTES_READ_CLOSE, 0),
+        with(STREAM_READ_SEEK_BYTES_SKIPPED, 0));
+
+    LOG.info("Statistics of read stream {}", statsString);
+
+    assertEquals("bytes read from file", shortLen, r2);
+    // no bytes were discarded.
+    bytesDiscarded.assertDiffEquals(0);
+  }
+
+  @Test
+  public void testOpenFileLongerLength() throws Throwable {
+    // do a second read with the length declared as longer
+    // than it is.
+    // readFully() raises an EOFException; read() returns -1
+
+    S3AFileSystem fs = getFileSystem();
+    // set a length past the actual file length
+    long longLen = fileLength + 10;
+    FSDataInputStream in3 = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .must(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .must(FS_OPTION_OPENFILE_LENGTH, longLen)
+                .build()
+                .get(),
+        always(NO_HEAD_OR_LIST));
+
+    // assert behaviors of seeking/reading past the file length.
+    // there is no attempt at recovery.
+    verifyMetrics(() -> {
+      byte[] out = new byte[(int) longLen];
+      intercept(EOFException.class,
+          () -> in3.readFully(0, out));
+      in3.seek(longLen - 1);
+      assertEquals("read past real EOF on " + in3,
+          -1, in3.read());
+      in3.close();
+      return in3.toString();
+    },
+        // two GET calls were made, one for readFully,
+        // the second on the read() past the EOF
+        // the operation has got as far as S3
+        with(STREAM_READ_OPENED, 2));
+
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
index 7ae60a8c7d8..03bc10f86cd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java
@@ -62,6 +62,12 @@ public final class OperationCost {
   public static final OperationCost NO_IO =
       new OperationCost(0, 0);
 
+  /**
+   * More detailed description of the NO_IO cost.
+   */
+  public static final OperationCost NO_HEAD_OR_LIST =
+      NO_IO;
+
   /** A HEAD operation. */
   public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0);
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index 27798e21ed2..d73a938bcce 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -41,7 +42,6 @@ import org.apache.hadoop.util.LineReader;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -50,6 +50,9 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
@@ -57,20 +60,22 @@ import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatSt
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
-import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
- * Look at the performance of S3a operations.
+ * Look at the performance of S3a Input Stream Reads.
  */
 public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(
       ITestS3AInputStreamPerformance.class);
+  private static final int READAHEAD_128K = 128 * _1KB;
 
   private S3AFileSystem s3aFS;
   private Path testData;
@@ -128,14 +133,16 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
     describe("cleanup");
     IOUtils.closeStream(in);
     if (in != null) {
+      final IOStatistics stats = in.getIOStatistics();
       LOG.info("Stream statistics {}",
-          ioStatisticsSourceToString(in));
-      IOSTATS.aggregate(in.getIOStatistics());
+          ioStatisticsToPrettyString(stats));
+      IOSTATS.aggregate(stats);
     }
     if (s3aFS != null) {
+      final IOStatistics stats = s3aFS.getIOStatistics();
       LOG.info("FileSystem statistics {}",
-          ioStatisticsSourceToString(s3aFS));
-      FILESYSTEM_IOSTATS.aggregate(s3aFS.getIOStatistics());
+          ioStatisticsToPrettyString(stats));
+      FILESYSTEM_IOSTATS.aggregate(stats);
       IOUtils.closeStream(s3aFS);
     }
   }
@@ -177,7 +184,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
       throws IOException {
     requireCSVTestData();
-    return openDataFile(s3aFS, this.testData, inputPolicy, readahead);
+    return openDataFile(s3aFS, testData, inputPolicy, readahead, testDataStatus.getLen());
   }
 
   /**
@@ -187,27 +194,28 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
    * @param path path to open
    * @param inputPolicy input policy to use
    * @param readahead readahead/buffer size
+   * @param length file length to declare when opening the file
    * @return the stream, wrapping an S3a one
    * @throws IOException IO problems
    */
   private FSDataInputStream openDataFile(S3AFileSystem fs,
       Path path,
       S3AInputPolicy inputPolicy,
-      long readahead) throws IOException {
+      long readahead,
+      final long length) throws IOException {
     int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
         DEFAULT_READ_BUFFER_SIZE);
-    S3AInputPolicy policy = fs.getInputPolicy();
-    fs.setInputPolicy(inputPolicy);
-    try {
-      FSDataInputStream stream = fs.open(path, bufferSize);
-      if (readahead >= 0) {
-        stream.setReadahead(readahead);
-      }
-      streamStatistics = getInputStreamStatistics(stream);
-      return stream;
-    } finally {
-      fs.setInputPolicy(policy);
+    final FutureDataInputStreamBuilder builder = fs.openFile(path)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            inputPolicy.toString())
+        .opt(FS_OPTION_OPENFILE_LENGTH, length)
+        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
+    if (readahead > 0) {
+      builder.opt(READAHEAD_RANGE, readahead);
     }
+    FSDataInputStream stream = awaitFuture(builder.build());
+    streamStatistics = getInputStreamStatistics(stream);
+    return stream;
   }
 
   /**
@@ -293,8 +301,10 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
       if (bandwidth(blockTimer, blockSize) < minimumBandwidth) {
         LOG.warn("Bandwidth {} too low on block {}: resetting connection",
             bw, blockId);
-        Assert.assertTrue("Bandwidth of " + bw +" too low after  "
-            + resetCount + " attempts", resetCount <= maxResetCount);
+        Assertions.assertThat(resetCount)
+            .describedAs("Bandwidth of %s too low after  %s attempts",
+                bw, resetCount)
+            .isLessThanOrEqualTo(maxResetCount);
         resetCount++;
         // reset the connection
         getS3AInputStream(in).resetConnection();
@@ -359,7 +369,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   public void testDecompressionSequential128K() throws Throwable {
     describe("Decompress with a 128K readahead");
     skipIfClientSideEncryption();
-    executeDecompression(128 * _1KB, S3AInputPolicy.Sequential);
+    executeDecompression(READAHEAD_128K, S3AInputPolicy.Sequential);
     assertStreamOpenedExactlyOnce();
   }
 
@@ -558,7 +568,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
     byte[] buffer = new byte[datasetLen];
     int readahead = _8K;
     int halfReadahead = _4K;
-    in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead);
+    in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead, datasetLen);
 
     LOG.info("Starting initial reads");
     S3AInputStream s3aStream = getS3aStream();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/AbstractS3SelectTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/AbstractS3SelectTest.java
index 56d99d1abe3..bf5d96e73b3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/AbstractS3SelectTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/AbstractS3SelectTest.java
@@ -60,11 +60,11 @@ import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath;
 import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES;
 import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * Superclass for S3 Select tests.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java
index 4c173ab07b3..97fbaae0ae4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java
@@ -63,8 +63,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
 import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES;
 import static org.apache.hadoop.fs.s3a.select.SelectBinding.expandBackslashChars;
@@ -767,7 +767,8 @@ public class ITestS3Select extends AbstractS3SelectTest {
     JobConf conf = createJobConf();
     inputOpt(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_NONE);
     inputMust(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_IGNORE);
-    inputMust(conf, INPUT_FADVISE, INPUT_FADV_NORMAL);
+    inputMust(conf, FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_DEFAULT);
     inputMust(conf, SELECT_ERRORS_INCLUDE_SQL, "true");
     verifySelectionCount(EVEN_ROWS_COUNT,
         SELECT_EVEN_ROWS_NO_HEADER,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3SelectMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3SelectMRJob.java
index 52a59138477..c1c7b89dce8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3SelectMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3SelectMRJob.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.S3AUtils;
@@ -44,6 +43,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 
@@ -203,7 +203,7 @@ public class ITestS3SelectMRJob extends AbstractS3SelectTest {
   private String readStringFromFile(Path path) throws IOException {
     int bytesLen = (int)fs.getFileStatus(path).getLen();
     byte[] buffer = new byte[bytesLen];
-    return FutureIOSupport.awaitFuture(
+    return FutureIO.awaitFuture(
         fs.openFile(path).build().thenApply(in -> {
           try {
             IOUtils.readFully(in, buffer, 0, bytesLen);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org