You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/04/24 16:36:17 UTC

[hadoop] 01/04: HADOOP-16202. Enhanced openFile(): hadoop-common changes. (#2584/1)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1b4dba99b5d941eb8fd4787462a998c13ea2f171
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Sun Apr 24 17:03:59 2022 +0100

    HADOOP-16202. Enhanced openFile(): hadoop-common changes. (#2584/1)
    
    This defines standard options and values for the
    openFile() builder API for opening a file:
    
    fs.option.openfile.read.policy
     A list of the desired read policies, in preferred order.
     standard values are
     adaptive, default, random, sequential, vector, whole-file
    
    fs.option.openfile.length
     How long the file is.
    
    fs.option.openfile.split.start
     start of a task's split
    
    fs.option.openfile.split.end
     end of a task's split
    
    These can be used by filesystem connectors to optimize their
    reading of the source file, including but not limited to
    * skipping existence/length probes when opening a file
    * choosing a policy for prefetching/caching data
    
    The hadoop shell commands which read files all declare "whole-file"
    and "sequential", as appropriate.
    
    Contributed by Steve Loughran.
    
    Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
---
 .../java/org/apache/hadoop/fs/AvroFSInput.java     |  11 +-
 .../org/apache/hadoop/fs/ChecksumFileSystem.java   |   4 +-
 .../main/java/org/apache/hadoop/fs/FSBuilder.java  |  14 +
 .../java/org/apache/hadoop/fs/FileContext.java     |  18 +-
 .../main/java/org/apache/hadoop/fs/FileSystem.java |  11 +-
 .../main/java/org/apache/hadoop/fs/FileUtil.java   |  61 ++-
 .../hadoop/fs/FutureDataInputStreamBuilder.java    |   8 +-
 .../main/java/org/apache/hadoop/fs/Options.java    | 119 +++++
 .../hadoop/fs/impl/AbstractFSBuilderImpl.java      |  38 +-
 .../fs/impl/FileSystemMultipartUploader.java       |   9 +-
 .../fs/impl/FutureDataInputStreamBuilderImpl.java  |   8 +-
 .../org/apache/hadoop/fs/impl/FutureIOSupport.java |  83 +--
 .../apache/hadoop/fs/impl/OpenFileParameters.java  |  13 +
 .../apache/hadoop/fs/impl/WrappedIOException.java  |  11 +-
 .../hadoop/fs/shell/CommandWithDestination.java    |   9 +-
 .../org/apache/hadoop/fs/shell/CopyCommands.java   |   3 +-
 .../java/org/apache/hadoop/fs/shell/Display.java   |   3 +-
 .../main/java/org/apache/hadoop/fs/shell/Head.java |   8 +-
 .../java/org/apache/hadoop/fs/shell/PathData.java  |  35 ++
 .../main/java/org/apache/hadoop/fs/shell/Tail.java |  11 +-
 .../hadoop/fs/statistics/StoreStatisticNames.java  |   9 +
 .../hadoop/fs/statistics/StreamStatisticNames.java |  19 +-
 .../fs/statistics/impl/IOStatisticsBinding.java    |  44 +-
 .../java/org/apache/hadoop/io/SequenceFile.java    |  14 +-
 .../org/apache/hadoop/util/JsonSerialization.java  |   6 +-
 .../util/functional/CommonCallableSupplier.java    |   2 +-
 .../apache/hadoop/util/functional/FutureIO.java    |  90 ++++
 .../src/site/markdown/filesystem/filesystem.md     |  90 +---
 .../filesystem/fsdatainputstreambuilder.md         | 588 ++++++++++++++++++++-
 .../src/site/markdown/filesystem/index.md          |   1 +
 .../src/site/markdown/filesystem/openfile.md       | 122 +++++
 .../AbstractContractMultipartUploaderTest.java     |   2 +-
 .../fs/contract/AbstractContractOpenTest.java      |  80 ++-
 .../hadoop/fs/contract/ContractTestUtils.java      |  13 +-
 .../fs/statistics/IOStatisticAssertions.java       |  20 +
 .../hadoop/fs/statistics/TestDurationTracking.java |   3 +-
 36 files changed, 1320 insertions(+), 260 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
index b4a4a85674d..213fbc24c4d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
@@ -25,6 +25,10 @@ import org.apache.avro.file.SeekableInput;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -42,7 +46,13 @@ public class AvroFSInput implements Closeable, SeekableInput {
   public AvroFSInput(final FileContext fc, final Path p) throws IOException {
     FileStatus status = fc.getFileStatus(p);
     this.len = status.getLen();
-    this.stream = fc.open(p);
+    // Open the file exactly once, through the builder API, handing over the
+    // status already retrieved above; no second open() whose stream is dropped.
+    this.stream = awaitFuture(fc.openFile(p)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+        .withFileStatus(status)
+        .build());
   }
 
   @Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index c7f8e36c3f6..59ffe00bcb2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
 
@@ -889,7 +889,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        FS_OPTION_OPENFILE_STANDARD_OPTIONS,
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(),
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
index b7757a62e28..a4c7254cfeb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B opt(@Nonnull String key, float value);
 
+  /**
+   * Set optional long parameter for the Builder.
+   *
+   * @see #opt(String, String)
+   */
+  B opt(@Nonnull String key, long value);
+
   /**
    * Set optional double parameter for the Builder.
    *
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
    */
   B must(@Nonnull String key, float value);
 
+  /**
+   * Set mandatory long option.
+   *
+   * @see #must(String, String)
+   */
+  B must(@Nonnull String key, long value);
+
   /**
    * Set mandatory double option.
    *
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index 9922dfa0ac8..f3004ce7e03 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -70,7 +70,12 @@ import org.apache.hadoop.tracing.Tracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * The FileContext class provides an interface for users of the Hadoop
@@ -2198,7 +2203,12 @@ public class FileContext implements PathCapabilities {
         EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
             CreateFlag.CREATE, CreateFlag.OVERWRITE) :
             EnumSet.of(CreateFlag.CREATE);
-        InputStream in = open(qSrc);
+        InputStream in = awaitFuture(openFile(qSrc)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .opt(FS_OPTION_OPENFILE_LENGTH,
+                fs.getLen())   // file length hint for object stores
+            .build());
         try (OutputStream out = create(qDst, createFlag)) {
           IOUtils.copyBytes(in, out, conf, true);
         } finally {
@@ -2930,9 +2940,11 @@ public class FileContext implements PathCapabilities {
       final Path absF = fixRelativePart(getPath());
       OpenFileParameters parameters = new OpenFileParameters()
           .withMandatoryKeys(getMandatoryKeys())
+          .withOptionalKeys(getOptionalKeys())
           .withOptions(getOptions())
-          .withBufferSize(getBufferSize())
-          .withStatus(getStatus());
+          .withStatus(getStatus())
+          .withBufferSize(
+              getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
       return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
         @Override
         public CompletableFuture<FSDataInputStream> next(
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index fdb1a475520..aa194e84a35 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -4616,7 +4617,7 @@ public abstract class FileSystem extends Configured
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(),
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
@@ -4644,7 +4645,7 @@ public abstract class FileSystem extends Configured
       final OpenFileParameters parameters) throws IOException {
     AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
         parameters.getMandatoryKeys(),
-        Collections.emptySet(), "");
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
     CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
     try {
       result.complete(open(pathHandle, parameters.getBufferSize()));
@@ -4751,9 +4752,11 @@ public abstract class FileSystem extends Configured
       Optional<Path> optionalPath = getOptionalPath();
       OpenFileParameters parameters = new OpenFileParameters()
           .withMandatoryKeys(getMandatoryKeys())
+          .withOptionalKeys(getOptionalKeys())
           .withOptions(getOptions())
-          .withBufferSize(getBufferSize())
-          .withStatus(super.getStatus());  // explicit to avoid IDE warnings
+          .withStatus(super.getStatus())
+          .withBufferSize(
+              getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
       if(optionalPath.isPresent()) {
         return getFS().openFileWithOptions(optionalPath.get(),
             parameters);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index 0d5ced79fc4..7400ca36daa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -77,6 +77,11 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 /**
  * A collection of file-processing util methods
  */
@@ -396,7 +401,32 @@ public class FileUtil {
     return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
   }
 
-  /** Copy files between FileSystems. */
+  /**
+   * Copy a file/directory tree within/between filesystems.
+   * <p></p>
+   * returns true if the operation succeeded. When deleteSource is true,
+   * this means "after the copy, delete(source) returned true"
+   * If the destination is a directory, and mkdirs (dest) fails,
+   * the operation will return false rather than raise any exception.
+   * <p></p>
+   * The overwrite flag is about overwriting files; it has no effect on
+   * handling an attempt to copy a file atop a directory (expect an IOException),
+   * or a directory over a path which contains a file (mkdir will fail, so
+   * "false").
+   * <p></p>
+   * The operation is recursive, and the deleteSource operation takes place
+   * as each subdirectory is copied. Therefore, if an operation fails partway
+   * through, the source tree may be partially deleted.
+   * @param srcFS source filesystem
+   * @param srcStatus status of source
+   * @param dstFS destination filesystem
+   * @param dst path of destination
+   * @param deleteSource delete the source?
+   * @param overwrite overwrite files at destination?
+   * @param conf configuration to use when opening files
+   * @return true if the operation succeeded.
+   * @throws IOException failure
+   */
   public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
                              FileSystem dstFS, Path dst,
                              boolean deleteSource,
@@ -409,22 +439,27 @@ public class FileUtil {
       if (!dstFS.mkdirs(dst)) {
         return false;
       }
-      FileStatus contents[] = srcFS.listStatus(src);
-      for (int i = 0; i < contents.length; i++) {
-        copy(srcFS, contents[i], dstFS,
-             new Path(dst, contents[i].getPath().getName()),
-             deleteSource, overwrite, conf);
+      RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
+      while (contents.hasNext()) {
+        FileStatus next = contents.next();
+        copy(srcFS, next, dstFS,
+            new Path(dst, next.getPath().getName()),
+            deleteSource, overwrite, conf);
       }
     } else {
-      InputStream in=null;
+      InputStream in = null;
       OutputStream out = null;
       try {
-        in = srcFS.open(src);
+        in = awaitFuture(srcFS.openFile(src)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .opt(FS_OPTION_OPENFILE_LENGTH,
+                srcStatus.getLen())   // file length hint for object stores
+            .build());
         out = dstFS.create(dst, overwrite);
         IOUtils.copyBytes(in, out, conf, true);
       } catch (IOException e) {
-        IOUtils.closeStream(out);
-        IOUtils.closeStream(in);
+        IOUtils.cleanupWithLogger(LOG, in, out);
         throw e;
       }
     }
@@ -503,7 +538,11 @@ public class FileUtil {
              deleteSource, conf);
       }
     } else {
-      InputStream in = srcFS.open(src);
+      InputStream in = awaitFuture(srcFS.openFile(src)
+          .withFileStatus(srcStatus)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+          .build());
       IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf);
     }
     if (deleteSource) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
index 27a522e5930..e7f441a75d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
@@ -34,7 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  * options accordingly, for example:
  *
  * If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
  * {@link IllegalArgumentException} will be thrown.
  *
  */
@@ -51,10 +52,11 @@ public interface FutureDataInputStreamBuilder
   /**
    * A FileStatus may be provided to the open request.
    * It is up to the implementation whether to use this or not.
-   * @param status status.
+   * @param status status: may be null
    * @return the builder.
    */
-  default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
+  default FutureDataInputStreamBuilder withFileStatus(
+      @Nullable FileStatus status) {
     return this;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index 75bc12df8fd..9b457272fcb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.fs;
 
+import java.util.Collections;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -518,4 +522,119 @@ public final class Options {
     MD5MD5CRC,  // MD5 of block checksums, which are MD5 over chunk CRCs
     COMPOSITE_CRC  // Block/chunk-independent composite CRC
   }
+
+  /**
+   * The standard {@code openFile()} options.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static final class OpenFileOptions {
+
+    private OpenFileOptions() {
+    }
+
+    /**
+     * Prefix for all standard filesystem options: {@value}.
+     */
+    private static final String FILESYSTEM_OPTION = "fs.option.";
+
+    /**
+     * Prefix for all openFile options: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE =
+        FILESYSTEM_OPTION + "openfile.";
+
+    /**
+     * OpenFile option for file length: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_LENGTH =
+        FS_OPTION_OPENFILE + "length";
+
+    /**
+     * OpenFile option for split start: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_SPLIT_START =
+        FS_OPTION_OPENFILE + "split.start";
+
+    /**
+     * OpenFile option for split end: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_SPLIT_END =
+        FS_OPTION_OPENFILE + "split.end";
+
+    /**
+     * OpenFile option for buffer size: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
+        FS_OPTION_OPENFILE + "buffer.size";
+
+    /**
+     * OpenFile option for read policies: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY =
+        FS_OPTION_OPENFILE + "read.policy";
+
+    /**
+     * Set of standard options which openFile implementations
+     * MUST recognize, even if they ignore the actual values.
+     */
+    public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS =
+        Collections.unmodifiableSet(Stream.of(
+                FS_OPTION_OPENFILE_BUFFER_SIZE,
+                FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_LENGTH,
+                FS_OPTION_OPENFILE_SPLIT_START,
+                FS_OPTION_OPENFILE_SPLIT_END)
+            .collect(Collectors.toSet()));
+
+    /**
+     * Read policy for adaptive IO: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE =
+        "adaptive";
+
+    /**
+     * Read policy {@value}: whatever the implementation does by default.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
+        "default";
+
+    /**
+     * Read policy for random IO: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_RANDOM =
+        "random";
+
+    /**
+     * Read policy for sequential IO: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL =
+        "sequential";
+
+    /**
+     * Vectored IO API to be used: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_VECTOR =
+        "vector";
+
+    /**
+     * Whole file to be read, end-to-end: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE =
+        "whole-file";
+
+    /**
+     * All the current read policies as a set.
+     */
+    public static final Set<String> FS_OPTION_OPENFILE_READ_POLICIES =
+        Collections.unmodifiableSet(Stream.of(
+                FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE,
+                FS_OPTION_OPENFILE_READ_POLICY_DEFAULT,
+                FS_OPTION_OPENFILE_READ_POLICY_RANDOM,
+                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL,
+                FS_OPTION_OPENFILE_READ_POLICY_VECTOR,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .collect(Collectors.toSet()));
+
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
index c69e7afe4e3..9d3a46d6332 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
@@ -46,7 +46,7 @@ import static org.apache.hadoop.util.Preconditions.checkNotNull;
  * <code>
  *   .opt("foofs:option.a", true)
  *   .opt("foofs:option.b", "value")
- *   .opt("barfs:cache", true)
+ *   .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
  *   .must("foofs:cache", true)
  *   .must("barfs:cache-size", 256 * 1024 * 1024)
  *   .build();
@@ -88,6 +88,9 @@ public abstract class
   /** Keep track of the keys for mandatory options. */
   private final Set<String> mandatoryKeys = new HashSet<>();
 
+  /** Keep track of the optional keys. */
+  private final Set<String> optionalKeys = new HashSet<>();
+
   /**
    * Constructor with both optional path and path handle.
    * Either or both argument may be empty, but it is an error for
@@ -163,6 +166,7 @@ public abstract class
   @Override
   public B opt(@Nonnull final String key, @Nonnull final String value) {
     mandatoryKeys.remove(key);
+    optionalKeys.add(key);
     options.set(key, value);
     return getThisBuilder();
   }
@@ -175,6 +179,7 @@ public abstract class
   @Override
   public B opt(@Nonnull final String key, boolean value) {
     mandatoryKeys.remove(key);
+    optionalKeys.add(key);
     options.setBoolean(key, value);
     return getThisBuilder();
   }
@@ -187,10 +192,19 @@ public abstract class
   @Override
   public B opt(@Nonnull final String key, int value) {
     mandatoryKeys.remove(key);
+    optionalKeys.add(key);
     options.setInt(key, value);
     return getThisBuilder();
   }
 
+  @Override
+  public B opt(@Nonnull final String key, final long value) {
+    mandatoryKeys.remove(key);
+    optionalKeys.add(key);
+    options.setLong(key, value);
+    return getThisBuilder();
+  }
+
   /**
    * Set optional float parameter for the Builder.
    *
@@ -199,6 +213,7 @@ public abstract class
   @Override
   public B opt(@Nonnull final String key, float value) {
     mandatoryKeys.remove(key);
+    optionalKeys.add(key);
     options.setFloat(key, value);
     return getThisBuilder();
   }
@@ -211,6 +226,7 @@ public abstract class
   @Override
   public B opt(@Nonnull final String key, double value) {
     mandatoryKeys.remove(key);
+    optionalKeys.add(key);
     options.setDouble(key, value);
     return getThisBuilder();
   }
@@ -223,6 +239,7 @@ public abstract class
   @Override
   public B opt(@Nonnull final String key, @Nonnull final String... values) {
     mandatoryKeys.remove(key);
+    optionalKeys.add(key);
     options.setStrings(key, values);
     return getThisBuilder();
   }
@@ -248,6 +265,7 @@ public abstract class
   @Override
   public B must(@Nonnull final String key, boolean value) {
     mandatoryKeys.add(key);
+    optionalKeys.remove(key);
     options.setBoolean(key, value);
     return getThisBuilder();
   }
@@ -260,10 +278,19 @@ public abstract class
   @Override
   public B must(@Nonnull final String key, int value) {
     mandatoryKeys.add(key);
+    optionalKeys.remove(key);
     options.setInt(key, value);
     return getThisBuilder();
   }
 
+  @Override
+  public B must(@Nonnull final String key, final long value) {
+    mandatoryKeys.add(key);
+    optionalKeys.remove(key);
+    options.setLong(key, value);
+    return getThisBuilder();
+  }
+
   /**
    * Set mandatory float option.
    *
@@ -272,6 +299,7 @@ public abstract class
   @Override
   public B must(@Nonnull final String key, float value) {
     mandatoryKeys.add(key);
+    optionalKeys.remove(key);
     options.setFloat(key, value);
     return getThisBuilder();
   }
@@ -284,6 +312,7 @@ public abstract class
   @Override
   public B must(@Nonnull final String key, double value) {
     mandatoryKeys.add(key);
+    optionalKeys.remove(key);
     options.setDouble(key, value);
     return getThisBuilder();
   }
@@ -296,6 +325,7 @@ public abstract class
   @Override
   public B must(@Nonnull final String key, @Nonnull final String... values) {
     mandatoryKeys.add(key);
+    optionalKeys.remove(key);
     options.setStrings(key, values);
     return getThisBuilder();
   }
@@ -314,6 +344,12 @@ public abstract class
   public Set<String> getMandatoryKeys() {
     return Collections.unmodifiableSet(mandatoryKeys);
   }
+  /**
+   * Get all the keys that are set as optional keys.
+   */
+  public Set<String> getOptionalKeys() {
+    return Collections.unmodifiableSet(optionalKeys);
+  }
 
   /**
    * Reject a configuration if one or more mandatory keys are
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
index 2339e421289..481d927672d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.UploadHandle;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import static org.apache.hadoop.fs.Path.mergePaths;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -98,7 +99,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
   public CompletableFuture<UploadHandle> startUpload(Path filePath)
       throws IOException {
     checkPath(filePath);
-    return FutureIOSupport.eval(() -> {
+    return FutureIO.eval(() -> {
       Path collectorPath = createCollectorPath(filePath);
       fs.mkdirs(collectorPath, FsPermission.getDirDefault());
 
@@ -116,7 +117,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
       throws IOException {
     checkPutArguments(filePath, inputStream, partNumber, uploadId,
         lengthInBytes);
-    return FutureIOSupport.eval(() -> innerPutPart(filePath,
+    return FutureIO.eval(() -> innerPutPart(filePath,
         inputStream, partNumber, uploadId, lengthInBytes));
   }
 
@@ -179,7 +180,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
       Map<Integer, PartHandle> handleMap) throws IOException {
 
     checkPath(filePath);
-    return FutureIOSupport.eval(() ->
+    return FutureIO.eval(() ->
         innerComplete(uploadId, filePath, handleMap));
   }
 
@@ -251,7 +252,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
 
-    return FutureIOSupport.eval(() -> {
+    return FutureIO.eval(() -> {
       // force a check for a file existing; raises FNFE if not found
       fs.getFileStatus(collectorPath);
       fs.delete(collectorPath, true);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java
index 24a8d49747f..70e39de7388 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.impl;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
@@ -47,7 +48,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
  * options accordingly, for example:
  *
  * If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
  * {@link IllegalArgumentException} will be thrown.
  *
  */
@@ -147,8 +148,9 @@ public abstract class FutureDataInputStreamBuilderImpl
   }
 
   @Override
-  public FutureDataInputStreamBuilder withFileStatus(FileStatus st) {
-    this.status = requireNonNull(st, "status");
+  public FutureDataInputStreamBuilder withFileStatus(
+      @Nullable FileStatus st) {
+    this.status = st;
     return this;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
index 18f5187cb61..f47e5f4fbfb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.impl;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -37,14 +36,16 @@ import org.apache.hadoop.util.functional.FutureIO;
 
 /**
  * Support for future IO and the FS Builder subclasses.
- * If methods in here are needed for applications, promote
- * to {@link FutureIO} for public use -with the original
- * method relaying to it. This is to ensure that external
- * filesystem implementations can safely use these methods
+ * All methods in this class have been superseded by those in
+ * {@link FutureIO}.
+ * The methods here are retained but all marked as deprecated.
+ * This is to ensure that any external
+ * filesystem implementations can still use these methods
  * without linkage problems surfacing.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@Deprecated
 public final class FutureIOSupport {
 
   private FutureIOSupport() {
@@ -53,6 +54,7 @@ public final class FutureIOSupport {
   /**
    * Given a future, evaluate it. Raised exceptions are
    * extracted and handled.
+   * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
    * @param future future to evaluate
    * @param <T> type of the result.
    * @return the result, if all went well.
@@ -60,7 +62,8 @@ public final class FutureIOSupport {
    * @throws IOException if something went wrong
    * @throws RuntimeException any nested RTE thrown
    */
-  public static <T> T  awaitFuture(final Future<T> future)
+  @Deprecated
+  public static <T> T awaitFuture(final Future<T> future)
       throws InterruptedIOException, IOException, RuntimeException {
     return FutureIO.awaitFuture(future);
   }
@@ -69,6 +72,7 @@ public final class FutureIOSupport {
   /**
    * Given a future, evaluate it. Raised exceptions are
    * extracted and handled.
+   * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
    * @param future future to evaluate
    * @param <T> type of the result.
    * @return the result, if all went well.
@@ -77,6 +81,7 @@ public final class FutureIOSupport {
    * @throws RuntimeException any nested RTE thrown
    * @throws TimeoutException the future timed out.
    */
+  @Deprecated
   public static <T> T awaitFuture(final Future<T> future,
       final long timeout,
       final TimeUnit unit)
@@ -88,10 +93,7 @@ public final class FutureIOSupport {
   /**
    * From the inner cause of an execution exception, extract the inner cause
    * if it is an IOE or RTE.
-   * This will always raise an exception, either the inner IOException,
-   * an inner RuntimeException, or a new IOException wrapping the raised
-   * exception.
-   *
+   * See {@link FutureIO#raiseInnerCause(ExecutionException)}.
    * @param e exception.
    * @param <T> type of return value.
    * @return nothing, ever.
@@ -99,6 +101,7 @@ public final class FutureIOSupport {
    * any non-Runtime-Exception
    * @throws RuntimeException if that is the inner cause.
    */
+  @Deprecated
   public static <T> T raiseInnerCause(final ExecutionException e)
       throws IOException {
     return FutureIO.raiseInnerCause(e);
@@ -107,6 +110,7 @@ public final class FutureIOSupport {
   /**
    * Extract the cause of a completion failure and rethrow it if an IOE
    * or RTE.
+   * See {@link FutureIO#raiseInnerCause(CompletionException)}.
    * @param e exception.
    * @param <T> type of return value.
    * @return nothing, ever.
@@ -114,20 +118,15 @@ public final class FutureIOSupport {
    * any non-Runtime-Exception
    * @throws RuntimeException if that is the inner cause.
    */
+  @Deprecated
   public static <T> T raiseInnerCause(final CompletionException e)
       throws IOException {
     return FutureIO.raiseInnerCause(e);
   }
 
   /**
-   * Propagate options to any builder, converting everything with the
-   * prefix to an option where, if there were 2+ dot-separated elements,
-   * it is converted to a schema.
-   * <pre>{@code
-   *   fs.example.s3a.option => s3a:option
-   *   fs.example.fs.io.policy => s3a.io.policy
-   *   fs.example.something => something
-   * }</pre>
+   * Propagate options to any builder.
+   * {@link FutureIO#propagateOptions(FSBuilder, Configuration, String, String)}
    * @param builder builder to modify
    * @param conf configuration to read
    * @param optionalPrefix prefix for optional settings
@@ -136,56 +135,39 @@ public final class FutureIOSupport {
    * @param <U> type of builder
    * @return the builder passed in.
    */
+  @Deprecated
   public static <T, U extends FSBuilder<T, U>>
         FSBuilder<T, U> propagateOptions(
       final FSBuilder<T, U> builder,
       final Configuration conf,
       final String optionalPrefix,
       final String mandatoryPrefix) {
-    propagateOptions(builder, conf,
-        optionalPrefix, false);
-    propagateOptions(builder, conf,
-        mandatoryPrefix, true);
-    return builder;
+    return FutureIO.propagateOptions(builder,
+        conf, optionalPrefix, mandatoryPrefix);
   }
 
   /**
-   * Propagate options to any builder, converting everything with the
-   * prefix to an option where, if there were 2+ dot-separated elements,
-   * it is converted to a schema.
-   * <pre>{@code
-   *   fs.example.s3a.option => s3a:option
-   *   fs.example.fs.io.policy => s3a.io.policy
-   *   fs.example.something => something
-   * }</pre>
+   * Propagate options to any builder.
+   * {@link FutureIO#propagateOptions(FSBuilder, Configuration, String, boolean)}
    * @param builder builder to modify
    * @param conf configuration to read
    * @param prefix prefix to scan/strip
    * @param mandatory are the options to be mandatory or optional?
    */
+  @Deprecated
   public static void propagateOptions(
       final FSBuilder<?, ?> builder,
       final Configuration conf,
       final String prefix,
       final boolean mandatory) {
-
-    final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
-    final Map<String, String> propsWithPrefix = conf.getPropsWithPrefix(p);
-    for (Map.Entry<String, String> entry : propsWithPrefix.entrySet()) {
-      // change the schema off each entry
-      String key = entry.getKey();
-      String val = entry.getValue();
-      if (mandatory) {
-        builder.must(key, val);
-      } else {
-        builder.opt(key, val);
-      }
-    }
+    FutureIO.propagateOptions(builder, conf, prefix, mandatory);
   }
 
   /**
    * Evaluate a CallableRaisingIOE in the current thread,
    * converting IOEs to RTEs and propagating.
+   * See {@link FutureIO#eval(CallableRaisingIOE)}.
+   *
    * @param callable callable to invoke
    * @param <T> Return type.
    * @return the evaluated result.
@@ -194,17 +176,6 @@ public final class FutureIOSupport {
    */
   public static <T> CompletableFuture<T> eval(
       CallableRaisingIOE<T> callable) {
-    CompletableFuture<T> result = new CompletableFuture<>();
-    try {
-      result.complete(callable.apply());
-    } catch (UnsupportedOperationException | IllegalArgumentException tx) {
-      // fail fast here
-      throw tx;
-    } catch (Throwable tx) {
-      // fail lazily here to ensure callers expect all File IO operations to
-      // surface later
-      result.completeExceptionally(tx);
-    }
-    return result;
+    return FutureIO.eval(callable);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java
index 77b4ff52696..a19c5faff4d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java
@@ -38,6 +38,9 @@ public class OpenFileParameters {
    */
   private Set<String> mandatoryKeys;
 
+  /** The optional keys. */
+  private Set<String> optionalKeys;
+
   /**
    * Options set during the build sequence.
    */
@@ -61,6 +64,11 @@ public class OpenFileParameters {
     return this;
   }
 
+  public OpenFileParameters withOptionalKeys(final Set<String> keys) {
+    this.optionalKeys = requireNonNull(keys);
+    return this;
+  }
+
   public OpenFileParameters withOptions(final Configuration opts) {
     this.options = requireNonNull(opts);
     return this;
@@ -80,6 +88,10 @@ public class OpenFileParameters {
     return mandatoryKeys;
   }
 
+  public Set<String> getOptionalKeys() {
+    return optionalKeys;
+  }
+
   public Configuration getOptions() {
     return options;
   }
@@ -91,4 +103,5 @@ public class OpenFileParameters {
   public FileStatus getStatus() {
     return status;
   }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
index d2c999683c6..d5144b5e9c5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
@@ -20,20 +20,17 @@ package org.apache.hadoop.fs.impl;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * A wrapper for an IOException which
- * {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to
- * always extract the exception.
+ * A wrapper for an IOException.
  *
  * The constructor signature guarantees the cause will be an IOException,
  * and as it checks for a null-argument, non-null.
- * @deprecated use the {@code UncheckedIOException}.
+ * @deprecated use the {@code UncheckedIOException} directly.
  */
 @Deprecated
 @InterfaceAudience.Private
@@ -51,8 +48,4 @@ public class WrappedIOException extends UncheckedIOException {
     super(Preconditions.checkNotNull(cause));
   }
 
-  @Override
-  public synchronized IOException getCause() {
-    return (IOException) super.getCause();
-  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
index f6f4247489f..678225f81e0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
@@ -55,6 +55,9 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * Provides: argument processing to ensure the destination is valid
@@ -348,7 +351,11 @@ abstract class CommandWithDestination extends FsCommand {
     src.fs.setVerifyChecksum(verifyChecksum);
     InputStream in = null;
     try {
-      in = src.fs.open(src.path);
+      in = awaitFuture(src.fs.openFile(src.path)
+          .withFileStatus(src.stat)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+          .build());
       copyStreamToTarget(in, target);
       preserveAttributes(src, target, preserveRawXattrs);
     } finally {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index b03d7de8a1c..0643a2e983d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -98,7 +98,8 @@ class CopyCommands {
       try {
         for (PathData src : srcs) {
           if (src.stat.getLen() != 0) {
-            try (FSDataInputStream in = src.fs.open(src.path)) {
+            // Always do sequential reads.
+            try (FSDataInputStream in = src.openForSequentialIO()) {
               IOUtils.copyBytes(in, out, getConf(), false);
               writeDelimiter(out);
             }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
index 670fa152f72..d3ca013a3f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
@@ -105,7 +105,8 @@ class Display extends FsCommand {
     }
 
     protected InputStream getInputStream(PathData item) throws IOException {
-      return item.fs.open(item.path);
+      // Always do sequential reads.
+      return item.openForSequentialIO();
     }
   }
   
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
index 2280225b5ae..7242f261801 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
@@ -28,6 +28,8 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+
 /**
  * Show the first 1KB of the file.
  */
@@ -68,11 +70,9 @@ class Head extends FsCommand {
   }
 
   private void dumpToOffset(PathData item) throws IOException {
-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    try (FSDataInputStream in = item.openFile(
+        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
       IOUtils.copyBytes(in, System.out, endingOffset, false);
-    } finally {
-      in.close();
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index 1ff8d8f0494..2071a16799a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -39,6 +40,10 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.fs.RemoteIterator;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
 
 /**
@@ -601,4 +606,34 @@ public class PathData implements Comparable<PathData> {
   public int hashCode() {
     return path.hashCode();
   }
+
+
+  /**
+   * Open a file for sequential IO.
+   * <p></p>
+   * This uses FileSystem.openFile() to request sequential IO;
+   * the file length is also passed in as a hint.
+   * Filesystems may use this to optimize their IO.
+   * @return an input stream
+   * @throws IOException failure
+   */
+  protected FSDataInputStream openForSequentialIO()
+      throws IOException {
+    return openFile(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+  }
+
+  /**
+   * Open a file.
+   * @param policy fadvise policy.
+   * @return an input stream
+   * @throws IOException failure
+   */
+  protected FSDataInputStream openFile(final String policy) throws IOException {
+    return awaitFuture(fs.openFile(path)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            policy)
+        .opt(FS_OPTION_OPENFILE_LENGTH,
+            stat.getLen())   // file length hint for object stores
+        .build());
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
index 98db1c3b7b6..26ac3fee471 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.IOUtils;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+
 /**
  * Get a listing of all files in that match the file patterns.
  */
@@ -107,16 +109,15 @@ class Tail extends FsCommand {
     if (offset < 0) {
       offset = Math.max(fileSize + offset, 0);
     }
-    
-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    // Always do sequential reads.
+    try (FSDataInputStream in = item.openFile(
+        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
       in.seek(offset);
       // use conf so the system configured io block size is used
       IOUtils.copyBytes(in, System.out, getConf(), false);
       offset = in.getPos();
-    } finally {
-      in.close();
     }
     return offset;
   }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 166007f5c9a..c458269c351 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -118,6 +118,9 @@ public final class StoreStatisticNames {
   /** {@value}. */
   public static final String OP_OPEN = "op_open";
 
+  /** Call to openFile() {@value}. */
+  public static final String OP_OPENFILE = "op_openfile";
+
   /** {@value}. */
   public static final String OP_REMOVE_ACL = "op_remove_acl";
 
@@ -323,6 +326,12 @@ public final class StoreStatisticNames {
   public static final String ACTION_EXECUTOR_ACQUIRED =
       "action_executor_acquired";
 
+  /**
+   * A file was opened: {@value}.
+   */
+  public static final String ACTION_FILE_OPENED
+      = "action_file_opened";
+
   /**
    * An HTTP HEAD request was made: {@value}.
    */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 7e9137294c1..ca755f08419 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -76,7 +76,7 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_CLOSED = "stream_read_closed";
 
   /**
-   * Total count of times an attempt to close an input stream was made
+   * Total count of times an attempt to close an input stream was made.
    * Value: {@value}.
    */
   public static final String STREAM_READ_CLOSE_OPERATIONS
@@ -118,6 +118,23 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_OPERATIONS_INCOMPLETE
       = "stream_read_operations_incomplete";
 
+  /**
+   * count/duration of aborting a remote stream during
+   * stream IO.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_REMOTE_STREAM_ABORTED
+      = "stream_read_remote_stream_aborted";
+
+  /**
+   * count/duration of closing a remote stream,
+   * possibly including draining the stream to recycle
+   * the HTTP connection.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_REMOTE_STREAM_DRAINED
+      = "stream_read_remote_stream_drain";
+
   /**
    * Count of version mismatches encountered while reading an input stream.
    * Value: {@value}.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
index 70366fc7a30..c45dfc21a1b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
@@ -521,23 +521,39 @@ public final class IOStatisticsBinding {
       // create the tracker outside try-with-resources so
       // that failures can be set in the catcher.
       DurationTracker tracker = createTracker(factory, statistic);
-      try {
-        // exec the input function and return its value
-        return input.apply();
-      } catch (IOException | RuntimeException e) {
-        // input function failed: note it
-        tracker.failed();
-        // and rethrow
-        throw e;
-      } finally {
-        // update the tracker.
-        // this is called after the catch() call would have
-        // set the failed flag.
-        tracker.close();
-      }
+      return invokeTrackingDuration(tracker, input);
     };
   }
 
+  /**
+   * Given an IOException raising callable/lambda expression,
+   * execute it, updating the tracker on success/failure.
+   * @param tracker duration tracker.
+   * @param input input callable.
+   * @param <B> return type.
+   * @return the result of the invocation
+   * @throws IOException on failure.
+   */
+  public static <B> B invokeTrackingDuration(
+      final DurationTracker tracker,
+      final CallableRaisingIOE<B> input)
+      throws IOException {
+    try {
+      // exec the input function and return its value
+      return input.apply();
+    } catch (IOException | RuntimeException e) {
+      // input function failed: note it
+      tracker.failed();
+      // and rethrow
+      throw e;
+    } finally {
+      // update the tracker.
+      // this is called after the catch() call would have
+      // set the failed flag.
+      tracker.close();
+    }
+  }
+
   /**
    * Given an IOException raising Consumer,
    * return a new one which wraps the inner and tracks
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 037bbd3fd0f..890e7916ab0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -57,6 +57,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMP
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /** 
  * <code>SequenceFile</code>s are flat files consisting of binary key/value 
@@ -1948,7 +1953,14 @@ public class SequenceFile {
      */
     protected FSDataInputStream openFile(FileSystem fs, Path file,
         int bufferSize, long length) throws IOException {
-      return fs.open(file, bufferSize);
+      FutureDataInputStreamBuilder builder = fs.openFile(file)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+          .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
+      if (length >= 0) {
+        builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
+      }
+      return awaitFuture(builder.build());
     }
     
     /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
index d2bc8cd2960..002c725490f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
@@ -265,7 +267,9 @@ public class JsonSerialization<T> {
     if (status != null && status.getLen() == 0) {
       throw new EOFException("No data in " + path);
     }
-    FutureDataInputStreamBuilder builder = fs.openFile(path);
+    FutureDataInputStreamBuilder builder = fs.openFile(path)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE);
     if (status != null) {
       builder.withFileStatus(status);
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
index e2cdc0fd414..32e299b4d45 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause;
 
 /**
  * A bridge from Callable to Supplier; catching exceptions
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
index 3f7218baa75..c3fda19d8d7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.util.functional;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -29,6 +31,8 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSBuilder;
 
 /**
  * Future IO Helper methods.
@@ -86,6 +90,8 @@ public final class FutureIO {
    * extracted and rethrown.
    * </p>
    * @param future future to evaluate
+   * @param timeout timeout to wait
+   * @param unit time unit.
    * @param <T> type of the result.
    * @return the result, if all went well.
    * @throws InterruptedIOException future was interrupted
@@ -185,4 +191,88 @@ public final class FutureIO {
     }
   }
 
+  /**
+   * Propagate options to any builder, converting everything with the
+   * prefix to an option where, if there were 2+ dot-separated elements,
+   * it is converted to a schema.
+   * See {@link #propagateOptions(FSBuilder, Configuration, String, boolean)}.
+   * @param builder builder to modify
+   * @param conf configuration to read
+   * @param optionalPrefix prefix for optional settings
+   * @param mandatoryPrefix prefix for mandatory settings
+   * @param <T> type of result
+   * @param <U> type of builder
+   * @return the builder passed in.
+   */
+  public static <T, U extends FSBuilder<T, U>>
+      FSBuilder<T, U> propagateOptions(
+        final FSBuilder<T, U> builder,
+        final Configuration conf,
+        final String optionalPrefix,
+        final String mandatoryPrefix) {
+    propagateOptions(builder, conf,
+        optionalPrefix, false);
+    propagateOptions(builder, conf,
+        mandatoryPrefix, true);
+    return builder;
+  }
+
+  /**
+   * Propagate options to any builder, converting everything with the
+   * prefix to an option where, if there were 2+ dot-separated elements,
+   * it is converted to a schema.
+   * <pre>
+   *   fs.example.s3a.option becomes "s3a.option"
+   *   fs.example.fs.io.policy becomes "fs.io.policy"
+   *   fs.example.something becomes "something"
+   * </pre>
+   * @param builder builder to modify
+   * @param conf configuration to read
+   * @param prefix prefix to scan/strip
+   * @param mandatory are the options to be mandatory or optional?
+   */
+  public static void propagateOptions(
+      final FSBuilder<?, ?> builder,
+      final Configuration conf,
+      final String prefix,
+      final boolean mandatory) {
+
+    final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
+    final Map<String, String> propsWithPrefix = conf.getPropsWithPrefix(p);
+    for (Map.Entry<String, String> entry : propsWithPrefix.entrySet()) {
+      // change the schema off each entry
+      String key = entry.getKey();
+      String val = entry.getValue();
+      if (mandatory) {
+        builder.must(key, val);
+      } else {
+        builder.opt(key, val);
+      }
+    }
+  }
+
+  /**
+   * Evaluate a CallableRaisingIOE in the current thread,
+   * completing the returned future with its result or failure.
+   * @param callable callable to invoke
+   * @param <T> Return type.
+   * @return the evaluated result.
+   * @throws UnsupportedOperationException fail fast if unsupported
+   * @throws IllegalArgumentException invalid argument
+   */
+  public static <T> CompletableFuture<T> eval(
+      CallableRaisingIOE<T> callable) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    try {
+      result.complete(callable.apply());
+    } catch (UnsupportedOperationException | IllegalArgumentException tx) {
+      // fail fast here
+      throw tx;
+    } catch (Throwable tx) {
+      // fail lazily here to ensure callers expect all File IO operations to
+      // surface later
+      result.completeExceptionally(tx);
+    }
+    return result;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 4517bd8ff4a..004220c4bed 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -814,97 +814,11 @@ exists in the metadata, but no copies of any its blocks can be located;
 
 ### `FSDataInputStreamBuilder openFile(Path path)`
 
-Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html)
-to construct a operation to open the file at `path` for reading.
+See [openFile()](openfile.html).
 
-When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
-the builder parameters are verified and
-`openFileWithOptions(Path, OpenFileParameters)` invoked.
-
-This (protected) operation returns a `CompletableFuture<FSDataInputStream>`
-which, when its `get()` method is called, either returns an input
-stream of the contents of opened file, or raises an exception.
-
-The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)`
-ultimately invokes `open(Path, int)`.
-
-Thus the chain `openFile(path).build().get()` has the same preconditions
-and postconditions as `open(Path p, int bufferSize)`
-
-However, there is one difference which implementations are free to
-take advantage of: 
-
-The returned stream MAY implement a lazy open where file non-existence or
-access permission failures may not surface until the first `read()` of the
-actual data.
-
-The `openFile()` operation may check the state of the filesystem during its
-invocation, but as the state of the filesystem may change betwen this call and
-the actual `build()` and `get()` operations, this file-specific
-preconditions (file exists, file is readable, etc) MUST NOT be checked here.
-
-FileSystem implementations which do not implement `open(Path, int)`
-MAY postpone raising an `UnsupportedOperationException` until either the
-`FSDataInputStreamBuilder.build()` or the subsequent `get()` call,
-else they MAY fail fast in the `openFile()` call.
-
-### Implementors notes
-
-The base implementation of `openFileWithOptions()` actually executes
-the `open(path)` operation synchronously, yet still returns the result
-or any failures in the `CompletableFuture<>`, so as to ensure that users
-code expecting this.
-
-Any filesystem where the time to open a file may be significant SHOULD
-execute it asynchronously by submitting the operation in some executor/thread
-pool. This is particularly recommended for object stores and other filesystems
-likely to be accessed over long-haul connections.
-
-Arbitrary filesystem-specific options MAY be supported; these MUST
-be prefixed with either the filesystem schema, e.g. `hdfs.`
-or in the "fs.SCHEMA" format as normal configuration settings `fs.hdfs`). The
-latter style allows the same configuration option to be used for both
-filesystem configuration and file-specific configuration.
-
-It SHOULD be possible to always open a file without specifying any options,
-so as to present a consistent model to users. However, an implementation MAY
-opt to require one or more mandatory options to be set.
-
-The returned stream may perform "lazy" evaluation of file access. This is
-relevant for object stores where the probes for existence are expensive, and,
-even with an asynchronous open, may be considered needless.
- 
 ### `FSDataInputStreamBuilder openFile(PathHandle)`
 
-Creates a `FSDataInputStreamBuilder` to build an operation to open a file.
-Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html)
-to construct a operation to open the file identified by the given `PathHandle` for reading.
-
-When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
-the builder parameters are verified and
-`openFileWithOptions(PathHandle, OpenFileParameters)` invoked.
-
-This (protected) operation returns a `CompletableFuture<FSDataInputStream>`
-which, when its `get()` method is called, either returns an input
-stream of the contents of opened file, or raises an exception.
-
-The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` method
-returns a future which invokes `open(Path, int)`.
-
-Thus the chain `openFile(pathhandle).build().get()` has the same preconditions
-and postconditions as `open(Pathhandle, int)`
-
-As with `FSDataInputStreamBuilder openFile(PathHandle)`, the `openFile()`
-call must not be where path-specific preconditions are checked -that
-is postponed to the `build()` and `get()` calls.
-
-FileSystem implementations which do not implement `open(PathHandle handle, int bufferSize)`
-MAY postpone raising an `UnsupportedOperationException` until either the
-`FSDataInputStreamBuilder.build()` or the subsequent `get()` call,
-else they MAY fail fast in the `openFile()` call.
-
-The base implementation raises this exception in the `build()` operation;
-other implementations SHOULD copy this.
+See [openFile()](openfile.html).
 
 ### `PathHandle getPathHandle(FileStatus stat, HandleOpt... options)`
 
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
index eadba174fc1..db630e05c22 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
@@ -13,10 +13,10 @@
 -->
 
 <!--  ============================================================= -->
-<!--  CLASS: FSDataInputStreamBuilder -->
+<!--  CLASS: FutureDataInputStreamBuilder -->
 <!--  ============================================================= -->
 
-# class `org.apache.hadoop.fs.FSDataInputStreamBuilder`
+# class `org.apache.hadoop.fs.FutureDataInputStreamBuilder`
 
 <!-- MACRO{toc|fromDepth=1|toDepth=2} -->
 
@@ -27,7 +27,7 @@ file for reading.
 
 ## Invariants
 
-The `FSDataInputStreamBuilder` interface does not require parameters or
+The `FutureDataInputStreamBuilder` interface does not require parameters or
 or the state of `FileSystem` until [`build()`](#build) is
 invoked and/or during the asynchronous open operation itself.
 
@@ -39,11 +39,11 @@ path validation.
 ## Implementation-agnostic parameters.
 
 
-### <a name="Builder.bufferSize"></a> `FSDataInputStreamBuilder bufferSize(int bufSize)`
+### <a name="Builder.bufferSize"></a> `FutureDataInputStreamBuilder bufferSize(int bufSize)`
 
 Set the size of the buffer to be used.
 
-### <a name="Builder.withFileStatus"></a> `FSDataInputStreamBuilder withFileStatus(FileStatus status)`
+### <a name="Builder.withFileStatus"></a> `FutureDataInputStreamBuilder withFileStatus(FileStatus status)`
 
 A `FileStatus` instance which refers to the file being opened.
 
@@ -53,7 +53,7 @@ So potentially saving on remote calls especially to object stores.
 Requirements:
 
 * `status != null`
-* `status.getPath()` == the resolved path of the file being opened.
+* `status.getPath().getName()` == the name of the file being opened.
 
 The path validation MUST take place if the store uses the `FileStatus` when
 it opens files, and MAY be performed otherwise. The validation
@@ -65,27 +65,85 @@ If a filesystem implementation extends the `FileStatus` returned in its
 implementation MAY use this information when opening the file.
 
 This is relevant with those stores which return version/etag information,
-including the S3A and ABFS connectors -they MAY use this to guarantee that
-the file they opened is exactly the one returned in the listing.
+-they MAY use this to guarantee that the file they opened
+is exactly the one returned in the listing.
+
+
+The final `status.getPath().getName()` element of the supplied status MUST equal
+the name value of the path supplied to the  `openFile(path)` call.
+
+Filesystems MUST NOT validate the rest of the path.
+This is needed to support viewfs and other mount-point wrapper filesystems
+where schemas and paths are different. These often create their own FileStatus results.
+
+Preconditions
+
+```python
+status == null or status.getPath().getName() == path.getName()
+
+```
+
+Filesystems MUST NOT require the class of `status` to equal
+that of any specific subclass their implementation returns in filestatus/list
+operations. This is to support wrapper filesystems and serialization/deserialization
+of the status.
+
 
 ### Set optional or mandatory parameters
 
-    FSDataInputStreamBuilder opt(String key, ...)
-    FSDataInputStreamBuilder must(String key, ...)
+    FutureDataInputStreamBuilder opt(String key, ...)
+    FutureDataInputStreamBuilder must(String key, ...)
 
 Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
 client can specify FS-specific parameters without inspecting the concrete type
 of `FileSystem`.
 
+Example:
+
 ```java
 out = fs.openFile(path)
-    .opt("fs.s3a.experimental.input.fadvise", "random")
-    .must("fs.s3a.readahead.range", 256 * 1024)
+    .must("fs.option.openfile.read.policy", "random")
+    .opt("fs.http.connection.timeout", 30_000L)
     .withFileStatus(statusFromListing)
     .build()
     .get();
 ```
 
+Here the read policy of `random` has been specified,
+with the requirement that the filesystem implementation must understand the option.
+An http-specific option has been supplied which may be interpreted by any store;
+if the filesystem opening the file does not recognize the option, it can safely be
+ignored.
+
+### When to use `opt()` versus `must()`
+
+The difference between `opt()` and `must()` is how the FileSystem opening
+the file must react to an option which it does not recognize.
+
+```python
+
+def must(name, value):
+  if not name in known_keys:
+    raise IllegalArgumentException
+  if not name in supported_keys:
+    raise UnsupportedException
+
+
+def opt(name, value):
+  if not name in known_keys:
+     # ignore option
+
+```
+
+For any known key, the validation of the `value` argument MUST be the same
+irrespective of how the (key, value) pair was declared.
+
+1. For a filesystem-specific option, it is the choice of the implementation
+   how to validate the entry.
+1. For standard options, the specification of what is a valid `value` is
+   defined in this filesystem specification, validated through contract
+   tests.
+
 #### Implementation Notes
 
 Checking for supported options must be performed in the `build()` operation.
@@ -93,9 +151,9 @@ Checking for supported options must be performed in the `build()` operation.
 1. If a mandatory parameter declared via `must(key, value)`) is not recognized,
 `IllegalArgumentException` MUST be thrown.
 
-1. If a mandatory parameter declared via `must(key, value)`) relies on
+1. If a mandatory parameter declared via `must(key, value)` relies on
 a feature which is recognized but not supported in the specific
-Filesystem/FileContext instance `UnsupportedException` MUST be thrown.
+`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
 
 The behavior of resolving the conflicts between the parameters set by
 builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
@@ -110,13 +168,18 @@ custom subclasses.
 
 This is critical to ensure safe use of the feature: directory listing/
 status serialization/deserialization can result result in the `withFileStatus()`
-argumennt not being the custom subclass returned by the Filesystem instance's
+argument not being the custom subclass returned by the Filesystem instance's
 own `getFileStatus()`, `listFiles()`, `listLocatedStatus()` calls, etc.
 
 In such a situation the implementations must:
 
-1. Validate the path (always).
-1. Use the status/convert to the custom type, *or* simply discard it.
+1. Verify that `status.getPath().getName()` matches the current `path.getName()`
+   value. The rest of the path MUST NOT be validated.
+1. Use any status fields as desired -for example the file length.
+
+Even if no values of the status are used, the presence of the argument
+can be interpreted as the caller declaring that they believe the file
+to be present and of the given size.
 
 ## Builder interface
 
@@ -128,26 +191,499 @@ completed, returns an input stream which can read data from the filesystem.
 
 The `build()` operation MAY perform the validation of the file's existence,
 its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively:
+
+* file existence/status checks MAY be performed asynchronously within the returned
+    `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+  any of the read operations such as `read()` or `PositionedReadable`.
 
 That is, the precondition  `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is successful.
+only guaranteed to have been met after `get()` has been called on the returned
+future and an attempt has been made to read the stream.
 
-Thus, if even a file does not exist, the following call will still succeed, returning
-a future to be evaluated.
+Thus, even when the file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be evaluated.
 
 ```java
 Path p = new Path("file://tmp/file-which-does-not-exist");
 
 CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
       .openFile(p)
-      .build;
+      .build();
 ```
 
-The preconditions for opening the file are checked during the asynchronous
-evaluation, and so will surface when the future is completed:
+The inability to access/read a file MUST raise an `IOException` or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
 
 ```java
-FSDataInputStream in = future.get();
+  future.get().read();
 ```
+
+Access permission checks have the same visibility requirements: permission failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt any IO
+at all. Such operations MAY NOT raise exceptions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementations
+MUST recognise and MAY support by changing the behavior of
+their input streams as appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident that
+the application will only be executed against releases of Hadoop which know of
+the options -applications SHOULD set the options via `opt()` calls rather than `must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored;
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.Options.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a>  Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for sequential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+   filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+   behavior -and it may change with Hadoop releases and/or specific storage
+   subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy       | Meaning                                                  |
+|--------------|----------------------------------------------------------|
+| `adaptive`   | Any adaptive policy implemented by the store.            |
+| `default`    | The default policy for this store. Generally "adaptive". |
+| `random`     | Optimize for random access.                              |
+| `sequential` | Optimize for sequential access.                          |
+| `vector`     | The Vectored IO API is intended to be used.              |
+| `whole-file` | The whole file will be read.                             |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their IO
+performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether reads
+are being performed efficiently or not.
+
+_Further reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy.adaptive"></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies exist because of the absence of the ability to
+declare the seek policy in the `open()` API, which requires it to be declared, if
+configurable, in the cluster/application configuration. However, the switch from
+sequential to random seek policies may be expensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option, if
+they know their read plan, they SHOULD declare which policy is most appropriate.
+
+#### <a name="read.policy.default"></a> Read Policy `default`
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### <a name="read.policy.sequential"></a> Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### <a name="read.policy.random"></a> Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### <a name="read.policy.vector"></a> Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implementations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedReadable` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### <a name="read.policy.whole-file"></a> Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system client is free to enable
+whatever strategies maximise performance for this. In particular, larger ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in `openFile()` operation.
+* Prefetch data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened stream SHOULD declare this
+read policy.
+
+### <a name="openfile.length"></a> Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+   this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position across/beyond
+   this length but below the actual length of the file. Implementations MAY
+   raise `EOFExceptions` in such cases, or they MAY return data.
+
+If this option is used by the FileSystem implementation, the following notes apply.
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` &lt; 0 MUST be ignored.
+* If a file status is supplied along with a value in `fs.option.openfile.length`,
+  the file status values take precedence.
+
+### <a name="split.start"></a> Options: `fs.option.openfile.split.start` and `fs.option.openfile.split.end`
+
+Declare the start and end of the split when a file has been split for processing
+in pieces.
+
+1. If a value is negative, the option SHALL be considered unset.
+1. Filesystems MAY assume that the length of the file is greater than or equal
+   to the value of `fs.option.openfile.split.end`.
+1. And that they MAY raise an exception if the client application reads past the
+   value set in `fs.option.openfile.split.end`.
+1. The pair of options MAY be used to optimise the read plan, such as setting
+   the content range for GET requests, or using the split end as an implicit
+   declaration of the guaranteed minimum length of the file.
+1. If both options are set, and the split start is declared as greater than the
+   split end, then the split start SHOULD just be reset to zero, rather than
+   rejecting the operation.
+
+The split end value can provide a hint as to the end of the input stream. The
+split start can be used to optimize any initial read offset for filesystem
+clients.
+
+*Note for implementors:* applications will read past the end of a split when they
+need to read to the end of a record/line which begins before the end of the
+split.
+
+Therefore clients MUST be allowed to `seek()`/`read()` past the length
+set in `fs.option.openfile.split.end` if the file is actually longer
+than that value.
+
+## <a name="s3a"></a> S3A-specific options
+
+The S3A Connector supports custom options for readahead and seek policy.
+
+| Name                                 | Type     | Meaning                                                     |
+|--------------------------------------|----------|-------------------------------------------------------------|
+| `fs.s3a.readahead.range`             | `long`   | readahead range in bytes                                    |
+| `fs.s3a.input.async.drain.threshold` | `long`   | threshold to switch to asynchronous draining of the stream  |
+| `fs.s3a.experimental.input.fadvise`  | `String` | seek policy. Superseded by `fs.option.openfile.read.policy` |
+
+If the option set contains a SQL statement in the `fs.s3a.select.sql` option,
+then the file is opened as an S3 Select query.
+Consult the S3A documentation for more details.
+
+## <a name="abfs"></a> ABFS-specific options
+
+The ABFS Connector supports custom input stream options.
+
+| Name                              | Type      | Meaning                                            |
+|-----------------------------------|-----------|----------------------------------------------------|
+| `fs.azure.buffered.pread.disable` | `boolean` | disable caching on the positioned read operations. |
+
+
+Disables caching on data read through the [PositionedReadable](fsdatainputstream.html#PositionedReadable)
+APIs.
+
+Consult the ABFS Documentation for more details.
+
+## <a name="examples"></a> Examples
+
+#### Declaring seek policy and split limits when opening a file.
+
+Here is an example from a proof of
+concept `org.apache.parquet.hadoop.util.HadoopInputFile`
+reader which uses a (nullable) file status and a split start/end.
+
+The `FileStatus` value is always passed in -but if it is null, then the split
+end is used to declare the length of the file.
+
+```java
+protected SeekableInputStream newStream(Path path, FileStatus stat,
+     long splitStart, long splitEnd)
+     throws IOException {
+
+   FutureDataInputStreamBuilder builder = fs.openFile(path)
+   .opt("fs.option.openfile.read.policy", "vector, random")
+   .withFileStatus(stat);
+
+   builder.opt("fs.option.openfile.split.start", splitStart);
+   builder.opt("fs.option.openfile.split.end", splitEnd);
+   CompletableFuture<FSDataInputStream> streamF = builder.build();
+   return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
+}
+```
+
+As a result, whether driven directly by a file listing, or when opening a file
+from a query plan of `(path, splitStart, splitEnd)`, there is no need to probe
+the remote store for the length of the file. When working with remote object
+stores, this can save tens to hundreds of milliseconds, even if such a probe is
+done asynchronously.
+
+If both the file length and the split end are set, then the file length MUST be
+considered "more" authoritative, that is it really SHOULD be defining the file
+length. If the split end is set, the caller MAY read past it.
+
+The `CompressedSplitLineReader` can read past the end of a split if it is
+partway through processing a compressed record. That is: it assumes an
+incomplete record read means that the file length is greater than the split
+length, and that it MUST read the entirety of the partially read record. Other
+readers may behave similarly.
+
+Therefore
+
+1. File length as supplied in a `FileStatus` or in `fs.option.openfile.length`
+   SHALL set the strict upper limit on the length of a file
+2. The split end as set in `fs.option.openfile.split.end` MUST be viewed as a
+   hint, rather than the strict end of the file.
+
+### Opening a file with both standard and non-standard options
+
+Standard and non-standard options MAY be combined in the same `openFile()`
+operation.
+
+```java
+Future<FSDataInputStream> f = openFile(path)
+  .must("fs.option.openfile.read.policy", "random, adaptive")
+  .opt("fs.s3a.readahead.range", 1024 * 1024)
+  .build();
+
+FSDataInputStream is = f.get();
+```
+
+The option set in `must()` MUST be understood, or at least recognized and
+ignored by all filesystems. In this example, the S3A-specific option MAY be
+ignored by all other filesystem clients.
+
+### Opening a file with older releases
+
+Not all hadoop releases recognize the `fs.option.openfile.read.policy` option.
+
+The option can be safely used in application code if it is added via the `opt()`
+builder argument, as it will be treated as an unknown optional key which can
+then be discarded.
+
+```java
+Future<FSDataInputStream> f = openFile(path)
+  .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
+  .build();
+
+FSDataInputStream is = f.get();
+```
+
+*Note 1* if the option name is set by a reference to a constant in
+`org.apache.hadoop.fs.Options.OpenFileOptions`, then the program will not link
+against versions of Hadoop without the specific option. Therefore for resilient
+linking against older releases -use a copy of the value.
+
+*Note 2* as option validation is performed in the FileSystem connector,
+a third-party connector designed to work with multiple hadoop versions
+MAY NOT support the option.
+
+### Passing options in to MapReduce
+
+Hadoop MapReduce will automatically read MR Job Options with the
+`mapreduce.job.input.file.option.` and `mapreduce.job.input.file.must.`
+prefixes, and apply these values as `.opt()` and `must()` respectively, after
+removing the mapreduce-specific prefixes.
+
+This makes passing options in to MR jobs straightforward. For example, to
+declare that a job should read its data using random IO:
+
+```java
+JobConf jobConf = (JobConf) job.getConfiguration();
+jobConf.set(
+    "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
+    "random");
+```
+
+### MapReduce input format propagating options
+
+An example of a record reader passing in options to the file it opens.
+
+```java
+  public void initialize(InputSplit genericSplit,
+                     TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit)genericSplit;
+    Configuration job = context.getConfiguration();
+    start = split.getStart();
+    end = start + split.getLength();
+    Path file = split.getPath();
+
+    // open the file and seek to the start of the split
+    FutureDataInputStreamBuilder builder =
+      file.getFileSystem(job).openFile(file);
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt("fs.option.openfile.split.start", start);
+    builder.opt("fs.option.openfile.split.end", end);
+    FutureIO.propagateOptions(builder, job,
+        "mapreduce.job.input.file.option",
+        "mapreduce.job.input.file.must");
+
+    fileIn = FutureIO.awaitFuture(builder.build());
+    fileIn.seek(start);
+    /* Rest of the operation on the opened stream */
+  }
+```
+
+### `FileContext.openFile`
+
+From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
+Because the file length has already been probed for, the length is passed down.
+
+```java
+  public AvroFSInput(FileContext fc, Path p) throws IOException {
+    FileStatus status = fc.getFileStatus(p);
+    this.len = status.getLen();
+    this.stream = awaitFuture(fc.openFile(p)
+        .opt("fs.option.openfile.read.policy",
+            "sequential")
+        .opt("fs.option.openfile.length",
+            Long.toString(status.getLen()))
+        .build());
+    fc.open(p);
+  }
+```
+
+In this example, the length is passed down as a string (via `Long.toString()`)
+rather than directly as a long. This is to ensure that the input format will
+link against versions of Hadoop which do not have the
+`opt(String, long)` and `must(String, long)` builder parameters. Similarly, the
+values are passed as optional, so that if unrecognized the application will
+still succeed.
+
+### Example: reading a whole file
+
+This is from `org.apache.hadoop.util.JsonSerialization`.
+
+Its `load(FileSystem, Path, FileStatus)` method
+* declares the whole file is to be read end to end.
+* passes down the file status
+
+```java
+public T load(FileSystem fs,
+        Path path,
+        FileStatus status)
+        throws IOException {
+
+ try (FSDataInputStream dataInputStream =
+          awaitFuture(fs.openFile(path)
+              .opt("fs.option.openfile.read.policy", "whole-file")
+              .withFileStatus(status)
+              .build())) {
+   return fromJsonStream(dataInputStream);
+ } catch (JsonProcessingException e) {
+   throw new PathIOException(path.toString(),
+       "Failed to read JSON file " + e, e);
+ }
+}
+```
+
+*Note:* in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
+required a non-null parameter; this has since been relaxed.
+For maximum compatibility across versions, only invoke the method
+when the file status is known to be non-null.
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index a4aa136033a..e18f4c3bf4a 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -41,3 +41,4 @@ HDFS as these are commonly expected by Hadoop client applications.
 2. [Extending the specification and its tests](extending.html)
 1. [Uploading a file using Multiple Parts](multipartuploader.html)
 1. [IOStatistics](iostatistics.html)
+1. [openFile()](openfile.html)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
new file mode 100644
index 00000000000..afb3245c510
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
@@ -0,0 +1,122 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchronous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+  the filename of the path, not the full path.
+  This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+###  <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods return a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of the opened file, or raises an exception.
+
+The base implementation of `FileSystem.openFileWithOptions(Path, OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`.
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change between this call and
+the actual `build()` and `get()` operations, file-specific
+preconditions (file exists, file is readable, etc.) MUST NOT be checked here.
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+else they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for details
+on how to use the builder, and for standard options which may be passed in.
+
+### <a name="openfile_pathhandle_"></a> `FutureDataInputStreamBuilder openFile(PathHandle)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file identified by the given `PathHandle` for reading.
+
+If implemented by a filesystem, the semantics of [`openFile(Path)`](#openfile_path_) apply.
+Thus the chain `openFile(pathhandle).build().get()` has the same preconditions and postconditions
+as `open(PathHandle, int)`.
+
+FileSystem implementations which do not implement `open(PathHandle handle, int bufferSize)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call, else they MAY fail fast in
+the `openFile(PathHandle)` call.
+
+The base implementation raises this exception in the `build()` operation; other implementations
+SHOULD copy this.
+
+### Implementors notes
+
+The base implementation of `openFileWithOptions()` actually executes
+the `open(path)` operation synchronously, yet still returns the result
+or any failures in the `CompletableFuture<>`, so as to provide a consistent
+lifecycle across all filesystems.
+
+Any filesystem client where the time to open a file may be significant SHOULD
+execute it asynchronously by submitting the operation in some executor/thread
+pool. This is particularly recommended for object stores and other filesystems
+likely to be accessed over long-haul connections.
+
+Arbitrary filesystem-specific options MAY be supported; these MUST
+be prefixed with either the filesystem scheme, e.g. `hdfs.`,
+or use the `fs.SCHEMA` format of normal configuration settings, e.g. `fs.hdfs`. The
+latter style allows the same configuration option to be used for both
+filesystem configuration and file-specific configuration.
+
+It SHOULD be possible to always open a file without specifying any options,
+so as to present a consistent model to users. However, an implementation MAY
+opt to require one or more mandatory options to be set.
+
+The returned stream may perform "lazy" evaluation of file access. This is
+relevant for object stores where the probes for existence are expensive, and,
+even with an asynchronous open, may be considered needless.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 3e754e4578d..c395afdb377 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -50,11 +50,11 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.DurationInfo;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
-import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * Tests of multipart uploads.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
index a43053180fb..25bfe082b01 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
@@ -30,14 +30,18 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.IOUtils;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 import org.junit.Test;
 
@@ -232,7 +236,7 @@ public abstract class AbstractContractOpenTest
         getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
             .opt("fs.test.something", true);
     intercept(FileNotFoundException.class,
-        () -> FutureIOSupport.awaitFuture(builder.build()));
+        () -> awaitFuture(builder.build()));
   }
 
   @Test
@@ -242,7 +246,7 @@ public abstract class AbstractContractOpenTest
         getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
             .opt("fs.test.something", true);
     intercept(FileNotFoundException.class,
-        () -> FutureIOSupport.awaitFuture(builder.build(),
+        () -> awaitFuture(builder.build(),
             10, TimeUnit.DAYS));
   }
 
@@ -250,7 +254,7 @@ public abstract class AbstractContractOpenTest
   public void testOpenFileExceptionallyTranslating() throws Throwable {
     describe("openFile missing file chains into exceptionally()");
     CompletableFuture<FSDataInputStream> f = getFileSystem()
-        .openFile(path("testOpenFileUnknownOption")).build();
+        .openFile(path("testOpenFileExceptionallyTranslating")).build();
     interceptFuture(RuntimeException.class,
         "exceptionally",
         f.exceptionally(ex -> {
@@ -262,11 +266,12 @@ public abstract class AbstractContractOpenTest
   public void testChainedFailureAwaitFuture() throws Throwable {
     describe("await Future handles chained failures");
     CompletableFuture<FSDataInputStream> f = getFileSystem()
-        .openFile(path("testOpenFileUnknownOption"))
+        .openFile(path("testChainedFailureAwaitFuture"))
+        .withFileStatus(null)
         .build();
     intercept(RuntimeException.class,
         "exceptionally",
-        () -> FutureIOSupport.awaitFuture(
+        () -> awaitFuture(
             f.exceptionally(ex -> {
               throw new RuntimeException("exceptionally", ex);
             })));
@@ -280,13 +285,34 @@ public abstract class AbstractContractOpenTest
     int len = 4096;
     createFile(fs, path, true,
         dataset(len, 0x40, 0x80));
+    FileStatus st = fs.getFileStatus(path);
     CompletableFuture<Long> readAllBytes = fs.openFile(path)
-        .withFileStatus(fs.getFileStatus(path))
+        .withFileStatus(st)
         .build()
         .thenApply(ContractTestUtils::readStream);
     assertEquals("Wrong number of bytes read value",
         len,
         (long) readAllBytes.get());
+    // now reattempt with a new FileStatus and a different path
+    // other than the final name element
+    // implementations MUST use path in openFile() call
+    FileStatus st2 = new FileStatus(
+        len, false,
+        st.getReplication(),
+        st.getBlockSize(),
+        st.getModificationTime(),
+        st.getAccessTime(),
+        st.getPermission(),
+        st.getOwner(),
+        st.getGroup(),
+        new Path("gopher:///localhost:/" + path.getName()));
+    assertEquals("Wrong number of bytes read value",
+        len,
+        (long) fs.openFile(path)
+            .withFileStatus(st2)
+            .build()
+            .thenApply(ContractTestUtils::readStream)
+            .get());
   }
 
   @Test
@@ -298,17 +324,47 @@ public abstract class AbstractContractOpenTest
         dataset(4, 0x40, 0x80));
     CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
     AtomicBoolean accepted = new AtomicBoolean(false);
-    future.thenAcceptAsync(i -> accepted.set(true)).get();
+    future.thenApply(stream -> {
+      accepted.set(true);
+      return stream;
+    }).get().close();
     assertTrue("async accept operation not invoked",
         accepted.get());
   }
 
+  /**
+   * Open a file with a null status, and the length
+   * passed in as an opt() option (along with sequential IO).
+   * The file is opened, the data read, and it must match
+   * the source data.
+   * opt() is used so that integration testing with external
+   * filesystem connectors will downgrade if the option is not
+   * recognized.
+   */
   @Test
-  public void testOpenFileNullStatus() throws Throwable {
-    describe("use openFile() with a null status");
+  public void testOpenFileNullStatusButFileLength() throws Throwable {
+    describe("use openFile() with a null status and expect the status to be"
+        + " ignored. block size, fadvise and length are passed in as"
+        + " opt() options");
     Path path = path("testOpenFileNullStatus");
-    intercept(NullPointerException.class,
-        () -> getFileSystem().openFile(path).withFileStatus(null));
+    FileSystem fs = getFileSystem();
+    int len = 4;
+    byte[] result = new byte[len];
+    byte[] dataset = dataset(len, 0x40, 0x80);
+    createFile(fs, path, true,
+        dataset);
+    CompletableFuture<FSDataInputStream> future = fs.openFile(path)
+        .withFileStatus(null)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            "unknown, sequential, random")
+        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
+        .opt(FS_OPTION_OPENFILE_LENGTH, len)
+        .build();
+
+    try (FSDataInputStream in = future.get()) {
+      in.readFully(result);
+    }
+    compareByteArrays(dataset, result, len);
   }
 
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index e13a49ca10e..eb56d957d9a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1642,17 +1642,22 @@ public class ContractTestUtils extends Assert {
 
   /**
    * Read a whole stream; downgrades an IOE to a runtime exception.
+   * Closes the stream afterwards.
    * @param in input
    * @return the number of bytes read.
    * @throws AssertionError on any IOException
    */
   public static long readStream(InputStream in) {
-    long count = 0;
+    try {
+      long count = 0;
 
-    while (read(in) >= 0) {
-      count++;
+      while (read(in) >= 0) {
+        count++;
+      }
+      return count;
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, in);
     }
-    return count;
   }
 
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
index 22f6c33d2e2..755599f0c39 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
@@ -36,6 +36,8 @@ import org.assertj.core.api.ObjectAssert;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -347,6 +349,24 @@ public final class IOStatisticAssertions {
         verifyStatisticsNotNull(stats).maximums());
   }
 
+  /**
+   * Assert that a duration is within a given minimum/maximum range.
+   * @param stats statistics source
+   * @param key statistic key without any suffix
+   * @param min minimum statistic must be equal to or greater than this.
+   * @param max maximum statistic must be equal to or less than this.
+   */
+  public static void assertDurationRange(
+      final IOStatistics stats,
+      final String key,
+      final long min,
+      final long max) {
+    assertThatStatisticMinimum(stats, key + SUFFIX_MIN)
+        .isGreaterThanOrEqualTo(min);
+    assertThatStatisticMaximum(stats, key + SUFFIX_MAX)
+        .isLessThanOrEqualTo(max);
+  }
+
   /**
    * Start an assertion chain on
    * a required mean statistic.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java
index 8258b62c1f7..cfde1583e2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 import org.apache.hadoop.util.functional.FunctionRaisingIOE;
@@ -276,7 +275,7 @@ public class TestDurationTracking extends AbstractHadoopTestBase {
    */
   @Test
   public void testDurationThroughEval() throws Throwable {
-    CompletableFuture<Object> eval = FutureIOSupport.eval(
+    CompletableFuture<Object> eval = FutureIO.eval(
         trackDurationOfOperation(stats, REQUESTS, () -> {
           sleepf(100);
           throw new FileNotFoundException("oops");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org