Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/04/11 13:34:46 UTC

[GitHub] [hadoop] mehakmeet commented on a diff in pull request #2584: HADOOP-16202. Enhance openFile() for better read performance against object stores

mehakmeet commented on code in PR #2584:
URL: https://github.com/apache/hadoop/pull/2584#discussion_r847102156


##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchronous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+  the filename of the path, not the full path.
+  This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+###  <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods return a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of the opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle, OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`.
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and

Review Comment:
   typo: "between"
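
For reference, a minimal sketch of the builder chain the quoted
documentation describes; the path is illustrative, and
`FutureIO.awaitFuture` is used here as one way to unwrap the future:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.functional.FutureIO;

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;

    public class OpenFileSketch {
      public static int readFirstKB(FileSystem fs, Path path) throws Exception {
        // build() returns a CompletableFuture<FSDataInputStream>; the
        // actual open may be postponed until the first read.
        try (FSDataInputStream in = FutureIO.awaitFuture(
            fs.openFile(path)
                .opt(FS_OPTION_OPENFILE_READ_POLICY,
                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
                .build())) {
          byte[] buffer = new byte[1024];
          return in.read(buffer);
        }
      }
    }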



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4799,23 +4802,26 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
   @Retries.RetryTranslated
   @AuditEntryPoint
   private FSDataInputStream select(final Path source,
-      final String expression,
       final Configuration options,
-      final Optional<S3AFileStatus> providedStatus)
+      final OpenFileSupport.OpenFileInformation fileInformation)

Review Comment:
   Javadoc correction: remove the params which are no longer needed from this method's javadoc.
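
   Something like this, perhaps (a sketch: only the parameter set comes
   from the new signature; the wording is assumed):

       /**
        * Execute the select call against the source object.
        * @param source path to the object
        * @param options request configuration
        * @param fileInformation resolved openFile() information
        * @return a stream of the selected results
        * @throws IOException failure
        */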



##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and
+the actual `build()` and `get()` operations, these file-specific
+preconditions (file exists, file is readable, etc.) MUST NOT be checked here.
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+or they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for details
+on how to use the builder, and for standard options which may be pssed in.

Review Comment:
   typo: "passed"
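
   As an illustration of the lazy-open semantics described above (a
   sketch only; whether the failure surfaces at `build()`, `get()` or
   `read()` is implementation-specific):

       import java.io.FileNotFoundException;

       import org.apache.hadoop.fs.FSDataInputStream;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.util.functional.FutureIO;

       public class LazyOpenSketch {
         public static void probe(FileSystem fs) throws Exception {
           // With a lazy-open implementation this may succeed even
           // though the object is missing...
           FSDataInputStream in = FutureIO.awaitFuture(
               fs.openFile(new Path("s3a://bucket/no-such-file")).build());
           try {
             in.read();  // ...and the FileNotFoundException surfaces here.
           } catch (FileNotFoundException expected) {
             // expected from stores which defer the existence check
           } finally {
             in.close();
           }
         }
       }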



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create an OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return an assertion over the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name, 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.

Review Comment:
   nit: Correct the comment, since we aren't falling back but using one of the listed policies.
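
   For example, something like:

       // the first recognized policy in the list ("random") is adopted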



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete

Review Comment:
   nit: is "Delete" a typo?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);

Review Comment:
   Since at the moment "normal", "default", and "adaptive" all map to Normal, shouldn't we also verify that the boolean isAdaptive is true in this case?
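
   For example (assuming the adaptive flag is exposed as `isAdaptive()`;
   the accessor name here is a guess):

       Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
           .describedAs("adaptive")
           .isEqualTo(S3AInputPolicy.Normal)
           .matches(S3AInputPolicy::isAdaptive, "adaptive flag");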



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";

Review Comment:
   It would be better to include more than one valid policy in this list, to show that the first valid policy is picked up.
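
   For example, with two recognized policies in the list, the test can
   show that the first recognized one ("sequential") wins:

       String plist = "a, b, sequential, random, other ";
       Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
       Collection<String> options = conf.getTrimmedStringCollection(
           FS_OPTION_OPENFILE_READ_POLICY);
       Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
           .describedAs("Policy from " + plist)
           .isEqualTo(S3AInputPolicy.Sequential);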



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unknown policy")
+        .isNull();
+  }
+
+  /**
+   * Test the mapping of the standard option names.
+   */
+  @Test
+  public void testInputPolicyMapping() throws Throwable {
+    Object[][] policyMapping = {
+        {"normal", S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
+        {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
+    };
+    for (Object[] mapping : policyMapping) {
+      String name = (String) mapping[0];
+      Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
+          .describedAs("Policy %s", name)
+          .isEqualTo(mapping[1]);
+    }
+  }
+
+  /**
+   * Verify readahead range is picked up.
+   */
+  @Test
+  public void testReadahead() throws Throwable {
+    // readahead range option
+    assertOpenFile(READAHEAD_RANGE, "4096")
+        .extracting(f -> f.getReadAheadRange())
+        .isEqualTo(4096L);
+  }
+
+  /** is
+   * Verify readahead range is picked up.
+   */
+  @Test
+  public void testBufferSize() throws Throwable {
+    // readahead range option

Review Comment:
   nit: the comment should say bufferSize, not readahead range.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+  /** is
+   * Verify readahead range is picked up.
+   */
+  @Test

Review Comment:
   nit: Javadoc correction: stray "is", and the description should say buffer size, not readahead range.
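
   i.e. something like:

       /**
        * Verify the buffer size option is picked up.
        */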



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+  private Path testFile;
+
+  private FileStatus testFileStatus;
+
+  private long fileLength;
+
+  public ITestS3AOpenCost() {
+    super(true);
+  }
+
+  /**
+   * Setup creates a test file and saves its status and length
+   * to fields.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    testFile = methodPath();
+
+    writeTextFile(fs, testFile, "openfile", true);
+    testFileStatus = fs.getFileStatus(testFile);
+    fileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Test whether openFile() performs GET requests when file status
+   * and length options are passed down.
+   * Note that the input streams only update the FS statistics
+   * in close(), so metrics cannot be verified until all operations
+   * on a stream are complete.
+   * This is slightly less than ideal.
+   */
+  @Test
+  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+    describe("Test cost of openFile with/without status; raw only");
+    S3AFileSystem fs = getFileSystem();
+
+    // now read that file back in using the openFile call.
+    // with a new FileStatus and a different path.
+    // this verifies that any FileStatus class/subclass is used
+    // as a source of the file length.
+    FileStatus st2 = new FileStatus(
+        fileLength, false,
+        testFileStatus.getReplication(),
+        testFileStatus.getBlockSize(),
+        testFileStatus.getModificationTime(),
+        testFileStatus.getAccessTime(),
+        testFileStatus.getPermission(),
+        testFileStatus.getOwner(),
+        testFileStatus.getGroup(),
+        new Path("gopher:///localhost/" + testFile.getName()));
+
+    // no IO in open
+    FSDataInputStream in = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .withFileStatus(st2)
+                .build()
+                .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // the stream gets opened during read
+    long readLen = verifyMetrics(() ->
+            readStream(in),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 1));
+    assertEquals("bytes read from file", fileLength, readLen);
+  }
+
+  @Test
+  public void testOpenFileShorterLength() throws Throwable {
+    // open the file with the length option declared as shorter than
+    // the actual file; the bytes read are expected to match the shorter length.
+    S3AFileSystem fs = getFileSystem();
+
+    S3ATestUtils.MetricDiff bytesDiscarded =
+        new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
+    int offset = 2;
+    long shortLen = fileLength - offset;
+    // open the file
+    FSDataInputStream in2 = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .must(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+                .build()
+                .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // verify that the statistics are in rane

Review Comment:
   nit: "range"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create an OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an ObjectAssert for chaining further assertions.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return an assertion over the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name, 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * the configured default input policy.
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // unknown values fall back to the configured default seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // the first recognized policy in the list is adopted.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unkown policy")

Review Comment:
   typo: "unknown"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+  private Path testFile;
+
+  private FileStatus testFileStatus;
+
+  private long fileLength;
+
+  public ITestS3AOpenCost() {
+    super(true);
+  }
+
+  /**
+   * Setup creates a test file and saves its status and length
+   * to fields.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    testFile = methodPath();
+
+    writeTextFile(fs, testFile, "openfile", true);
+    testFileStatus = fs.getFileStatus(testFile);
+    fileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Test whether openFile() performs GET requests when file status
+   * and length options are passed down.
+   * Note that the input streams only update the FS statistics
+   * in close(), so metrics cannot be verified until all operations
+   * on a stream are complete.
+   * This is slightly less than ideal.
+   */
+  @Test
+  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+    describe("Test cost of openFile with/without status; raw only");
+    S3AFileSystem fs = getFileSystem();
+
+    // now read that file back in using the openFile call.
+    // with a new FileStatus and a different path.
+    // this verifies that any FileStatus class/subclass is used
+    // as a source of the file length.
+    FileStatus st2 = new FileStatus(
+        fileLength, false,
+        testFileStatus.getReplication(),
+        testFileStatus.getBlockSize(),
+        testFileStatus.getModificationTime(),
+        testFileStatus.getAccessTime(),
+        testFileStatus.getPermission(),
+        testFileStatus.getOwner(),
+        testFileStatus.getGroup(),
+        new Path("gopher:///localhost/" + testFile.getName()));
+
+    // no IO in open
+    FSDataInputStream in = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .withFileStatus(st2)
+                .build()
+                .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // the stream gets opened during read
+    long readLen = verifyMetrics(() ->
+            readStream(in),
+        always(NO_IO),

Review Comment:
   Small doubt here: when we read after opening the file, are we still at 0 IO?
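   (If I'm reading the cost assertions right, NO_IO only counts HEAD/LIST
   requests, so the GET issued when the stream opens on the first read shows
   up in the STREAM_READ_OPENED == 1 assertion rather than in always(NO_IO).)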



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

