You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2022/06/20 22:40:38 UTC

[hadoop] branch feature-vectored-io updated: HADOOP-18106: Handle memory fragmentation in S3A Vectored IO. (#4445)

This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/feature-vectored-io by this push:
     new dac8c48e914 HADOOP-18106: Handle memory fragmentation in S3A Vectored IO. (#4445)
dac8c48e914 is described below

commit dac8c48e9146e89c21293f79554268eeab0c6036
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Tue Jun 21 03:45:40 2022 +0530

    HADOOP-18106: Handle memory fragmentation in S3A Vectored IO. (#4445)
    
    * HADOOP-18106: Handle memory fragmentation in S3A Vectored IO implementation.
    
    part of HADOOP-18103.
    Handling memory fragmentation in S3A vectored IO implementation by
    allocating smaller user range requested size buffers and directly
    filling them from the remote S3 stream and skipping undesired
    data in between ranges.
    This patch also adds aborting active vectored reads when stream is
    closed or unbuffer() is called.
    
    Contributed By: Mukund Thakur
---
 .../org/apache/hadoop/fs/ChecksumFileSystem.java   |  11 +-
 .../main/java/org/apache/hadoop/fs/FileRange.java  |  12 ++
 .../org/apache/hadoop/fs/PositionedReadable.java   |   4 +-
 .../org/apache/hadoop/fs/RawLocalFileSystem.java   |  16 +-
 .../org/apache/hadoop/fs/StreamCapabilities.java   |   6 +
 .../hadoop/fs/{impl => }/VectoredReadUtils.java    |  91 +++++----
 .../apache/hadoop/fs/impl/CombinedFileRange.java   |   1 -
 .../apache/hadoop/fs/{ => impl}/FileRangeImpl.java |   9 +-
 .../site/markdown/filesystem/fsdatainputstream.md  |  11 +-
 .../fs/{impl => }/TestVectoredReadUtils.java       | 149 +++++++--------
 .../contract/AbstractContractVectoredReadTest.java | 198 ++++++++++++++------
 .../hadoop/fs/contract/ContractTestUtils.java      |  20 ++
 .../localfs/TestLocalFSContractVectoredRead.java   |  51 +++++
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 205 ++++++++++++++-------
 .../contract/s3a/ITestS3AContractVectoredRead.java |  63 ++++++-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |  15 +-
 .../hadoop/benchmark/VectoredReadBenchmark.java    |   4 +-
 17 files changed, 609 insertions(+), 257 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index a6bdc220ba2..1cca9fe2bfd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
-import org.apache.hadoop.fs.impl.VectoredReadUtils;
 import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
@@ -55,6 +54,7 @@ import org.apache.hadoop.util.Progressable;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
+import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
 
 /****************************************************************
  * Abstract Checksumed FileSystem.
@@ -166,7 +166,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
    * It verifies that data matches checksums.
    *******************************************************/
   private static class ChecksumFSInputChecker extends FSInputChecker implements
-      IOStatisticsSource {
+      IOStatisticsSource, StreamCapabilities {
     private ChecksumFileSystem fs;
     private FSDataInputStream datas;
     private FSDataInputStream sums;
@@ -408,7 +408,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       int minSeek = minSeekForVectorReads();
       int maxSize = maxReadSizeForVectorReads();
       List<CombinedFileRange> dataRanges =
-          VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum,
+          VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
               minSeek, maxReadSizeForVectorReads());
       List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
           bytesPerSum, minSeek, maxSize);
@@ -435,6 +435,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         }
       }
     }
+
+    @Override
+    public boolean hasCapability(String capability) {
+      return datas.hasCapability(capability);
+    }
   }
 
   private static class FSDataBoundedInputStream extends FSDataInputStream {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
index 7388e462cc2..e55696e9650 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.hadoop.fs.impl.FileRangeImpl;
+
 /**
  * A byte range of a file.
  * This is used for the asynchronous gather read API of
@@ -52,4 +54,14 @@ public interface FileRange {
    * @param data the future of the ByteBuffer that will have the data
    */
   void setData(CompletableFuture<ByteBuffer> data);
+
+  /**
+   * Factory method to create a FileRange object.
+   * @param offset starting offset of the range.
+   * @param length length of the range.
+   * @return a new instance of FileRangeImpl.
+   */
+  static FileRange createFileRange(long offset, int length) {
+    return new FileRangeImpl(offset, length);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
index 7e543ebf226..de760905127 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
@@ -25,7 +25,6 @@ import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.impl.VectoredReadUtils;
 
 /**
  * Stream that permits positional reading.
@@ -121,7 +120,6 @@ public interface PositionedReadable {
    */
   default void readVectored(List<? extends FileRange> ranges,
                             IntFunction<ByteBuffer> allocate) throws IOException {
-    VectoredReadUtils.readVectored(this, ranges, allocate,  minSeekForVectorReads(),
-        maxReadSizeForVectorReads());
+    VectoredReadUtils.readVectored(this, ranges, allocate);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index 208d1668b62..f525c3cba78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.fs;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.impl.VectoredReadUtils;
 
 import java.io.BufferedOutputStream;
 import java.io.DataOutput;
@@ -68,6 +67,7 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
@@ -278,6 +278,7 @@ public class RawLocalFileSystem extends FileSystem {
       // new capabilities.
       switch (capability.toLowerCase(Locale.ENGLISH)) {
       case StreamCapabilities.IOSTATISTICS:
+      case StreamCapabilities.VECTOREDIO:
         return true;
       default:
         return false;
@@ -303,23 +304,24 @@ public class RawLocalFileSystem extends FileSystem {
     public void readVectored(List<? extends FileRange> ranges,
                              IntFunction<ByteBuffer> allocate) throws IOException {
 
+      List<? extends FileRange> sortedRanges = Arrays.asList(sortRanges(ranges));
       // Set up all of the futures, so that we can use them if things fail
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {
         VectoredReadUtils.validateRangeRequest(range);
         range.setData(new CompletableFuture<>());
       }
       try {
         AsynchronousFileChannel channel = getAsyncChannel();
-        ByteBuffer[] buffers = new ByteBuffer[ranges.size()];
-        AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers);
-        for(int i = 0; i < ranges.size(); ++i) {
-          FileRange range = ranges.get(i);
+        ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
+        AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
+        for(int i = 0; i < sortedRanges.size(); ++i) {
+          FileRange range = sortedRanges.get(i);
           buffers[i] = allocate.apply(range.getLength());
           channel.read(buffers[i], range.getOffset(), i, asyncHandler);
         }
       } catch (IOException ioe) {
         LOG.debug("Exception occurred during vectored read ", ioe);
-        for(FileRange range: ranges) {
+        for(FileRange range: sortedRanges) {
           range.getData().completeExceptionally(ioe);
         }
       }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 86117801950..d68ef505dc3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -80,6 +80,12 @@ public interface StreamCapabilities {
    */
   String IOSTATISTICS = "iostatistics";
 
+  /**
+   * Support for vectored IO api.
+   * See {@code PositionedReadable#readVectored(List, IntFunction)}.
+   */
+  String VECTOREDIO = "readvectored";
+
   /**
    * Stream abort() capability implemented by {@link Abortable#abort()}.
    * This matches the Path Capability
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
similarity index 79%
rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java
rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
index 9a16e6841dd..64107f1a18f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.impl;
+package org.apache.hadoop.fs;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -28,9 +28,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.IntFunction;
 
-import org.apache.hadoop.fs.ByteBufferPositionedReadable;
-import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.util.Preconditions;
 
 /**
@@ -68,35 +66,19 @@ public final class VectoredReadUtils {
 
 
   /**
-   * Read fully a list of file ranges asynchronously from this file.
-   * The default iterates through the ranges to read each synchronously, but
-   * the intent is that subclasses can make more efficient readers.
+   * This is the default implementation which iterates through the ranges
+   * to read each synchronously, but the intent is that subclasses
+   * can make more efficient readers.
    * The data or exceptions are pushed into {@link FileRange#getData()}.
    * @param stream the stream to read the data from
    * @param ranges the byte ranges to read
    * @param allocate the byte buffer allocation
-   * @param minimumSeek the minimum number of bytes to seek over
-   * @param maximumRead the largest number of bytes to combine into a single read
    */
   public static void readVectored(PositionedReadable stream,
                                   List<? extends FileRange> ranges,
-                                  IntFunction<ByteBuffer> allocate,
-                                  int minimumSeek,
-                                  int maximumRead) {
-    if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
-      for(FileRange range: ranges) {
-        range.setData(readRangeFrom(stream, range, allocate));
-      }
-    } else {
-      for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
-          maximumRead)) {
-        CompletableFuture<ByteBuffer> read =
-            readRangeFrom(stream, range, allocate);
-        for(FileRange child: range.getUnderlying()) {
-          child.setData(read.thenApply(
-              (b) -> sliceTo(b, range.getOffset(), child)));
-        }
-      }
+                                  IntFunction<ByteBuffer> allocate) {
+    for (FileRange range: ranges) {
+      range.setData(readRangeFrom(stream, range, allocate));
     }
   }
 
@@ -166,7 +148,7 @@ public final class VectoredReadUtils {
                                           int chunkSize,
                                           int minimumSeek) {
     long previous = -minimumSeek;
-    for(FileRange range: input) {
+    for (FileRange range: input) {
       long offset = range.getOffset();
       long end = range.getOffset() + range.getLength();
       if (offset % chunkSize != 0 ||
@@ -209,7 +191,42 @@ public final class VectoredReadUtils {
   }
 
   /**
-   * Sort and merge ranges to optimize the access from the underlying file
+   * Check if the input ranges are overlapping in nature.
+   * We call two ranges to be overlapping when start offset
+   * of second is less than the end offset of first.
+   * End offset is calculated as start offset + length.
+   * @param input list if input ranges.
+   * @return true/false based on logic explained above.
+   */
+  public static List<? extends FileRange> validateNonOverlappingAndReturnSortedRanges(
+          List<? extends FileRange> input) {
+
+    if (input.size() <= 1) {
+      return input;
+    }
+    FileRange[] sortedRanges = sortRanges(input);
+    FileRange prev = sortedRanges[0];
+    for (int i=1; i<sortedRanges.length; i++) {
+      if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
+        throw new UnsupportedOperationException("Overlapping ranges are not supported");
+      }
+    }
+    return Arrays.asList(sortedRanges);
+  }
+
+  /**
+   * Sort the input ranges by offset.
+   * @param input input ranges.
+   * @return sorted ranges.
+   */
+  public static FileRange[] sortRanges(List<? extends FileRange> input) {
+    FileRange[] sortedRanges = input.toArray(new FileRange[0]);
+    Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset));
+    return sortedRanges;
+  }
+
+  /**
+   * Merge sorted ranges to optimize the access from the underlying file
    * system.
    * The motivations are that:
    * <ul>
@@ -219,24 +236,22 @@ public final class VectoredReadUtils {
    *   <li>Some file systems want to round ranges to be at checksum boundaries.</li>
    * </ul>
    *
-   * @param input the list of input ranges
+   * @param sortedRanges already sorted list of ranges based on offset.
    * @param chunkSize round the start and end points to multiples of chunkSize
    * @param minimumSeek the smallest gap that we should seek over in bytes
    * @param maxSize the largest combined file range in bytes
    * @return the list of sorted CombinedFileRanges that cover the input
    */
-  public static List<CombinedFileRange> sortAndMergeRanges(List<? extends FileRange> input,
-                                                           int chunkSize,
-                                                           int minimumSeek,
-                                                           int maxSize) {
-    // sort the ranges by offset
-    FileRange[] ranges = input.toArray(new FileRange[0]);
-    Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
+  public static List<CombinedFileRange> mergeSortedRanges(List<? extends FileRange> sortedRanges,
+                                                          int chunkSize,
+                                                          int minimumSeek,
+                                                          int maxSize) {
+
     CombinedFileRange current = null;
-    List<CombinedFileRange> result = new ArrayList<>(ranges.length);
+    List<CombinedFileRange> result = new ArrayList<>(sortedRanges.size());
 
     // now merge together the ones that merge
-    for(FileRange range: ranges) {
+    for (FileRange range: sortedRanges) {
       long start = roundDown(range.getOffset(), chunkSize);
       long end = roundUp(range.getOffset() + range.getLength(), chunkSize);
       if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
index 828a50b4f7b..516bbb2c70c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.impl;
 
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
similarity index 85%
rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java
rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
index ef5851154be..041e5f0a8d2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
@@ -15,15 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs;
+package org.apache.hadoop.fs.impl;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileRange;
+
 /**
  * A range of bytes from a file with an optional buffer to read those bytes
- * for zero copy.
+ * for zero copy. This shouldn't be created directly via constructor rather
+ * factory defined in {@code FileRange#createFileRange} should be used.
  */
+@InterfaceAudience.Private
 public class FileRangeImpl implements FileRange {
   private long offset;
   private int length;
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index e4a2830967e..197b999c81f 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -449,7 +449,14 @@ Read fully data for a list of ranges asynchronously. The default implementation
 iterates through the ranges, tries to coalesce the ranges based on values of
 `minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged
 ranges synchronously, but the intent is sub classes can implement efficient
-implementation.
+implementation. Reading in both direct and heap byte buffers are supported.
+Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
+allocating buffers such that even direct buffers are garbage collected when
+they are no longer referenced.
+
+Note: Don't use direct buffers for reading from ChecksumFileSystem as that may
+lead to memory fragmentation explained in HADOOP-18296.
+
 
 #### Preconditions
 
@@ -467,7 +474,7 @@ For each requested range:
 
 ### `minSeekForVectorReads()`
 
-Smallest reasonable seek. Two ranges won't be merged together if the difference between
+The smallest reasonable seek. Two ranges won't be merged together if the difference between
 end of first and start of next range is more than this value.
 
 ### `maxReadSizeForVectorReads()`
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
similarity index 74%
rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
index cfd366701be..5d08b02e113 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.impl;
+package org.apache.hadoop.fs;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -31,12 +31,10 @@ import org.junit.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
-import org.apache.hadoop.fs.ByteBufferPositionedReadable;
-import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
-import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.test.HadoopTestBase;
 
+import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
 import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully;
 import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally;
 
@@ -56,7 +54,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     }
     // ensure we don't make unnecessary slices
     ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100,
-        new FileRangeImpl(100, size));
+        FileRange.createFileRange(100, size));
     Assertions.assertThat(buffer)
             .describedAs("Slicing on the same offset shouldn't " +
                     "create a new buffer")
@@ -67,7 +65,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     final int sliceStart = 1024;
     final int sliceLength = 16 * 1024;
     slice = VectoredReadUtils.sliceTo(buffer, offset,
-        new FileRangeImpl(offset + sliceStart, sliceLength));
+        FileRange.createFileRange(offset + sliceStart, sliceLength));
     // make sure they aren't the same, but use the same backing data
     Assertions.assertThat(buffer)
             .describedAs("Slicing on new offset should " +
@@ -96,12 +94,12 @@ public class TestVectoredReadUtils extends HadoopTestBase {
 
   @Test
   public void testMerge() {
-    FileRange base = new FileRangeImpl(2000, 1000);
+    FileRange base = FileRange.createFileRange(2000, 1000);
     CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
 
     // test when the gap between is too big
     assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000,
-        new FileRangeImpl(5000, 1000), 2000, 4000));
+        FileRange.createFileRange(5000, 1000), 2000, 4000));
     assertEquals("Number of ranges in merged range shouldn't increase",
             1, mergeBase.getUnderlying().size());
     assertEquals("post merge offset", 2000, mergeBase.getOffset());
@@ -109,7 +107,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
 
     // test when the total size gets exceeded
     assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
-        new FileRangeImpl(5000, 1000), 2001, 3999));
+        FileRange.createFileRange(5000, 1000), 2001, 3999));
     assertEquals("Number of ranges in merged range shouldn't increase",
             1, mergeBase.getUnderlying().size());
     assertEquals("post merge offset", 2000, mergeBase.getOffset());
@@ -117,7 +115,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
 
     // test when the merge works
     assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
-        new FileRangeImpl(5000, 1000), 2001, 4000));
+        FileRange.createFileRange(5000, 1000), 2001, 4000));
     assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
     assertEquals("post merge offset", 2000, mergeBase.getOffset());
     assertEquals("post merge length", 4000, mergeBase.getLength());
@@ -127,7 +125,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     assertEquals(200, mergeBase.getOffset());
     assertEquals(100, mergeBase.getLength());
     assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
-        new FileRangeImpl(5000, 1000), 201, 400));
+        FileRange.createFileRange(5000, 1000), 201, 400));
     assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
     assertEquals("post merge offset", 200, mergeBase.getOffset());
     assertEquals("post merge length", 400, mergeBase.getLength());
@@ -136,42 +134,58 @@ public class TestVectoredReadUtils extends HadoopTestBase {
   @Test
   public void testSortAndMerge() {
     List<FileRange> input = Arrays.asList(
-        new FileRangeImpl(3000, 100),
-        new FileRangeImpl(2100, 100),
-        new FileRangeImpl(1000, 100)
+        FileRange.createFileRange(3000, 100),
+        FileRange.createFileRange(2100, 100),
+        FileRange.createFileRange(1000, 100)
         );
     assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
-        input, 100, 1001, 2500);
-    assertEquals("merged range size", 1, outputList.size());
+    List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
+            Arrays.asList(sortRanges(input)), 100, 1001, 2500);
+    Assertions.assertThat(outputList)
+            .describedAs("merged range size")
+            .hasSize(1);
     CombinedFileRange output = outputList.get(0);
-    assertEquals("merged range underlying size", 3, output.getUnderlying().size());
+    Assertions.assertThat(output.getUnderlying())
+            .describedAs("merged range underlying size")
+            .hasSize(3);
     assertEquals("range[1000,3100)", output.toString());
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
 
     // the minSeek doesn't allow the first two to merge
-    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
-    outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100);
-    assertEquals("merged range size", 2, outputList.size());
+    assertFalse("Ranges are non disjoint",
+            VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
+    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+            100, 1000, 2100);
+    Assertions.assertThat(outputList)
+            .describedAs("merged range size")
+            .hasSize(2);
     assertEquals("range[1000,1100)", outputList.get(0).toString());
     assertEquals("range[2100,3100)", outputList.get(1).toString());
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
 
     // the maxSize doesn't allow the third range to merge
-    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099);
-    assertEquals("merged range size", 2, outputList.size());
+    assertFalse("Ranges are non disjoint",
+            VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
+    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+            100, 1001, 2099);
+    Assertions.assertThat(outputList)
+            .describedAs("merged range size")
+            .hasSize(2);
     assertEquals("range[1000,2200)", outputList.get(0).toString());
     assertEquals("range[3000,3100)", outputList.get(1).toString());
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
 
     // test the round up and round down (the maxSize doesn't allow any merges)
-    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
-    outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100);
-    assertEquals("merged range size", 3, outputList.size());
+    assertFalse("Ranges are non disjoint",
+            VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
+    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+            16, 1001, 100);
+    Assertions.assertThat(outputList)
+            .describedAs("merged range size")
+            .hasSize(3);
     assertEquals("range[992,1104)", outputList.get(0).toString());
     assertEquals("range[2096,2208)", outputList.get(1).toString());
     assertEquals("range[2992,3104)", outputList.get(2).toString());
@@ -182,26 +196,35 @@ public class TestVectoredReadUtils extends HadoopTestBase {
   @Test
   public void testSortAndMergeMoreCases() throws Exception {
     List<FileRange> input = Arrays.asList(
-            new FileRangeImpl(3000, 110),
-            new FileRangeImpl(3000, 100),
-            new FileRangeImpl(2100, 100),
-            new FileRangeImpl(1000, 100)
+            FileRange.createFileRange(3000, 110),
+            FileRange.createFileRange(3000, 100),
+            FileRange.createFileRange(2100, 100),
+            FileRange.createFileRange(1000, 100)
     );
-    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
-            input, 1, 1001, 2500);
-    assertEquals("merged range size", 1, outputList.size());
+    assertFalse("Ranges are non disjoint",
+            VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
+    List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
+            Arrays.asList(sortRanges(input)), 1, 1001, 2500);
+    Assertions.assertThat(outputList)
+            .describedAs("merged range size")
+            .hasSize(1);
     CombinedFileRange output = outputList.get(0);
-    assertEquals("merged range underlying size", 4, output.getUnderlying().size());
+    Assertions.assertThat(output.getUnderlying())
+            .describedAs("merged range underlying size")
+            .hasSize(4);
     assertEquals("range[1000,3110)", output.toString());
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
-    outputList = VectoredReadUtils.sortAndMergeRanges(
-            input, 100, 1001, 2500);
-    assertEquals("merged range size", 1, outputList.size());
+    outputList = VectoredReadUtils.mergeSortedRanges(
+            Arrays.asList(sortRanges(input)), 100, 1001, 2500);
+    Assertions.assertThat(outputList)
+            .describedAs("merged range size")
+            .hasSize(1);
     output = outputList.get(0);
-    assertEquals("merged range underlying size", 4, output.getUnderlying().size());
+    Assertions.assertThat(output.getUnderlying())
+            .describedAs("merged range underlying size")
+            .hasSize(4);
     assertEquals("range[1000,3200)", output.toString());
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
@@ -211,9 +234,9 @@ public class TestVectoredReadUtils extends HadoopTestBase {
   @Test
   public void testMaxSizeZeroDisablesMering() throws Exception {
     List<FileRange> randomRanges = Arrays.asList(
-            new FileRangeImpl(3000, 110),
-            new FileRangeImpl(3000, 100),
-            new FileRangeImpl(2100, 100)
+            FileRange.createFileRange(3000, 110),
+            FileRange.createFileRange(3000, 100),
+            FileRange.createFileRange(2100, 100)
     );
     assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
     assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
@@ -225,7 +248,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
                                                   int minimumSeek,
                                                   int maxSize) {
     List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
-            .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
+            .mergeSortedRanges(inputRanges, chunkSize, minimumSeek, maxSize);
     Assertions.assertThat(combinedFileRanges)
             .describedAs("Mismatch in number of ranges post merging")
             .hasSize(inputRanges.size());
@@ -251,7 +274,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     }).when(stream).readFully(ArgumentMatchers.anyLong(),
                               ArgumentMatchers.any(ByteBuffer.class));
     CompletableFuture<ByteBuffer> result =
-        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+        VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
         ByteBuffer::allocate);
     assertFutureCompletedSuccessfully(result);
     ByteBuffer buffer = result.get();
@@ -267,7 +290,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
         .when(stream).readFully(ArgumentMatchers.anyLong(),
                                 ArgumentMatchers.any(ByteBuffer.class));
     result =
-        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+        VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
             ByteBuffer::allocate);
     assertFutureFailedExceptionally(result);
   }
@@ -286,7 +309,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
         ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
         ArgumentMatchers.anyInt());
     CompletableFuture<ByteBuffer> result =
-        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+        VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
             allocate);
     assertFutureCompletedSuccessfully(result);
     ByteBuffer buffer = result.get();
@@ -303,7 +326,7 @@ public class TestVectoredReadUtils extends HadoopTestBase {
         ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
         ArgumentMatchers.anyInt());
     result =
-        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+        VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
             ByteBuffer::allocate);
     assertFutureFailedExceptionally(result);
   }
@@ -328,9 +351,9 @@ public class TestVectoredReadUtils extends HadoopTestBase {
 
   @Test
   public void testReadVectored() throws Exception {
-    List<FileRange> input = Arrays.asList(new FileRangeImpl(0, 100),
-        new FileRangeImpl(100_000, 100),
-        new FileRangeImpl(200_000, 100));
+    List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 100),
+        FileRange.createFileRange(100_000, 100),
+        FileRange.createFileRange(200_000, 100));
     Stream stream = Mockito.mock(Stream.class);
     Mockito.doAnswer(invocation -> {
       fillBuffer(invocation.getArgument(1));
@@ -338,31 +361,11 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     }).when(stream).readFully(ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(ByteBuffer.class));
     // should not merge the ranges
-    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100);
+    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
     Mockito.verify(stream, Mockito.times(3))
         .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
     for(int b=0; b < input.size(); ++b) {
       validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
     }
   }
-
-  @Test
-  public void testReadVectoredMerge() throws Exception {
-    List<FileRange> input = Arrays.asList(new FileRangeImpl(2000, 100),
-        new FileRangeImpl(1000, 100),
-        new FileRangeImpl(0, 100));
-    Stream stream = Mockito.mock(Stream.class);
-    Mockito.doAnswer(invocation -> {
-      fillBuffer(invocation.getArgument(1));
-      return null;
-    }).when(stream).readFully(ArgumentMatchers.anyLong(),
-        ArgumentMatchers.any(ByteBuffer.class));
-    // should merge the ranges into a single read
-    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100);
-    Mockito.verify(stream, Mockito.times(1))
-        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
-    for(int b=0; b < input.size(); ++b) {
-      validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000);
-    }
-  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index e8c86b5dbbc..77bcc496ff4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.contract;
 
 import java.io.EOFException;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,13 +37,18 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
+import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 
 @RunWith(Parameterized.class)
@@ -53,11 +57,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
 
   public static final int DATASET_LEN = 64 * 1024;
-  private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+  protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
 
   private final IntFunction<ByteBuffer> allocate;
 
+  private final WeakReferencedElasticByteBufferPool pool =
+          new WeakReferencedElasticByteBufferPool();
+
   private final String bufferType;
 
   @Parameterized.Parameters(name = "Buffer type : {0}")
@@ -67,8 +74,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
 
   public AbstractContractVectoredReadTest(String bufferType) {
     this.bufferType = bufferType;
-    this.allocate = "array".equals(bufferType) ?
-            ByteBuffer::allocate : ByteBuffer::allocateDirect;
+    this.allocate = value -> {
+      boolean isDirect = !"array".equals(bufferType);
+      return pool.getBuffer(isDirect, value);
+    };
+  }
+
+  public IntFunction<ByteBuffer> getAllocate() {
+    return allocate;
   }
 
   @Override
@@ -79,12 +92,27 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     createFile(fs, path, true, DATASET);
   }
 
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    pool.release();
+  }
+
+  @Test
+  public void testVectoredReadCapability() throws Exception {
+    FileSystem fs = getFileSystem();
+    String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO};
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      assertCapabilities(in, vectoredReadCapability, null);
+    }
+  }
+
   @Test
   public void testVectoredReadMultipleRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
-      FileRange fileRange = new FileRangeImpl(i * 100, 100);
+      FileRange fileRange = FileRange.createFileRange(i * 100, 100);
       fileRanges.add(fileRange);
     }
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
@@ -98,6 +126,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       combinedFuture.get();
 
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
@@ -105,7 +134,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public void testVectoredReadAndReadFully()  throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(100, 100));
+    fileRanges.add(FileRange.createFileRange(100, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       byte[] readFullRes = new byte[100];
@@ -114,6 +143,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
       Assertions.assertThat(vecRes)
               .describedAs("Result from vectored read and readFully must match")
               .isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
@@ -125,12 +155,13 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public void testDisjointRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, 100));
-    fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100));
-    fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(4_000 + 101, 100));
+    fileRanges.add(FileRange.createFileRange(16_000 + 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
@@ -142,12 +173,13 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public void testAllRangesMergedIntoOne() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, 100));
-    fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100));
-    fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(4_000 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(8_000 - 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
@@ -159,44 +191,80 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public void testSomeRangesMergedSomeUnmerged() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(8*1024, 100));
-    fileRanges.add(new FileRangeImpl(14*1024, 100));
-    fileRanges.add(new FileRangeImpl(10*1024, 100));
-    fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100));
-    fileRanges.add(new FileRangeImpl(40*1024, 1024));
-    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .withFileStatus(fileStatus)
+                    .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleOverlappingRanges();
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .withFileStatus(fileStatus)
+                    .build();
+    try (FSDataInputStream in = builder.get()) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testSameRanges() throws Exception {
+    // Same ranges are special case of overlapping only.
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(8*1024, 1000));
-    fileRanges.add(new FileRangeImpl(8*1024, 1000));
-    fileRanges.add(new FileRangeImpl(8*1024, 1000));
+    List<FileRange> fileRanges = getSampleSameRanges();
     CompletableFuture<FSDataInputStream> builder =
             fs.openFile(path(VECTORED_READ_FILE_NAME))
                     .build();
     try (FSDataInputStream in = builder.get()) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
-  public void testOverlappingRanges() throws Exception {
+  public void testSomeRandomNonOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(500, 100));
+    fileRanges.add(FileRange.createFileRange(1000, 200));
+    fileRanges.add(FileRange.createFileRange(50, 10));
+    fileRanges.add(FileRange.createFileRange(10, 5));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testConsecutiveRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, 1000));
-    fileRanges.add(new FileRangeImpl(90, 900));
-    fileRanges.add(new FileRangeImpl(50, 900));
-    fileRanges.add(new FileRangeImpl(10, 980));
+    fileRanges.add(FileRange.createFileRange(500, 100));
+    fileRanges.add(FileRange.createFileRange(600, 200));
+    fileRanges.add(FileRange.createFileRange(800, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
@@ -204,7 +272,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public void testEOFRanges()  throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
+    fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       for (FileRange res : fileRanges) {
@@ -227,22 +295,22 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public void testNegativeLengthRange()  throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, -50));
-    testExceptionalVectoredRead(fs, fileRanges, "Exception is expected");
+    fileRanges.add(FileRange.createFileRange(0, -50));
+    verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class);
   }
 
   @Test
   public void testNegativeOffsetRange()  throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(-1, 50));
-    testExceptionalVectoredRead(fs, fileRanges, "Exception is expected");
+    fileRanges.add(FileRange.createFileRange(-1, 50));
+    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
   }
 
   @Test
   public void testNormalReadAfterVectoredRead() throws Exception {
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges = createSomeOverlappingRanges();
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       // read starting 200 bytes
@@ -254,13 +322,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
               .describedAs("Vectored read shouldn't change file pointer.")
               .isEqualTo(200);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testVectoredReadAfterNormalRead() throws Exception {
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges = createSomeOverlappingRanges();
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       // read starting 200 bytes
       byte[] res = new byte[200];
@@ -272,43 +341,66 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
               .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testMultipleVectoredReads() throws Exception {
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges1 = createSomeOverlappingRanges();
-    List<FileRange> fileRanges2 = createSomeOverlappingRanges();
+    List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
+    List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
       validateVectoredReadResult(fileRanges2, DATASET);
       validateVectoredReadResult(fileRanges1, DATASET);
+      returnBuffersToPoolPostRead(fileRanges1, pool);
+      returnBuffersToPoolPostRead(fileRanges2, pool);
     }
   }
 
-  protected List<FileRange> createSomeOverlappingRanges() {
+  protected List<FileRange> createSampleNonOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(110, 50));
+    return fileRanges;
+  }
+
+  protected List<FileRange> getSampleSameRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(0, 100));
-    fileRanges.add(new FileRangeImpl(90, 50));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
+    fileRanges.add(FileRange.createFileRange(8_000, 1000));
     return fileRanges;
   }
 
+  protected List<FileRange> getSampleOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(400, 500));
+    return fileRanges;
+  }
 
-  protected void testExceptionalVectoredRead(FileSystem fs,
-                                             List<FileRange> fileRanges,
-                                             String s) throws IOException {
-    boolean exRaised = false;
-    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
-      // Can we intercept here as done in S3 tests ??
-      in.readVectored(fileRanges, allocate);
-    } catch (EOFException | IllegalArgumentException ex) {
-      // expected.
-      exRaised = true;
+  /**
+   * Validate that exceptions must be thrown during a vectored
+   * read operation with specific input ranges.
+   * @param fs FileSystem instance.
+   * @param fileRanges input file ranges.
+   * @param clazz type of exception expected.
+   * @throws Exception any other IOE.
+   */
+  protected <T extends Throwable> void verifyExceptionalVectoredRead(
+          FileSystem fs,
+          List<FileRange> fileRanges,
+          Class<T> clazz) throws Exception {
+
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .build();
+    try (FSDataInputStream in = builder.get()) {
+      LambdaTestUtils.intercept(clazz,
+          () -> in.readVectored(fileRanges, allocate));
     }
-    Assertions.assertThat(exRaised)
-            .describedAs(s)
-            .isTrue();
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index a316b02fb08..0bd147c43bb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathCapabilities;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.functional.FutureIO;
 
@@ -1136,6 +1137,25 @@ public class ContractTestUtils extends Assert {
     }
   }
 
+  /**
+   * Utility to return buffers back to the pool once all
+   * data has been read for each file range.
+   * @param fileRanges list of file range.
+   * @param pool buffer pool.
+   * @throws IOException any IOE
+   * @throws TimeoutException ideally this should never occur.
+   */
+  public static void returnBuffersToPoolPostRead(List<FileRange> fileRanges,
+                                                 ByteBufferPool pool)
+          throws IOException, TimeoutException {
+    for (FileRange range : fileRanges) {
+      ByteBuffer buffer = FutureIO.awaitFuture(range.getData(),
+              VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+              TimeUnit.SECONDS);
+      pool.putBuffer(buffer);
+    }
+  }
+
 
   /**
    * Assert that the data read matches the dataset at the given offset.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index 099e3b946d1..5d6ca3f8f0c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -18,9 +18,26 @@
 
 package org.apache.hadoop.fs.contract.localfs;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest {
 
@@ -32,4 +49,38 @@ public class TestLocalFSContractVectoredRead extends AbstractContractVectoredRea
   protected AbstractFSContract createContract(Configuration conf) {
     return new LocalFSContract(conf);
   }
+
+  @Test
+  public void testChecksumValidationDuringVectoredRead() throws Exception {
+    Path testPath = path("big_range_checksum");
+    LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
+    final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+    try (FSDataOutputStream out = localFs.create(testPath, true)){
+      out.write(datasetCorrect);
+    }
+    Path checksumPath = localFs.getChecksumFile(testPath);
+    Assertions.assertThat(localFs.exists(checksumPath))
+            .describedAs("Checksum file should be present")
+            .isTrue();
+    CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
+    List<FileRange> someRandomRanges = new ArrayList<>();
+    someRandomRanges.add(FileRange.createFileRange(10, 1024));
+    someRandomRanges.add(FileRange.createFileRange(1025, 1024));
+    try (FSDataInputStream in = fis.get()){
+      in.readVectored(someRandomRanges, getAllocate());
+      validateVectoredReadResult(someRandomRanges, datasetCorrect);
+    }
+    final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
+    try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
+      out.write(datasetCorrupted);
+    }
+    CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
+    try (FSDataInputStream in = fisN.get()){
+      in.readVectored(someRandomRanges, getAllocate());
+      // Expect checksum exception when data is updated directly through
+      // raw local fs instance.
+      intercept(ChecksumException.class,
+          () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted));
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 9d87f26c3c0..3069f172891 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -22,11 +22,13 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntFunction;
 
 import com.amazonaws.services.s3.model.GetObjectRequest;
@@ -46,8 +48,7 @@ import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.impl.CombinedFileRange;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
-import org.apache.hadoop.fs.impl.VectoredReadUtils;
+import org.apache.hadoop.fs.VectoredReadUtils;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -59,9 +60,9 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint;
-import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo;
-import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges;
+import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
+import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
+import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges;
 import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
@@ -107,6 +108,13 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
 
+  /**
+   * Atomic boolean variable to stop all ongoing vectored read operation
+   * for this input stream. This will be set to true when the stream is
+   * closed or unbuffer is called.
+   */
+  private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false);
+
   /**
    * This is the public position; the one set in {@link #seek(long)}
    * and returned in {@link #getPos()}.
@@ -589,6 +597,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     if (!closed) {
       closed = true;
       try {
+        stopVectoredIOOperations.set(true);
         // close or abort the stream; blocking
         awaitFuture(closeStream("close() operation", false, true));
         LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
@@ -940,31 +949,32 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
               ranges.size(), combinedFileRanges.size());
-      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
+      for (CombinedFileRange combinedFileRange: combinedFileRanges) {
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
@@ -972,58 +982,102 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
+    // This reference is must be kept till all buffers are populated as this is a
+    // finalizable object which closes the internal stream when gc triggers.
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex);
+      LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
       for(FileRange child : combinedFileRange.getUnderlying()) {
         child.getData().completeExceptionally(ex);
       }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from s3.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    S3ObjectInputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child.getLength(), buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child.getLength(), buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
     }
   }
 
   /**
-   * Update data in child range from combined range.
-   * @param child child range.
-   * @param combinedBuffer combined buffer.
-   * @param combinedFileRange combined range.
+   * Drain unnecessary data in between ranges.
+   * @param objectContent s3 data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
    */
-  private void updateOriginalRange(FileRange child,
-                                   ByteBuffer combinedBuffer,
-                                   CombinedFileRange combinedFileRange) {
-    LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
-    ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child);
-    child.getData().complete(childBuffer);
-    LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
+  private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity)
+          throws IOException {
+    int drainBytes = 0;
+    int readCount;
+    while (drainBytes < drainQuantity) {
+      if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) {
+        byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
+        readCount = objectContent.read(drainBuffer);
+      } else {
+        byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
+        readCount = objectContent.read(drainBuffer);
+      }
+      drainBytes += readCount;
+    }
+    LOG.debug("{} bytes drained from stream ", drainBytes);
   }
 
   /**
-   *  // Check if we can use contentLength returned by http GET request.
    * Validates range parameters.
+   * In case of S3 we already have contentLength from the first GET request
+   * during an open file operation so failing fast here.
    * @param range requested range.
    * @throws EOFException end of file exception.
    */
@@ -1038,13 +1092,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   }
 
   /**
-   * TODO: Add retry in client.getObject(). not present in older reads why here??
-   * Okay retry is being done in the top layer during read.
-   * But if we do here in the top layer, one issue I am thinking is
-   * what if there is some error which happened during filling the buffer
-   * If we retry that old offsets of heap buffers can be overwritten ?
-   * I think retry should be only added in {@link S3AInputStream#getS3Object}
-   * Read data from S3 for this range and populate the bufffer.
+   * Read data from S3 for this range and populate the buffer.
    * @param range range of data to read.
    * @param buffer buffer to fill.
    */
@@ -1053,6 +1101,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     S3Object objectRange = null;
     S3ObjectInputStream objectContent = null;
     try {
+      checkIfVectoredIOStopped();
       long position = range.getOffset();
       int length = range.getLength();
       final String operationName = "readRange";
@@ -1089,6 +1138,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       int offset = 0;
       byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
       while (readBytes < length) {
+        checkIfVectoredIOStopped();
         int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
                 TMP_BUFFER_MAX_SIZE
                 : length - readBytes;
@@ -1103,7 +1153,15 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
-  public void readByteArray(S3ObjectInputStream objectContent,
+  /**
+   * Read data into destination buffer from s3 object content.
+   * @param objectContent result from S3.
+   * @param dest destination buffer.
+   * @param offset start offset of dest buffer.
+   * @param length number of bytes to fill in dest.
+   * @throws IOException any IOE.
+   */
+  private void readByteArray(S3ObjectInputStream objectContent,
                             byte[] dest,
                             int offset,
                             int length) throws IOException {
@@ -1120,14 +1178,14 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   }
 
   /**
-   * Read data from S3 using a http request.
-   * This also handles if file has been changed while http call
-   * is getting executed. If file has been changed RemoteFileChangedException
-   * is thrown.
+   * Read data from S3 using a http request with retries.
+   * This also handles if file has been changed while the
+   * http call is getting executed. If the file has been
+   * changed RemoteFileChangedException is thrown.
    * @param operationName name of the operation for which get object on S3 is called.
    * @param position position of the object to be read from S3.
    * @param length length from position of the object to be read from S3.
-   * @return S3Object
+   * @return S3Object result s3 object.
    * @throws IOException exception if any.
    */
   private S3Object getS3Object(String operationName, long position,
@@ -1140,7 +1198,11 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     Invoker invoker = context.getReadInvoker();
     try {
       objectRange = invoker.retry(operationName, pathStr, true,
-          () -> client.getObject(request));
+        () -> {
+          checkIfVectoredIOStopped();
+          return client.getObject(request);
+        });
+
     } catch (IOException ex) {
       tracker.failed();
       throw ex;
@@ -1152,6 +1214,19 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     return objectRange;
   }
 
+  /**
+   * Check if vectored io operation has been stooped. This happens
+   * when the stream is closed or unbuffer is called.
+   * @throws InterruptedIOException throw InterruptedIOException such
+   *                                that all running vectored io is
+   *                                terminated thus releasing resources.
+   */
+  private void checkIfVectoredIOStopped() throws InterruptedIOException {
+    if (stopVectoredIOOperations.get()) {
+      throw new InterruptedIOException("Stream closed or unbuffer is called");
+    }
+  }
+
   /**
    * Access the input stream statistics.
    * This is for internal testing and may be removed without warning.
@@ -1237,10 +1312,15 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   /**
    * Closes the underlying S3 stream, and merges the {@link #streamStatistics}
    * instance associated with the stream.
+   * Also sets the {@code stopVectoredIOOperations} flag to true such that
+   * active vectored read operations are terminated. However termination of
+   * old vectored reads are not guaranteed if a new vectored read operation
+   * is initiated after unbuffer is called.
    */
   @Override
   public synchronized void unbuffer() {
     try {
+      stopVectoredIOOperations.set(true);
       closeStream("unbuffer()", false, false);
     } finally {
       streamStatistics.unbuffered();
@@ -1253,6 +1333,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     case StreamCapabilities.IOSTATISTICS:
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.VECTOREDIO:
       return true;
     default:
       return false;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 0752e75d247..18a727dcdce 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.EOFException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -26,14 +28,15 @@ import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
@@ -55,8 +58,8 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
   public void testEOFRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
-    testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
+    fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
+    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
   }
 
   @Test
@@ -99,4 +102,58 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
       }
     }
   }
+
+  @Test
+  public void testStopVectoredIoOperationsCloseStream() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+      in.readVectored(fileRanges, getAllocate());
+      in.close();
+      LambdaTestUtils.intercept(InterruptedIOException.class,
+          () -> validateVectoredReadResult(fileRanges, DATASET));
+    }
+    // reopening the stream should succeed.
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+      in.readVectored(fileRanges, getAllocate());
+      validateVectoredReadResult(fileRanges, DATASET);
+    }
+  }
+
+  @Test
+  public void testStopVectoredIoOperationsUnbuffer() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+      in.readVectored(fileRanges, getAllocate());
+      in.unbuffer();
+      LambdaTestUtils.intercept(InterruptedIOException.class,
+          () -> validateVectoredReadResult(fileRanges, DATASET));
+      // re-initiating the vectored reads after unbuffer should succeed.
+      in.readVectored(fileRanges, getAllocate());
+      validateVectoredReadResult(fileRanges, DATASET);
+    }
+
+  }
+
+  /**
+   * S3 vectored IO doesn't support overlapping ranges.
+   */
+  @Override
+  public void testOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleOverlappingRanges();
+    verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
+  }
+
+  /**
+   * S3 vectored IO doesn't support overlapping ranges.
+   */
+  @Override
+  public void testSameRanges() throws Exception {
+    // Same ranges are special case of overlapping only.
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleSameRanges();
+    verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 956e23a3f11..f8d47011de3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -459,13 +458,13 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   public void test_045_vectoredIOHugeFile() throws Throwable {
     assumeHugeFileExists();
     List<FileRange> rangeList = new ArrayList<>();
-    rangeList.add(new FileRangeImpl(5856368, 1167716));
-    rangeList.add(new FileRangeImpl(3520861, 1167700));
-    rangeList.add(new FileRangeImpl(8191913, 1167775));
-    rangeList.add(new FileRangeImpl(1520861, 1167700));
-    rangeList.add(new FileRangeImpl(2520861, 116770));
-    rangeList.add(new FileRangeImpl(9191913, 116770));
-    rangeList.add(new FileRangeImpl(2820861, 156770));
+    rangeList.add(FileRange.createFileRange(5856368, 116770));
+    rangeList.add(FileRange.createFileRange(3520861, 116770));
+    rangeList.add(FileRange.createFileRange(8191913, 116770));
+    rangeList.add(FileRange.createFileRange(1520861, 116770));
+    rangeList.add(FileRange.createFileRange(2520861, 116770));
+    rangeList.add(FileRange.createFileRange(9191913, 116770));
+    rangeList.add(FileRange.createFileRange(2820861, 156770));
     IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
     FileSystem fs = getFileSystem();
     CompletableFuture<FSDataInputStream> builder =
diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
index aaee951d72c..631842f78e2 100644
--- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
+++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.impl.FileRangeImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -107,7 +107,7 @@ public class VectoredReadBenchmark {
     FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
     List<FileRange> ranges = new ArrayList<>();
     for(int m=0; m < 100; ++m) {
-      FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE);
+      FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE);
       ranges.add(range);
     }
     stream.readVectored(ranges, bufferChoice.allocate);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org