Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2022/05/02 22:53:07 UTC

[hadoop] branch feature-vectored-io updated (d441c0f64f9 -> 7d1a2c3e288)

This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a change to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git


 discard d441c0f64f9 HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
    omit 87682c976f0 HADOOP-11867. Add a high-performance vectored read API. (#3904)
     add 0463498adcd YARN-10944. AbstractCSQueue: Eliminate code duplication in overloaded versions of setMaxCapacity. Contributed by Andras Gyori
     add 365375412fe YARN-10590. Consider legacy auto queue creation absolute resource template to avoid rounding errors. Contributed by Andras Gyori
     add c18b646020a HADOOP-18071. ABFS: Set driver global timeout for ITestAzureBlobFileSystemBasics (#3866)
     add 6b07c851f3f HDFS-16397. Reconfig slow disk parameters for datanode (#3828)
     add 12fa38d546e HADOOP-18139: Allow configuration of zookeeper server principal.
     add 8aa568cea50 [SPS]: Fix bug for unit test of reconfiguring SPS mode (#3998)
     add b2f541a7492 Revert "[SPS]: Fix bug for unit test of reconfiguring SPS mode (#3998)" (#4038)
     add 5eab9719cbf HDFS-16480. Fix typo: indicies -> indices (#4020)
     add d05655d2adc Revert "HADOOP-18082.Add debug log when RPC#Reader gets a Call. (#3891). Contributed by JiangHua Zhu."
     add b56af00114c HADOOP-18075. ABFS: Fix failure caused by listFiles() in ITestAbfsRestOperationException  (#4040)
     add 91997872155 HDFS-16458. [SPS]: Fix bug for unit test of reconfiguring SPS mode (#4041)
     add 9e475aede6a YARN-10983. Follow-up changes for YARN-10904. Contributed by Benjamin Teke
     add 902a7935e97 HADOOP-18128. Fix typo issues of outputstream.md (#4025)
     add 356d337d1e5 YARN-11042. Fix testQueueSubmitWithACLsEnabledWithQueueMapping in TestAppManager. Contributed by Tamas Domok
     add f800b65b40e Make upstream aware of 3.3.2 release
     add 22fe79cee3a YARN-11076. Upgrade jQuery version in Yarn UI2. (#4046)
     add 1c27c69f44e Revert "YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori"
     add 379baa5eb65 YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori
     add 6995374b54e HADOOP-18150. Fix ITestAuditManagerDisabled test in S3A. (#4044)
     add 278568203b9 HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani.
     add 19561846dfb HADOOP-18151. Switch the baseurl for Centos 8 (#4047)
     add 8f07081789e HDFS-16462. Make HDFS get tool cross platform (#4003)
     add 851a4dc2d90 YARN-11049. MutableConfScheduler is referred as plain String instead of class name. Contributed by Szilard Nemeth
     add da5a7740189 HADOOP-17563. Upgrade BouncyCastle to 1.68 (#3980)
     add 66b72406bd8 HADOOP-18131. Upgrade maven enforcer plugin and relevant dependencies (#4000)
     add 49e4e4d0a50 HDFS-16496. Snapshot diff on snapshotable directory fails with not snapshottable error (#4051)
     add 56d807175d8 YARN-11081. TestYarnConfigurationFields consistently keeps failing (#4027)
     add 2ece95064b9 YARN-10945. Add javadoc to all methods of AbstractCSQueue. Contributed by Andras Gyori
     add db8ae4b6544 YARN-10918. Simplify method: CapacitySchedulerQueueManager#parseQueue. Contributed by Andras Gyori
     add db36747e831 HADOOP-17526 Use Slf4jRequestLog for HttpRequestLog (#4050)
     add 9539ff108aa YARN-11052. Improve code quality in TestRMWebServicesNodeLabels. Contributed by Szilard Nemeth
     add 383b73417df YARN-11036. Do not inherit from TestRMWebServicesCapacitySched. Contributed by Tamas Domok
     add d0fa9b57751 HADOOP-18155. Refactor tests in TestFileUtil (#4053)
     add 481da19494f YARN-10049. FIFOOrderingPolicy Improvements. Contributed by Benjamin Teke
     add ed65aa23240 YARN-11067. Resource overcommitment due to incorrect resource normalisation logical order. Contributed by Andras Gyori
     add 672e380c4f6 HADOOP-18112: Implement paging during multi object delete. (#4045)
     add a32cfc21693 HDFS-15382. Split one FsDatasetImpl lock to block pool grain locks. (#3941). Contributed by limingxiang.
     add 7b5eac27ff3 HDFS-16495: RBF should prepend the client ip rather than append it.
     add 8b8158f02df HADOOP-18144: getTrashRoot in ViewFileSystem should return a path in ViewFS.
     add 1c0bc35305a HDFS-16502. Reconfigure Block Invalidate limit (#4064)
     add a237526988b HDFS-16494.Removed reuse of AvailableSpaceVolumeChoosingPolicy#initLocks(). (#4048). Contributed by JiangHua Zhu.
     add 7f6a891f031 HDFS-16499. [SPS]: Should not start indefinitely while another SPS process is running (#4058)
     add 9037f9a3349 HADOOP-18162. hadoop-common support for MAPREDUCE-7341 Manifest Committer
     add 7328c34ba50 MAPREDUCE-7341. Add an intermediate manifest committer for Azure and GCS
     add 8294bd5a37c HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341
     add da9970dd697 HADOOP-18129: Change URI to String in INodeLink to reduce memory footprint of ViewFileSystem
     add a631f45a99c HDFS-16470. Make HDFS find tool cross platform (#4076)
     add 62d59e516ee HDFS-16504. Add parameter for NameNode to process getBloks request. (#4068). Contributed by Max Xie.
     add e5549a2a68a HDFS-16503. Should verify whether the path name is valid in the WebHDFS (#4067). Contributed by tomscut.
     add 4537b34e1c0 YARN-11089. Fix typo in RM audit log. Contributed by Junfan Zhang.
     add e2701e227f8 YARN-11086. Add space in debug log of ParentQueue. Contributed by Junfan Zhang.
     add c3124a3f6e9 YARN-10565. Refactor CS queue initialization to simplify weight mode calculation. Contributed by Benjamin Teke.
     add 1d5650c4d0a HDFS-13248: Namenode needs to use the actual client IP when going through the RBF proxy. There is a new configuration knob dfs.namenode.ip-proxy-users that configures the list of users than can set their client ip address using the client context.
     add 2beb7296fb4 YARN-11087. Introduce the config to control the refresh interval in RMDelegatedNodeLabelsUpdater. Contributed by Junfan Zhang.
     add 708a0ce21bc HADOOP-13704. Optimized S3A getContentSummary()
     add 88975496d8a HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.
     add 59d07bdcc36 HADOOP-18160 Avoid shading wildfly.openssl runtime dependency (#4074)
     add 81879eb9cbf HDFS-16471. Make HDFS ls tool cross platform (#4086)
     add 26ba3846cc8 Revert "HDFS-14617. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang."
     add ef8bff0df9f HDFS-15987. Improve oiv tool to parse fsimage file in parallel with delimited format. (#2918). Contributed by Hongbing Wang.
     add 45ce1cce50c HDFS-16501. Print the exception when reporting a bad block (#4062)
     add 921267ca310 YARN-11084. Introduce new config to specify AM default node-label when not specified. Contributed by Junfan Zhang.
     add 9edfe30a60f HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)
     add 077c6c62d6c YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.
     add 526142447ab YARN-10552. Eliminate code duplication in SLSCapacityScheduler and SLSFairScheduler. Contributed by Szilard Nemeth.
     add ffa0eab4886 YARN-11094. Follow up changes for YARN-10547. Contributed by Szilard Nemeth
     add 565e848d88c HDFS-16434. Add opname to read/write lock for remaining operations (#3915)
     add 08a77a765ba YARN-10548. Decouple AM runner logic from SLSRunner. Contributed by Szilard Nemeth.
     add da09d68056d YARN-11069. Dynamic Queue ACL handling in Legacy and Flexible Auto Created Queues. Contributed by Tamas Domok
     add 61e809b245c HADOOP-13386. Upgrade Avro to 1.9.2 (#3990)
     add adbaf48082d YARN-11100. Fix StackOverflowError in SLS scheduler event handling. Contributed by Szilard Nemeth.
     add 046a6204b4a HDFS-16355. Improve the description of dfs.block.scanner.volume.bytes.per.second (#3724)
     add 10876333acd Make upstream aware of 3.2.3 release.
     add 0fbd96a2449 Make upstream aware of 3.2.3 release.
     add eb164213863 HDFS-16517 Distance metric is wrong for non-DN machines in 2.10. Fixed in HADOOP-16161, but this test case adds value to ensure the two getWeight methods stay in sync.
     add a9b43966c07 HDFS-16518: Add shutdownhook to invalidate the KeyProviders in the cache
     add e386d6a6617 YARN-10549. Decouple RM runner logic from SLSRunner. Contributed by Szilard Nemeth.
     add 4e32318acee HDFS-16523. Fix dependency error in hadoop-hdfs on M1 Mac (#4112)
     add 6eea28c3f38 HDFS-16498. Fix NPE for checkBlockReportLease #4057. Contributed by tomscut.
     add 08e6d0ce608 HADOOP-18145. Fileutil's unzip method causes unzipped files to lose their original permissions (#4036)
     add dc4a680da8b MAPREDUCE-7373. Building MapReduce NativeTask fails on Fedora 34+ (#4120)
     add ac50657c378 HDFS-16413. Reconfig dfs usage parameters for datanode (#3863)
     add 6e00a799e77 YARN-11106. Fix the test failure due to missing conf of yarn.resourcemanager.node-labels.am.default-node-label-expression. Contributed by Junfan Zhang
     add ab8c3606204 YARN-10550. Decouple NM runner logic from SLSRunner. Contributed by Szilard Nemeth
     add 2bf78e24168 HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.
     add 9a4dddd6408 HDFS-16507. [SBN read] Avoid purging edit log which is in progress (#4082)
     add e044a46f97d YARN-11088. Introduce the config to control the AM allocated to non-exclusive nodes. Contributed by Junfan Zhang
     add 94031b729d0 YARN-11103. SLS cleanup after previously merged SLS refactor jiras. Contributed by Szilard Nemeth
     add 15a5ea2c955 HADOOP-18169. getDelegationTokens in ViewFs should also fetch the token from fallback FS (#4094)
     add 4b1a6bfb10c YARN-11102. Fix spotbugs error in hadoop-sls module. Contributed by Szilard Nemeth, Andras Gyori.
     add 34b3275bf4b HDFS-16477. [SPS]: Add metric PendingSPSPaths for getting the number of paths to be processed by SPS (#4009). Contributed by  tomscut.
     add 4ef1d3eef93 HDFS-16472. Make HDFS setrep tool cross platform (#4130)
     add 966b773a7c9 HDFS-16527. Add global timeout rule for TestRouterDistCpProcedure (#4129)
     add bbfe3500cf7 HDFS-16530. setReplication debug log creates a new string even if debug is disabled (#4142)
     add 61bbdfd3a73 HDFS-16529. Remove unnecessary setObserverRead in TestConsistentReadsObserver (#4131)
     add 7c20602b177 HDFS-16522. Set Http and Ipc ports for Datanodes in MiniDFSCluster (#4108)
     add 4b786c797a7 HADOOP-18178. Upgrade jackson to 2.13.2 and jackson-databind to 2.13.2.2 (#4111)
     add f70935522b0 HADOOP-18188. Support touch command for directory (#4135)
     add 807a428b556 HDFS-16457.Make fs.getspaceused.classname reconfigurable (#4069)
     add 5412fbf6d4b HDFS-16460. [SPS]: Handle failure retries for moving tasks (#4001)
     add bfde9102be7 HADOOP-18195. Make jackson 1 a runtime scope dependency (#4149)
     add 37650ced819 HDFS-16497. EC: Add param comment for liveBusyBlockIndices with HDFS-14768. Contributed by caozhiqiang.
     add b69ede7154d HADOOP-18191. Log retry count while handling exceptions in RetryInvocationHandler (#4133)
     add d5e97fe4d6b HDFS-16473. Make HDFS stat tool cross platform (#4145)
     add 5de78ceb0e7 HDFS-16516. Fix Fsshell wrong params (#4090). Contributed by GuoPhilipse.
     add e89fd9645bb HDFS-16474. Make HDFS tail tool cross platform (#4157)
     add 45394433a11 HDFS-16484. [SPS]: Fix an infinite loop bug in SPSPathIdProcessor thread (#4032)
     add a7b4e8f03e5 HDFS-14478. Add libhdfs APIs for openFile (#4166)
     add 3b46aae977e YARN-11107. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. Contributed by zhangxiping1
     add 2efab92959c HDFS-16479. EC: NameNode should not send a reconstruction work when the source datanodes are insufficient (#4138)
     add c65c383b7eb HDFS-16509. Fix decommission UnsupportedOperationException (#4077). Contributed by daimin.
     add cee8c62498f HDFS-16456. EC: Decommission a rack with only on dn will fail when the rack number is equal with replication (#4126)
     add a6ebc42671f HADOOP-18201. Remove endpoint config overrides for ITestS3ARequesterPays (#4169)
     add d5cba5c3d11 YARN-11107. Addendum. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. (#4175)
     add 8ea33583804 HADOOP-18196. Remove replace-guava from replacer plugin (#4152)
     add f14f3050517 HDFS-16526. Add metrics for slow DataNode (#4162)
     add cb975c3df6f HDFS-16534. Split FsDatasetImpl from block pool locks to volume grain locks. (#4141) Contributed by limingxiang.
     add dbeeee03639 HDFS-16531. Avoid setReplication writing an edit record if old replication equals the new value (#4148). Contributed by Stephen O'Donnell.
     add 35d4c02bccd HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
     add 900682e7120 HDFS-16389.Improve NNThroughputBenchmark test mkdirs. (#3819)
     add f74f2416369 HDFS-16541. Fix a typo in NameNodeLayoutVersion (#4176)
     add d7fd61d6163 HADOOP-18202. create-release fails fatal: unsafe repository (#4188)
     add 52e152f8b0d HDFS-16538. EC decoding failed due to not enough valid inputs (#4167)
     add ec0ff1dc04b HDFS-16035. Remove DummyGroupMapping as it is not longer used anywhere. (#4183)
     add 98b9c435f24 HADOOP-18172: Changed scope for isRootInternalDir/getRootFallbackLink for InodeTree (#4106)
     add 76bbd173749 HDFS-16544. EC decoding failed due to invalid buffer (#4179)
     add 4ff8a5dc738 HDFS-16526. Addendum Add metrics for slow DataNode (#4191)
     add a4683be65ec Revert "HDFS-16531. Avoid setReplication writing an edit record if old replication equals the new value (#4148). Contributed by Stephen O'Donnell."
     add bf4730b4d69 HDFS-16500. Make asynchronous blocks deletion lock and unlock durtion threshold configurable. (#4061). Contributed by Chengwei Wang.
     add 56cfd606177 HADOOP-17551. Upgrade maven-site-plugin to 3.11.0 (#4196)
     add b35b744670f YARN-11111. Recovery failure when node-label configure-type transit from delegated-centralized to centralized (#4200)
     add f84b88dd6ba HADOOP-17564. Fix typo in UnixShellGuide.html (#4195)
     add 214f3690732 HDFS-16556. Fix typos in distcp (#4217)
     add 5ebbacc4802 HDFS-16552. Fix NPE for TestBlockManager (#4210)
     add aebd55f7883 HDFS-16519. Add throttler to EC reconstruction (#4101)
     add acc0e0a2101 HDFS-16488. [SPS]: Expose metrics to JMX for external SPS (#4035)
     add 17d64ba495e Revert "HDFS-16488. [SPS]: Expose metrics to JMX for external SPS (#4035)" (#4232)
     add 1b4dba99b5d HADOOP-16202. Enhanced openFile(): hadoop-common changes. (#2584/1)
     add 6999acf5200 HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2)
     add e0cd0a82e03 HADOOP-16202. Enhanced openFile(): hadoop-aws changes. (#2584/3)
     add 44ae2fa8e53 HADOOP-16202. Enhanced openFile(): hadoop-azure changes. (#2584/4)
     add 89279f0b90b HDFS-16255. RBF: Fix dead link to fedbalance document (#4193)
     add fb13c1e4a8a MAPREDUCE-7246. In MapredAppMasterRest#Mapreduce_Application_Master_Info_API, updating the datatype of appId to "string". (#4223)
     add 006767eb941 YARN-10303. Fixing rest api example, status code error and other typos in ResourceManagerRest.md (#4221)
     add f1e5f8e7643 HDFS-16488. [SPS]: Expose metrics to JMX for external SPS (#4234)
     add d60262fe009 HADOOP-18167. Add metrics to track delegation token secret manager op… (#4092)
     add 2632d49282a HADOOP-18216. io.file.buffer.size must be positive (#4220)
     add f187e9bcd52 HDFS-16554. Remove unused configuration dfs.namenode.block.deletion.increment. (#4213). Contributed by Chengwei Wang.
     add b4ff49a3943 HDFS-16539. RBF: Support refreshing/changing router fairness policy controller without rebooting router (#4168)
     add 63187083cc3 HADOOP-15983. Use jersey-json that is built to use jackson2 (#3988)
     add bda0881bf90 HDFS-16540 Data locality is lost when DataNode pod restarts in kubernetes (#4170)
     add 4e47eb66d12 Revert "HDFS-16540 Data locality is lost when DataNode pod restarts in kubernetes (#4170)"
     add 9ed8d60511d HDFS-16540. Data locality is lost when DataNode pod restarts in kubernetes (#4170)
     add b9ade7a4cd1 HDFS-16528. Reconfigure slow peer enable for Namenode (#4186)
     add 7bd7725532f HDFS-16553. Fix checkstyle for the length of BlockManager construction method over limit. (#4211). Contributed by Chengwei Wang.
     add 88155cebe94 HDFS-16468. Define ssize_t for Windows (#4228)
     add 23e1511cd0f HDFS-16256. Minor fix in HDFS Fedbalance document (#4192)
     add 4653fcd7046 HADOOP-18219. Fix shaded client test failure (#4254)
     add a74acc755ed YARN-10187. Removing hadoop-yarn-project/hadoop-yarn/README as it is no longer maintained. (#4222)
     add d4a91bd0c0d YARN-11116. Migrate Times util from SimpleDateFormat to thread-safe DateTimeFormatter class (#4242)
     new 8bf45aa5ba6 HADOOP-11867. Add a high-performance vectored read API. (#3904)
     new 7d1a2c3e288 HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d441c0f64f9)
            \
             N -- N -- N   refs/heads/feature-vectored-io (7d1a2c3e288)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .BUILDING.txt.swp                                  |  Bin 0 -> 16384 bytes
 LICENSE-binary                                     |   34 +-
 dev-support/bin/create-release                     |    4 +
 dev-support/docker/Dockerfile_centos_8             |    7 +
 .../pkg-resolver/set-vault-as-baseurl-centos.sh    |   26 +-
 hadoop-client-modules/hadoop-client-api/pom.xml    |    3 +
 .../hadoop-client-check-invariants/pom.xml         |    2 +-
 .../hadoop-client-check-test-invariants/pom.xml    |    2 +-
 .../hadoop-client-integration-tests/pom.xml        |    6 +
 .../hadoop-client-minicluster/pom.xml              |   48 +-
 hadoop-client-modules/hadoop-client/pom.xml        |    8 +-
 .../hadoop-cloud-storage/pom.xml                   |    4 +
 .../hadoop-huaweicloud/pom.xml                     |   12 +-
 hadoop-common-project/hadoop-common/pom.xml        |   39 +-
 .../hadoop-common/src/main/conf/log4j.properties   |   63 +-
 .../java/org/apache/hadoop/fs/AvroFSInput.java     |   11 +-
 .../org/apache/hadoop/fs/CachingGetSpaceUsed.java  |   13 +-
 .../org/apache/hadoop/fs/ChecksumFileSystem.java   |    4 +-
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |    2 +
 .../main/java/org/apache/hadoop/fs/FSBuilder.java  |   14 +
 .../java/org/apache/hadoop/fs/FileContext.java     |   18 +-
 .../main/java/org/apache/hadoop/fs/FileSystem.java |   11 +-
 .../main/java/org/apache/hadoop/fs/FileUtil.java   |  167 +-
 .../hadoop/fs/FutureDataInputStreamBuilder.java    |    8 +-
 .../main/java/org/apache/hadoop/fs/Options.java    |  119 +
 .../org/apache/hadoop/fs/TrashPolicyDefault.java   |    4 +-
 .../hadoop/fs/impl/AbstractFSBuilderImpl.java      |   38 +-
 .../fs/impl/FileSystemMultipartUploader.java       |    9 +-
 .../fs/impl/FutureDataInputStreamBuilderImpl.java  |    8 +-
 .../org/apache/hadoop/fs/impl/FutureIOSupport.java |   83 +-
 .../apache/hadoop/fs/impl/OpenFileParameters.java  |   13 +
 .../apache/hadoop/fs/impl/WrappedIOException.java  |   11 +-
 .../hadoop/fs/shell/CommandWithDestination.java    |    9 +-
 .../org/apache/hadoop/fs/shell/CopyCommands.java   |    3 +-
 .../java/org/apache/hadoop/fs/shell/Display.java   |    3 +-
 .../main/java/org/apache/hadoop/fs/shell/Head.java |    8 +-
 .../java/org/apache/hadoop/fs/shell/PathData.java  |   35 +
 .../main/java/org/apache/hadoop/fs/shell/Tail.java |   11 +-
 .../org/apache/hadoop/fs/shell/TouchCommands.java  |    3 -
 .../hadoop/fs/statistics/StoreStatisticNames.java  |   15 +
 .../hadoop/fs/statistics/StreamStatisticNames.java |   19 +-
 .../fs/statistics/impl/IOStatisticsBinding.java    |   73 +-
 .../fs/statistics/impl/IOStatisticsStore.java      |   11 +
 .../impl/PairedDurationTrackerFactory.java         |    5 +
 .../statistics/impl/StatisticDurationTracker.java  |    7 +
 .../org/apache/hadoop/fs/viewfs/Constants.java     |    7 +-
 .../org/apache/hadoop/fs/viewfs/InodeTree.java     |   40 +-
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    |  208 +-
 .../java/org/apache/hadoop/fs/viewfs/ViewFs.java   |   21 +-
 .../org/apache/hadoop/http/HttpRequestLog.java     |   72 +-
 .../java/org/apache/hadoop/io/SequenceFile.java    |   14 +-
 .../hadoop/io/retry/RetryInvocationHandler.java    |   10 +-
 .../java/org/apache/hadoop/ipc/CallerContext.java  |    7 +-
 .../main/java/org/apache/hadoop/ipc/Server.java    |    3 -
 .../main/java/org/apache/hadoop/net/NetUtils.java  |   26 +
 .../org/apache/hadoop/net/NetworkTopology.java     |  115 +-
 .../hadoop/security/UserGroupInformation.java      |   14 +-
 .../AbstractDelegationTokenSecretManager.java      |  121 +-
 .../delegation/ZKDelegationTokenSecretManager.java |    5 +
 .../org/apache/hadoop/util/JsonSerialization.java  |   11 +-
 .../main/java/org/apache/hadoop/util/Lists.java    |   24 +
 .../RateLimiting.java}                             |   46 +-
 .../apache/hadoop/util/RateLimitingFactory.java    |  102 +
 .../hadoop/util/curator/ZKCuratorManager.java      |   29 +
 .../functional/CloseableTaskPoolSubmitter.java     |   74 +
 .../util/functional/CommonCallableSupplier.java    |    2 +-
 .../apache/hadoop/util/functional/FutureIO.java    |   90 +
 .../apache/hadoop/util/functional/TaskPool.java    |  613 +++++
 .../src/main/resources/core-default.xml            |    3 +-
 .../src/site/markdown/FileSystemShell.md           |    6 +-
 .../hadoop-common/src/site/markdown/Metrics.md     |    3 +
 .../src/site/markdown/UnixShellGuide.md            |    2 +-
 .../src/site/markdown/filesystem/filesystem.md     |  110 +-
 .../filesystem/fsdatainputstreambuilder.md         |  588 ++++-
 .../src/site/markdown/filesystem/index.md          |    1 +
 .../src/site/markdown/filesystem/openfile.md       |  122 +
 .../src/site/markdown/filesystem/outputstream.md   |   12 +-
 .../site/markdown/release/3.2.3/CHANGELOG.3.2.3.md |  386 ++++
 .../markdown/release/3.2.3/RELEASENOTES.3.2.3.md   |   71 +
 .../site/markdown/release/3.3.2/CHANGELOG.3.3.2.md |  350 +++
 .../markdown/release/3.3.2/RELEASENOTES.3.3.2.md   |   93 +
 .../java/org/apache/hadoop/fs/TestFileUtil.java    |  485 ++--
 .../org/apache/hadoop/fs/TestFsShellTouch.java     |   72 +
 .../AbstractContractContentSummaryTest.java        |   65 +
 .../AbstractContractMultipartUploaderTest.java     |    2 +-
 .../fs/contract/AbstractContractOpenTest.java      |   80 +-
 .../hadoop/fs/contract/ContractTestUtils.java      |   13 +-
 .../localfs/TestLocalFSContractContentSummary.java |   20 +-
 .../fs/statistics/IOStatisticAssertions.java       |   20 +
 .../hadoop/fs/statistics/TestDurationTracking.java |    3 +-
 .../viewfs/TestViewFileSystemLocalFileSystem.java  |    9 +
 ...ViewFileSystemWithAuthorityLocalFileSystem.java |   10 +
 .../hadoop/fs/viewfs/ViewFileSystemBaseTest.java   |  120 +-
 .../org/apache/hadoop/http/TestHttpRequestLog.java |   27 +-
 .../hadoop/http/TestHttpServerLifecycle.java       |   21 -
 .../java/org/apache/hadoop/io/AvroTestUtil.java    |    2 +-
 .../org/apache/hadoop/io/TestEnumSetWritable.java  |    2 +-
 .../org/apache/hadoop/net/TestClusterTopology.java |   38 +
 .../token/delegation/TestDelegationToken.java      |  136 ++
 .../java/org/apache/hadoop/util/TestLists.java     |   44 +
 .../hadoop/util/functional/TestTaskPool.java       |  585 +++++
 hadoop-hdfs-project/hadoop-hdfs-client/pom.xml     |    4 +
 .../org/apache/hadoop/hdfs/KeyProviderCache.java   |   25 +
 .../apache/hadoop/hdfs/StatefulStripeReader.java   |    9 +-
 .../hdfs/shortcircuit/ShortCircuitCache.java       |   25 +-
 .../apache/hadoop/hdfs/web/WebHdfsFileSystem.java  |    3 +-
 .../lib/service/security/DummyGroupMapping.java    |   65 -
 .../src/main/native/libhdfs-tests/CMakeLists.txt   |    1 +
 .../main/native/libhdfs-tests/test_libhdfs_ops.c   |   62 +
 .../src/main/native/libhdfs/CMakeLists.txt         |    1 +
 .../src/main/native/libhdfs/hdfs.c                 |  500 +++-
 .../src/main/native/libhdfs/include/hdfs/hdfs.h    |  135 ++
 .../src/main/native/libhdfs/jclasses.c             |    4 +
 .../src/main/native/libhdfs/jclasses.h             |    7 +
 .../src/main/native/libhdfs/jni_helper.c           |    1 +
 .../native/libhdfspp/examples/c/cat/CMakeLists.txt |    2 +-
 .../src/main/native/libhdfspp/examples/c/cat/cat.c |    1 +
 .../main/native/libhdfspp/examples/cc/cat/cat.cc   |    2 -
 .../src/main/native/libhdfspp/lib/fs/filehandle.cc |    1 +
 .../src/main/native/libhdfspp/lib/fs/filehandle.h  |    1 +
 .../libhdfspp/lib/x-platform/syscall_linux.cc      |    1 +
 .../main/native/libhdfspp/lib/x-platform/types.h   |   29 +-
 .../src/main/native/libhdfspp/tests/CMakeLists.txt |    1 -
 .../src/main/native/libhdfspp/tests/hdfs_shim.c    |   59 +
 .../libhdfspp/tests/libhdfs_wrapper_defines.h      |   17 +
 .../libhdfspp/tests/libhdfs_wrapper_undefs.h       |   17 +
 .../libhdfspp/tests/libhdfspp_wrapper_defines.h    |   17 +
 .../native/libhdfspp/tests/tools/CMakeLists.txt    |   18 +
 .../tests/tools/hdfs-create-snapshot-mock.cc       |    8 +-
 .../native/libhdfspp/tests/tools/hdfs-find-mock.cc |   93 +
 .../native/libhdfspp/tests/tools/hdfs-find-mock.h  |   69 +
 .../native/libhdfspp/tests/tools/hdfs-get-mock.cc  |   56 +
 .../native/libhdfspp/tests/tools/hdfs-get-mock.h   |   68 +
 .../native/libhdfspp/tests/tools/hdfs-ls-mock.cc   |   67 +
 .../native/libhdfspp/tests/tools/hdfs-ls-mock.h    |   68 +
 .../libhdfspp/tests/tools/hdfs-setrep-mock.cc      |   56 +
 .../libhdfspp/tests/tools/hdfs-setrep-mock.h       |   68 +
 .../native/libhdfspp/tests/tools/hdfs-stat-mock.cc |   55 +
 .../native/libhdfspp/tests/tools/hdfs-stat-mock.h  |   67 +
 .../native/libhdfspp/tests/tools/hdfs-tail-mock.cc |   64 +
 .../native/libhdfspp/tests/tools/hdfs-tail-mock.h  |   68 +
 .../libhdfspp/tests/tools/hdfs-tool-tests.cc       |  119 +-
 .../native/libhdfspp/tests/tools/hdfs-tool-tests.h |   57 +-
 .../libhdfspp/tests/x-platform/CMakeLists.txt      |    5 +
 .../libhdfspp/tests/x-platform/types_test.cc       |   19 +-
 .../native/libhdfspp/tests/x-platform/types_test.h |   78 +
 .../src/main/native/libhdfspp/tools/CMakeLists.txt |   39 +-
 .../tools/hdfs-copy-to-local/hdfs-copy-to-local.cc |   14 +-
 .../tools/hdfs-copy-to-local/hdfs-copy-to-local.h  |    5 +
 .../c/cat => tools/hdfs-find}/CMakeLists.txt       |   14 +-
 .../native/libhdfspp/tools/hdfs-find/hdfs-find.cc  |  193 ++
 .../hdfs-copy-to-local.h => hdfs-find/hdfs-find.h} |   36 +-
 .../main/native/libhdfspp/tools/hdfs-find/main.cc  |   58 +-
 .../c/cat => tools/hdfs-get}/CMakeLists.txt        |   14 +-
 .../native/libhdfspp/tools/hdfs-get/hdfs-get.cc    |   20 +-
 .../native/libhdfspp/tools/hdfs-get/hdfs-get.h     |   40 +-
 .../main/native/libhdfspp/tools/hdfs-get/main.cc   |   58 +-
 .../c/cat => tools/hdfs-ls}/CMakeLists.txt         |   14 +-
 .../main/native/libhdfspp/tools/hdfs-ls/hdfs-ls.cc |  156 ++
 .../hdfs-copy-to-local.h => hdfs-ls/hdfs-ls.h}     |   32 +-
 .../main/native/libhdfspp/tools/hdfs-ls/main.cc    |   58 +-
 .../c/cat => tools/hdfs-setrep}/CMakeLists.txt     |   14 +-
 .../libhdfspp/tools/hdfs-setrep/hdfs-setrep.cc     |  220 ++
 .../hdfs-setrep.h}                                 |   34 +-
 .../native/libhdfspp/tools/hdfs-setrep/main.cc     |   58 +-
 .../c/cat => tools/hdfs-stat}/CMakeLists.txt       |   14 +-
 .../native/libhdfspp/tools/hdfs-stat/hdfs-stat.cc  |  111 +
 .../hdfs-copy-to-local.h => hdfs-stat/hdfs-stat.h} |   30 +-
 .../main/native/libhdfspp/tools/hdfs-stat/main.cc  |   58 +-
 .../c/cat => tools/hdfs-tail}/CMakeLists.txt       |   14 +-
 .../native/libhdfspp/tools/hdfs-tail/hdfs-tail.cc  |  150 ++
 .../hdfs-copy-to-local.h => hdfs-tail/hdfs-tail.h} |   42 +-
 .../main/native/libhdfspp/tools/hdfs-tail/main.cc  |   58 +-
 .../src/main/native/libhdfspp/tools/hdfs_find.cc   |  146 --
 .../src/main/native/libhdfspp/tools/hdfs_get.cc    |   88 -
 .../src/main/native/libhdfspp/tools/hdfs_ls.cc     |  130 --
 .../src/main/native/libhdfspp/tools/hdfs_setrep.cc |  172 --
 .../src/main/native/libhdfspp/tools/hdfs_stat.cc   |   87 -
 .../src/main/native/libhdfspp/tools/hdfs_tail.cc   |  124 -
 .../tools/internal/set-replication-state.h         |   72 +
 .../main/native/libhdfspp/tools/tools_common.cc    |    2 -
 hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml        |    6 +
 .../AbstractRouterRpcFairnessPolicyController.java |    3 +-
 .../RefreshFairnessPolicyControllerHandler.java    |   41 +
 .../server/federation/metrics/FederationMBean.java |    7 +
 .../federation/metrics/NamenodeBeanMetrics.java    |   10 +
 .../hdfs/server/federation/metrics/RBFMetrics.java |    6 +
 .../resolver/MembershipNamenodeResolver.java       |    1 +
 .../federation/resolver/NamenodeStatusReport.java  |   18 +-
 .../router/NamenodeHeartbeatService.java           |    3 +-
 .../federation/router/RouterAdminServer.java       |    9 +
 .../server/federation/router/RouterRpcClient.java  |  112 +-
 .../server/federation/router/RouterRpcServer.java  |    6 +-
 .../federation/router/RouterWebHdfsMethods.java    |    5 +
 .../federation/store/records/MembershipStats.java  |    4 +
 .../records/impl/pb/MembershipStatsPBImpl.java     |   10 +
 .../src/main/proto/FederationProtocol.proto        |    1 +
 .../src/site/markdown/HDFSRouterFederation.md      |   16 +-
 .../TestRouterRefreshFairnessPolicyController.java |  234 ++
 .../server/federation/metrics/TestRBFMetrics.java  |    5 +
 .../server/federation/router/TestRouterRpc.java    |   13 +-
 .../router/TestRouterRpcMultiDestination.java      |    5 +-
 .../router/TestRouterWebHdfsMethods.java           |   24 +
 .../store/FederationStateStoreTestUtils.java       |    1 +
 .../dev-support/jdiff/Apache_Hadoop_HDFS_3.2.3.xml |  674 ++++++
 .../dev-support/jdiff/Apache_Hadoop_HDFS_3.3.2.xml |  835 +++++++
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   32 +-
 .../delegation/DelegationTokenSecretManager.java   |    4 +-
 .../hdfs/server/balancer/NameNodeConnector.java    |   12 +-
 .../hdfs/server/blockmanagement/BlockManager.java  |  221 +-
 .../blockmanagement/BlockManagerSafeMode.java      |    2 +-
 .../BlockPlacementPolicyDefault.java               |    7 +-
 .../BlockPlacementPolicyRackFaultTolerant.java     |   59 +-
 .../blockmanagement/CacheReplicationMonitor.java   |    2 +-
 .../DatanodeAdminBackoffMonitor.java               |   18 +-
 .../DatanodeAdminDefaultMonitor.java               |   10 +-
 .../blockmanagement/DatanodeAdminManager.java      |    6 +-
 .../server/blockmanagement/DatanodeManager.java    |  109 +-
 .../server/blockmanagement/ErasureCodingWork.java  |   30 +-
 .../server/blockmanagement/HeartbeatManager.java   |    4 +-
 .../blockmanagement/SlowPeerDisabledTracker.java   |   94 +
 .../server/blockmanagement/SlowPeerTracker.java    |   12 +-
 .../hdfs/server/common/sps/BlockDispatcher.java    |   10 +-
 .../hdfs/server/datanode/BPServiceActor.java       |   25 +-
 .../hadoop/hdfs/server/datanode/BlockReceiver.java |    4 +
 .../hadoop/hdfs/server/datanode/BlockSender.java   |    4 +-
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |   15 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      |  188 +-
 .../hadoop/hdfs/server/datanode/DiskBalancer.java  |   18 +-
 .../hdfs/server/datanode/FileIoProvider.java       |    4 +
 .../server/datanode/ProfilingFileIoEvents.java     |   37 +-
 .../hadoop/hdfs/server/datanode/VolumeScanner.java |    2 +-
 .../erasurecode/StripedBlockReconstructor.java     |    8 +
 .../server/datanode/erasurecode/StripedReader.java |    5 +
 .../AvailableSpaceVolumeChoosingPolicy.java        |    1 -
 .../server/datanode/fsdataset/FsDatasetSpi.java    |   26 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java    |   50 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  462 ++--
 .../datanode/fsdataset/impl/FsVolumeImpl.java      |    4 +
 .../datanode/fsdataset/impl/FsVolumeList.java      |   22 +
 .../fsdataset/impl/ProvidedVolumeImpl.java         |    5 +-
 .../server/datanode/fsdataset/impl/ReplicaMap.java |  132 +-
 .../datanode/metrics/DataNodeDiskMetrics.java      |   35 +-
 .../server/datanode/metrics/DataNodeMetrics.java   |   10 +
 .../hadoop/hdfs/server/namenode/BackupImage.java   |    2 +-
 .../hadoop/hdfs/server/namenode/Checkpointer.java  |    2 +-
 .../server/namenode/EncryptionZoneManager.java     |    6 +-
 .../hadoop/hdfs/server/namenode/FSDirAttrOp.java   |    2 +-
 .../server/namenode/FSDirEncryptionZoneOp.java     |    4 +-
 .../hadoop/hdfs/server/namenode/FSEditLog.java     |   11 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |   42 +-
 .../hdfs/server/namenode/FsImageValidation.java    |    2 +-
 .../hadoop/hdfs/server/namenode/NameNode.java      |   86 +-
 .../server/namenode/NameNodeLayoutVersion.java     |    8 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |   34 +-
 .../hdfs/server/namenode/ReencryptionHandler.java  |    2 +-
 .../hdfs/server/namenode/SecondaryNameNode.java    |    2 +-
 .../hdfs/server/namenode/ha/EditLogTailer.java     |    2 +-
 .../server/namenode/metrics/FSNamesystemMBean.java |    7 +
 .../namenode/snapshot/SnapshotDeletionGc.java      |    2 +-
 .../server/namenode/snapshot/SnapshotManager.java  |    6 +-
 .../sps/BlockStorageMovementAttemptedItems.java    |   10 +
 .../namenode/sps/BlockStorageMovementNeeded.java   |   16 +-
 .../namenode/sps/StoragePolicySatisfier.java       |    4 +-
 .../namenode/sps/StoragePolicySatisfyManager.java  |   31 +-
 .../web/resources/NamenodeWebHdfsMethods.java      |    4 +
 .../sps/ExternalSPSBlockMoveTaskHandler.java       |   38 +-
 .../hadoop/hdfs/server/sps/ExternalSPSContext.java |   17 +
 .../hdfs/server/sps/ExternalSPSFaultInjector.java  |   52 +-
 .../server/sps/ExternalStoragePolicySatisfier.java |   24 +-
 .../server/sps/metrics/ExternalSPSBeanMetrics.java |  100 +
 .../hdfs/server/sps/metrics/ExternalSPSMXBean.java |   52 +
 .../hdfs/server/sps/metrics/package-info.java      |   17 +-
 .../offlineImageViewer/OfflineImageViewerPB.java   |   10 +-
 .../PBImageCorruptionDetector.java                 |    2 +-
 .../PBImageDelimitedTextWriter.java                |    8 +-
 .../offlineImageViewer/PBImageTextWriter.java      |  267 ++-
 .../java/org/apache/hadoop/hdfs/util/RwLock.java   |   12 +
 .../src/main/resources/hdfs-default.xml            |   66 +-
 .../hadoop/fs/viewfs/TestViewFileSystemHdfs.java   |    8 +
 .../hadoop/fs/viewfs/TestViewFsLinkFallback.java   |   22 +
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |   85 +-
 .../hadoop/hdfs/MiniDFSClusterWithNodeGroup.java   |    6 +-
 .../org/apache/hadoop/hdfs/TestDFSShellTouch.java  |  239 ++
 .../org/apache/hadoop/hdfs/TestDecommission.java   |   17 +
 .../apache/hadoop/hdfs/TestKeyProviderCache.java   |    9 +
 .../org/apache/hadoop/hdfs/TestMiniDFSCluster.java |   92 +
 .../hdfs/TestReadStripedFileWithDecoding.java      |   56 +
 .../hadoop/hdfs/qjournal/MiniJournalCluster.java   |   41 +-
 .../hdfs/qjournal/TestMiniJournalCluster.java      |   98 +-
 .../balancer/TestBalancerLongRunningTasks.java     |    2 +-
 .../balancer/TestBalancerWithHANameNodes.java      |   15 +-
 .../server/blockmanagement/TestBlockManager.java   |   98 +
 .../blockmanagement/TestBlockReportLease.java      |   45 +
 .../blockmanagement/TestDatanodeManager.java       |   29 +
 .../hdfs/server/datanode/SimulatedFSDataset.java   |   26 +-
 .../hdfs/server/datanode/TestBPOfferService.java   |   11 +
 .../hdfs/server/datanode/TestBlockRecovery2.java   |    7 +-
 .../hdfs/server/datanode/TestBlockScanner.java     |   16 +-
 .../hdfs/server/datanode/TestDataNodeMetrics.java  |   54 +
 .../datanode/TestDataNodeReconfiguration.java      |  233 ++
 .../hdfs/server/datanode/TestDirectoryScanner.java |   13 +-
 .../datanode/extdataset/ExternalDatasetImpl.java   |   16 +-
 .../datanode/extdataset/ExternalVolumeImpl.java    |    3 +-
 .../fsdataset/impl/FsDatasetImplTestUtils.java     |    2 +-
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java |  177 +-
 .../datanode/fsdataset/impl/TestFsVolumeList.java  |    3 +-
 .../fsdataset/impl/TestInterDatanodeProtocol.java  |   24 +-
 .../datanode/fsdataset/impl/TestProvidedImpl.java  |   10 +-
 .../datanode/fsdataset/impl/TestReplicaMap.java    |    7 +-
 .../fsdataset/impl/TestWriteToReplica.java         |    3 +-
 .../apache/hadoop/hdfs/server/mover/TestMover.java |    6 +-
 .../hdfs/server/namenode/FileNameGenerator.java    |    4 +
 .../server/namenode/NNThroughputBenchmark.java     |   40 +-
 .../TestBlockPlacementPolicyRackFaultTolerant.java |  124 +
 .../server/namenode/TestLargeDirectoryDelete.java  |    1 -
 .../server/namenode/TestNameNodeReconfigure.java   |   66 +-
 .../server/namenode/TestNameNodeRpcServer.java     |   96 +-
 .../namenode/ha/TestConsistentReadsObserver.java   |    2 -
 .../namenode/snapshot/TestSnapshotDiffReport.java  |    3 +-
 .../TestStoragePolicySatisfierWithStripedFile.java |    4 +-
 .../sps/TestExternalStoragePolicySatisfier.java    |  190 +-
 .../hdfs/shortcircuit/TestShortCircuitCache.java   |   78 +
 .../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java |   25 +-
 .../offlineImageViewer/TestOfflineImageViewer.java |   59 +-
 .../TestOfflineImageViewerForAcl.java              |    2 +-
 .../org/apache/hadoop/hdfs/web/TestWebHDFS.java    |   33 +
 .../org/apache/hadoop/net/TestNetworkTopology.java |   16 +
 .../dev-support/findbugs-exclude.xml               |    8 +
 .../hadoop-mapreduce-client-app/pom.xml            |    6 +
 .../jobhistory/JobHistoryCopyService.java          |   10 +-
 .../hadoop-mapreduce-client-core/pom.xml           |   13 +-
 .../org/apache/hadoop/mapred/LineRecordReader.java |   13 +-
 .../lib/input/FixedLengthRecordReader.java         |    7 +-
 .../mapreduce/lib/input/LineRecordReader.java      |   14 +-
 .../mapreduce/lib/input/NLineInputFormat.java      |    6 +-
 .../lib/output/PathOutputCommitterFactory.java     |   10 +-
 .../committer/manifest/ManifestCommitter.java      |  761 +++++++
 .../manifest/ManifestCommitterConfig.java          |  375 +++
 .../manifest/ManifestCommitterConstants.java       |  240 ++
 .../manifest/ManifestCommitterFactory.java         |   57 +
 .../manifest/ManifestCommitterStatisticNames.java  |  264 +++
 .../manifest/files/AbstractManifestData.java       |  137 ++
 .../committer/manifest/files/DiagnosticKeys.java   |   33 +-
 .../output/committer/manifest/files/DirEntry.java  |  202 ++
 .../committer/manifest/files/EntryStatus.java      |   87 +
 .../output/committer/manifest/files/FileEntry.java |  189 ++
 .../committer/manifest/files/ManifestPrinter.java  |  144 ++
 .../manifest/files/ManifestSuccessData.java        |  493 ++++
 .../committer/manifest/files/TaskManifest.java     |  365 +++
 .../committer/manifest/files/package-info.java     |   41 +
 .../manifest/impl/AuditingIntegration.java         |   94 +
 .../committer/manifest/impl/InternalConstants.java |  130 ++
 .../manifest/impl/ManifestCommitterSupport.java    |  374 +++
 .../manifest/impl/ManifestStoreOperations.java     |  291 +++
 .../ManifestStoreOperationsThroughFileSystem.java  |  187 ++
 .../manifest/impl/OutputValidationException.java   |   29 +-
 .../committer/manifest/impl/package-info.java      |   25 +-
 .../output/committer/manifest/package-info.java    |   29 +-
 .../committer/manifest/stages/AbortTaskStage.java  |   63 +
 .../manifest/stages/AbstractJobOrTaskStage.java    |  942 ++++++++
 .../committer/manifest/stages/CleanupJobStage.java |  511 +++++
 .../committer/manifest/stages/CommitJobStage.java  |  245 ++
 .../committer/manifest/stages/CommitTaskStage.java |  115 +
 .../stages/CreateOutputDirectoriesStage.java       |  423 ++++
 .../committer/manifest/stages/JobOrTaskStage.java  |   28 +-
 .../manifest/stages/LoadManifestsStage.java        |  293 +++
 .../manifest/stages/RenameFilesStage.java          |  173 ++
 .../manifest/stages/SaveSuccessFileStage.java      |   79 +
 .../manifest/stages/SaveTaskManifestStage.java     |   81 +
 .../committer/manifest/stages/SetupJobStage.java   |   63 +
 .../committer/manifest/stages/SetupTaskStage.java  |   54 +
 .../committer/manifest/stages/StageConfig.java     |  556 +++++
 .../manifest/stages/StageEventCallbacks.java       |   30 +-
 .../stages/TaskAttemptScanDirectoryStage.java      |  214 ++
 .../manifest/stages/ValidateRenamedFilesStage.java |  200 ++
 .../committer/manifest/stages/package-info.java    |   25 +-
 .../src/main/resources/mapred-default.xml          |   19 +
 .../src/site/markdown/MapredAppMasterRest.md       |    2 +-
 .../src/site/markdown/manifest_committer.md        |  605 +++++
 .../markdown/manifest_committer_architecture.md    |  335 +++
 .../site/markdown/manifest_committer_protocol.md   |  617 +++++
 .../manifest/AbstractManifestCommitterTest.java    | 1085 +++++++++
 .../committer/manifest/CommitterTestBinding.java   |  152 ++
 .../manifest/ManifestCommitterTestSupport.java     |  420 ++++
 .../committer/manifest/StubStoreOperations.java    |  109 +
 .../committer/manifest/TaggedFileStatus.java       |   54 +
 .../committer/manifest/TestCleanupStage.java       |  142 ++
 .../committer/manifest/TestCommitTaskStage.java    |  126 +
 .../manifest/TestCreateOutputDirectoriesStage.java |  307 +++
 .../manifest/TestJobThroughManifestCommitter.java  |  601 +++++
 .../committer/manifest/TestLoadManifestsStage.java |  141 ++
 .../manifest/TestManifestCommitProtocol.java       | 1801 +++++++++++++++
 .../committer/manifest/TestRenameStageFailure.java |  379 ++++
 .../committer/manifest/TestTaskManifestFileIO.java |  185 ++
 .../committer/manifest/TextOutputForTests.java     |  136 ++
 .../committer/manifest/ThreadLeakTracker.java      |   83 +
 .../impl/UnreliableManifestStoreOperations.java    |  380 ++++
 .../output/committer/manifest/package-info.java    |   24 +-
 .../src/test/resources/contract/localfs.xml        |  138 ++
 .../src/test/resources/core-site.xml               |   33 +
 .../hadoop-mapreduce-client-hs/pom.xml             |    6 +
 .../hadoop-mapreduce-client-jobclient/pom.xml      |    6 +
 .../src/CMakeLists.txt                             |    1 +
 .../hadoop-mapreduce-client/pom.xml                |   10 +
 .../hadoop-mapreduce-examples/pom.xml              |    6 +
 .../hadoop/examples/terasort/TeraInputFormat.java  |   16 +-
 hadoop-mapreduce-project/pom.xml                   |    4 +
 hadoop-maven-plugins/pom.xml                       |   18 +
 hadoop-minicluster/pom.xml                         |    6 +
 hadoop-project-dist/pom.xml                        |    2 +-
 hadoop-project/pom.xml                             |  121 +-
 hadoop-tools/hadoop-aliyun/pom.xml                 |    6 +
 hadoop-tools/hadoop-archive-logs/pom.xml           |    6 +
 hadoop-tools/hadoop-archives/pom.xml               |    6 +
 .../hadoop-aws/dev-support/findbugs-exclude.xml    |    5 +
 hadoop-tools/hadoop-aws/pom.xml                    |   16 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   54 +-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |    8 +
 .../java/org/apache/hadoop/fs/s3a/Invoker.java     |   28 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  376 ++-
 .../org/apache/hadoop/fs/s3a/S3AInputPolicy.java   |   93 +-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  305 ++-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |   12 +-
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java |  115 +-
 .../org/apache/hadoop/fs/s3a/S3ClientFactory.java  |    4 +-
 .../apache/hadoop/fs/s3a/S3ObjectAttributes.java   |   17 +-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   17 +
 .../apache/hadoop/fs/s3a/api/RequestFactory.java   |    5 +-
 .../hadoop/fs/s3a/commit/CommitOperations.java     |    8 +-
 .../hadoop/fs/s3a/commit/files/PendingSet.java     |   26 +-
 .../fs/s3a/commit/files/SinglePendingCommit.java   |   20 +-
 .../hadoop/fs/s3a/impl/AbstractStoreOperation.java |   26 +-
 .../hadoop/fs/s3a/impl/CallableSupplier.java       |    2 +-
 .../apache/hadoop/fs/s3a/impl/DeleteOperation.java |   45 +-
 .../fs/s3a/impl/GetContentSummaryOperation.java    |  104 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |   21 +-
 .../apache/hadoop/fs/s3a/impl/OpenFileSupport.java |  600 +++++
 .../hadoop/fs/s3a/impl/OperationCallbacks.java     |   12 +-
 .../apache/hadoop/fs/s3a/impl/RenameOperation.java |    8 +-
 .../hadoop/fs/s3a/impl/RequestFactoryImpl.java     |   27 +-
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |    4 +-
 .../fs/s3a/select/InternalSelectConstants.java     |    2 +-
 .../apache/hadoop/fs/s3a/select/SelectTool.java    |    4 +-
 .../s3a/statistics/S3AInputStreamStatistics.java   |    7 +
 .../statistics/impl/EmptyS3AStatisticsContext.java |    4 +
 .../org/apache/hadoop/fs/s3a/tools/MarkerTool.java |    2 +-
 .../hadoop/fs/s3a/tools/MarkerToolOperations.java  |    9 +-
 .../fs/s3a/tools/MarkerToolOperationsImpl.java     |   10 +-
 .../src/site/markdown/tools/hadoop-aws/index.md    |   27 +
 .../src/site/markdown/tools/hadoop-aws/testing.md  |   25 +
 .../tools/hadoop-aws/troubleshooting_s3a.md        |   14 +
 .../s3a/ITestS3AContractContentSummary.java        |   70 +
 .../fs/contract/s3a/ITestS3AContractOpen.java      |   67 +
 .../fs/contract/s3a/ITestS3AContractSeek.java      |   15 +-
 .../apache/hadoop/fs/s3a/AbstractS3AMockTest.java  |    4 +
 .../hadoop/fs/s3a/ITestS3AConfiguration.java       |   11 -
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java     |   38 +-
 .../hadoop/fs/s3a/ITestS3ARequesterPays.java       |  115 +
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |   11 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  111 +-
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java     |   15 +-
 .../org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java  |    5 +-
 .../hadoop/fs/s3a/TestStreamChangeTracker.java     |    2 +-
 .../fs/s3a/audit/ITestAuditManagerDisabled.java    |    6 +-
 .../fs/s3a/impl/ITestPartialRenamesDeletes.java    |  105 -
 .../hadoop/fs/s3a/impl/TestOpenFileSupport.java    |  429 ++++
 .../hadoop/fs/s3a/impl/TestRequestFactory.java     |    2 +-
 .../s3a/performance/ITestS3AMiscOperationCost.java |    4 +-
 .../fs/s3a/performance/ITestS3AOpenCost.java       |  209 ++
 .../hadoop/fs/s3a/performance/OperationCost.java   |    6 +
 .../fs/s3a/scale/ITestS3ADeleteManyFiles.java      |    2 +-
 .../fs/s3a/scale/ITestS3ADirectoryPerformance.java |   33 +
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |   58 +-
 .../hadoop/fs/s3a/select/AbstractS3SelectTest.java |    2 +-
 .../apache/hadoop/fs/s3a/select/ITestS3Select.java |    7 +-
 .../hadoop/fs/s3a/select/ITestS3SelectMRJob.java   |    4 +-
 .../fs/s3a/test/MinimalOperationCallbacks.java     |    9 +-
 hadoop-tools/hadoop-azure/pom.xml                  |   81 +-
 .../src/config/checkstyle-suppressions.xml         |    3 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |    9 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  205 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   36 +-
 .../commit/AbfsManifestStoreOperations.java        |  130 ++
 .../commit/AzureManifestCommitterFactory.java      |   58 +
 .../azurebfs/commit/ResilientCommitByRename.java   |  101 +
 .../hadoop/fs/azurebfs/commit/package-info.java    |   23 +-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |    3 +
 .../constants/FileSystemConfigurations.java        |    5 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   99 +-
 .../services/AbfsListStatusRemoteIterator.java     |   36 +-
 .../fs/azurebfs/services/SimpleKeyProvider.java    |   10 +-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |   11 +-
 .../ITestAbfsListStatusRemoteIterator.java         |   34 +-
 .../fs/azurebfs/ITestAbfsReadWriteAndSeek.java     |   13 +-
 .../azurebfs/ITestAbfsRestOperationException.java  |    5 +-
 .../ITestAzureBlobFileSystemDelegationSAS.java     |    5 +-
 .../fs/azurebfs/ITestCustomerProvidedKey.java      |    3 +-
 .../fs/azurebfs/commit/AbfsCommitTestHelper.java   |   49 +
 .../azurebfs/commit/AbstractAbfsClusterITest.java  |  260 +++
 .../fs/azurebfs/commit/ITestAbfsCleanupStage.java  |   54 +
 .../azurebfs/commit/ITestAbfsCommitTaskStage.java  |   54 +
 .../ITestAbfsCreateOutputDirectoriesStage.java     |   54 +
 .../ITestAbfsJobThroughManifestCommitter.java      |  101 +
 .../commit/ITestAbfsLoadManifestsStage.java        |   55 +
 .../commit/ITestAbfsManifestCommitProtocol.java    |   62 +
 .../commit/ITestAbfsManifestStoreOperations.java   |  175 ++
 .../commit/ITestAbfsRenameStageFailure.java        |   69 +
 .../commit/ITestAbfsTaskManifestFileIO.java        |   54 +
 .../fs/azurebfs/commit/ITestAbfsTerasort.java      |  353 +++
 .../hadoop/fs/azurebfs/commit/package-info.java    |   21 +-
 .../azurebfs/contract/AbfsFileSystemContract.java  |    2 +-
 .../contract/ITestAbfsFileSystemContractSeek.java  |    2 +-
 .../contract/ITestAzureBlobFileSystemBasics.java   |    3 +
 .../hadoop-azure/src/test/resources/core-site.xml  |   25 +
 hadoop-tools/hadoop-datajoin/pom.xml               |    6 +
 hadoop-tools/hadoop-distcp/pom.xml                 |    6 +
 .../tools/mapred/RetriableFileCopyCommand.java     |    9 +-
 .../hadoop-distcp/src/site/markdown/DistCp.md.vm   |   14 +-
 .../hadoop-dynamometer-infra/pom.xml               |    6 +
 .../tools/dynamometer/SimulatedDataNodes.java      |    2 +-
 hadoop-tools/hadoop-extras/pom.xml                 |   16 +
 hadoop-tools/hadoop-federation-balance/pom.xml     |    6 +
 .../src/site/markdown/HDFSFederationBalance.md     |    2 +-
 .../tools/fedbalance/TestDistCpProcedure.java      |   24 +-
 hadoop-tools/hadoop-fs2img/pom.xml                 |    6 +
 hadoop-tools/hadoop-gridmix/pom.xml                |    6 +
 hadoop-tools/hadoop-openstack/pom.xml              |    6 +
 hadoop-tools/hadoop-resourceestimator/pom.xml      |   18 +-
 hadoop-tools/hadoop-rumen/pom.xml                  |    6 +
 .../org/apache/hadoop/yarn/sls/AMDefinition.java   |  105 +
 .../hadoop/yarn/sls/AMDefinitionFactory.java       |  133 ++
 .../apache/hadoop/yarn/sls/AMDefinitionRumen.java  |  167 ++
 .../apache/hadoop/yarn/sls/AMDefinitionSLS.java    |  185 ++
 .../apache/hadoop/yarn/sls/AMDefinitionSynth.java  |  146 ++
 .../java/org/apache/hadoop/yarn/sls/AMRunner.java  |  299 +++
 .../org/apache/hadoop/yarn/sls/JobDefinition.java  |   87 +
 .../java/org/apache/hadoop/yarn/sls/NMRunner.java  |  238 ++
 .../java/org/apache/hadoop/yarn/sls/RMRunner.java  |  137 ++
 .../hadoop/yarn/sls/ReservationClientUtil.java     |    4 +-
 .../hadoop/yarn/sls/RumenToSLSConverter.java       |   12 +-
 .../java/org/apache/hadoop/yarn/sls/SLSRunner.java |  921 ++------
 .../hadoop/yarn/sls/TaskContainerDefinition.java   |  247 ++
 .../hadoop/yarn/sls/appmaster/AMSimulator.java     |   39 +-
 .../hadoop/yarn/sls/appmaster/DAGAMSimulator.java  |   20 +-
 .../hadoop/yarn/sls/appmaster/MRAMSimulator.java   |   17 +-
 .../yarn/sls/appmaster/StreamAMSimulator.java      |   23 +-
 .../hadoop/yarn/sls/nodemanager/NMSimulator.java   |    4 +-
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java      |    6 +-
 .../yarn/sls/resourcemanager/MockAMLauncher.java   |   18 +-
 .../yarn/sls/scheduler/ContainerSimulator.java     |   37 +-
 .../hadoop/yarn/sls/scheduler/RMNodeWrapper.java   |    1 -
 .../yarn/sls/scheduler/SLSCapacityScheduler.java   |  350 +--
 .../yarn/sls/scheduler/SLSFairScheduler.java       |  305 +--
 ...FairScheduler.java => SLSSchedulerCommons.java} |  305 ++-
 .../yarn/sls/scheduler/SchedulerMetrics.java       |   26 +-
 .../yarn/sls/scheduler/SchedulerWrapper.java       |   24 +
 .../apache/hadoop/yarn/sls/synthetic/SynthJob.java |    4 +-
 .../yarn/sls/synthetic/SynthTraceJobProducer.java  |    3 +-
 .../org/apache/hadoop/yarn/sls/utils/SLSUtils.java |    8 +-
 .../apache/hadoop/yarn/sls/BaseSLSRunnerTest.java  |    2 +-
 .../apache/hadoop/yarn/sls/TestDagAMSimulator.java |   16 +-
 .../hadoop/yarn/sls/TestSynthJobGeneration.java    |    5 +-
 .../hadoop/yarn/sls/appmaster/TestAMSimulator.java |   95 +-
 .../apache/hadoop/yarn/sls/utils/TestSLSUtils.java |    4 +-
 hadoop-tools/hadoop-streaming/pom.xml              |    6 +
 .../streaming/mapreduce/StreamInputFormat.java     |    6 +-
 hadoop-tools/hadoop-tools-dist/pom.xml             |    6 +
 hadoop-yarn-project/hadoop-yarn/README             |   78 -
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |   12 +
 .../pom.xml                                        |   19 +-
 .../pom.xml                                        |   12 +
 .../hadoop-yarn-services-core/pom.xml              |   12 +
 .../hadoop-yarn/hadoop-yarn-client/pom.xml         |    6 +
 .../yarn/client/api/impl/AHSv2ClientImpl.java      |    4 +-
 .../yarn/client/api/impl/TestAHSv2ClientImpl.java  |   70 -
 .../hadoop-yarn/hadoop-yarn-common/pom.xml         |   18 +-
 .../yarn/logaggregation/AggregatedLogFormat.java   |   17 +-
 .../nodelabels/store/op/NodeLabelMirrorOp.java     |   27 +-
 .../hadoop/yarn/security/PrivilegedEntity.java     |    5 +
 .../org/apache/hadoop/yarn/util/FSDownload.java    |   24 +-
 .../java/org/apache/hadoop/yarn/util/Times.java    |   33 +-
 .../src/main/resources/yarn-default.xml            |   50 +-
 .../org/apache/hadoop/yarn/util/TestTimes.java     |   17 +
 .../pom.xml                                        |   32 +-
 .../hadoop-yarn-server-common/pom.xml              |    6 +
 .../hadoop-yarn-server-nodemanager/pom.xml         |   18 +-
 .../hadoop-yarn-server-resourcemanager/pom.xml     |   18 +-
 .../resourcemanager/DefaultAMSProcessor.java       |   16 +-
 .../yarn/server/resourcemanager/RMAppManager.java  |   87 +-
 .../server/resourcemanager/ResourceManager.java    |    4 +-
 .../nodelabels/RMDelegatedNodeLabelsUpdater.java   |    5 +-
 .../ApplicationPlacementAllocatorFactory.java      |    3 +-
 .../scheduler/capacity/AbstractCSQueue.java        |  185 +-
 .../scheduler/capacity/AbstractLeafQueue.java      |   15 +
 .../scheduler/capacity/AutoCreatedLeafQueue.java   |   23 +-
 .../capacity/AutoCreatedLeafQueueConfig.java       |   15 +
 .../scheduler/capacity/CSQueue.java                |    7 +
 .../scheduler/capacity/CSQueueUtils.java           |   14 +-
 .../capacity/CapacitySchedulerConfiguration.java   |   84 +
 .../capacity/CapacitySchedulerQueueManager.java    |  159 +-
 .../scheduler/capacity/ManagedParentQueue.java     |   41 +-
 .../scheduler/capacity/ParentQueue.java            |   55 +-
 .../scheduler/capacity/PlanQueue.java              |   19 +
 .../capacity/QueueAppLifetimeAndLimitSettings.java |   14 +-
 .../capacity/QueueNodeLabelsSettings.java          |   23 +-
 .../scheduler/capacity/QueueStateHelper.java       |   98 +
 .../scheduler/capacity/ReservationQueue.java       |    6 +
 .../allocator/RegularContainerAllocator.java       |    7 +-
 .../GuaranteedOrZeroCapacityOverTimePolicy.java    |   10 +-
 .../scheduler/policy/FifoComparator.java           |    5 +
 .../resourcemanager/webapp/RMWebServices.java      |   19 +-
 .../webapp/dao/CapacitySchedulerInfo.java          |    2 +-
 .../webapp/dao/CapacitySchedulerQueueInfo.java     |   10 +-
 .../server/resourcemanager/TestAppManager.java     |  732 +++++-
 .../TestApplicationMasterServiceCapacity.java      |  163 ++
 .../TestAMAllocatedToNonExclusivePartition.java    |  123 +
 .../TestAbsoluteResourceConfiguration.java         |   47 +
 .../TestAbsoluteResourceWithAutoQueue.java         |    8 +-
 .../TestCapacitySchedulerAutoCreatedQueueBase.java |   28 +-
 .../scheduler/policy/TestFifoOrderingPolicy.java   |   91 +-
 .../resourcemanager/webapp/TestRMWebServices.java  |    5 +-
 .../webapp/TestRMWebServicesCapacitySched.java     |   47 +-
 ...estRMWebServicesCapacitySchedDynamicConfig.java |  116 +-
 .../webapp/TestRMWebServicesNodeLabels.java        |  938 ++++----
 .../TestRMWebServicesSchedulerActivities.java      |   31 +-
 ...er-response-AbsoluteModeLegacyAutoCreation.json |   24 +-
 .../webapp/scheduler-response-WeightMode.json      |  531 ++++-
 ...onse-WeightModeWithAutoCreatedQueues-After.json | 2397 ++++++++++++++++++--
 ...nse-WeightModeWithAutoCreatedQueues-Before.json |  449 +++-
 .../pom.xml                                        |   12 +
 .../pom.xml                                        |   14 +
 .../pom.xml                                        |    6 +
 .../src/site/markdown/CapacityScheduler.md         |    8 +-
 .../src/site/markdown/NodeLabel.md                 |    8 +
 .../src/site/markdown/ResourceManagerRest.md       |   26 +-
 .../src/main/webapp/bower-shrinkwrap.json          |    9 +-
 .../hadoop-yarn-ui/src/main/webapp/bower.json      |    6 +-
 pom.xml                                            |   58 +-
 639 files changed, 43972 insertions(+), 7455 deletions(-)
 create mode 100644 .BUILDING.txt.swp
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/CMakeLists.txt => dev-support/docker/pkg-resolver/set-vault-as-baseurl-centos.sh (59%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/{fs/impl/WrappedIOException.java => util/RateLimiting.java} (50%)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CloseableTaskPoolSubmitter.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java
 create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
 create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/release/3.2.3/CHANGELOG.3.2.3.md
 create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/release/3.2.3/RELEASENOTES.3.2.3.md
 create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/release/3.3.2/CHANGELOG.3.3.2.md
 create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/release/3.3.2/RELEASENOTES.3.3.2.md
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractContentSummaryTest.java
 copy hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java => hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractContentSummary.java (70%)
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestTaskPool.java
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/DummyGroupMapping.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/x-platform/types.h (66%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-find-mock.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-find-mock.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-get-mock.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-get-mock.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-ls-mock.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-ls-mock.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-setrep-mock.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-stat-mock.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-stat-mock.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-tail-mock.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/tools/hdfs-tail-mock.h
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/x-platform/types_test.cc (67%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/x-platform/types_test.h
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/{examples/c/cat => tools/hdfs-find}/CMakeLists.txt (62%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-find/hdfs-find.cc
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/{hdfs-copy-to-local/hdfs-copy-to-local.h => hdfs-find/hdfs-find.h} (64%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-find/main.cc (52%)
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/{examples/c/cat => tools/hdfs-get}/CMakeLists.txt (61%)
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-get/hdfs-get.cc (67%)
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-get/hdfs-get.h (58%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-get/main.cc (52%)
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/{examples/c/cat => tools/hdfs-ls}/CMakeLists.txt (63%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-ls/hdfs-ls.cc
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/{hdfs-copy-to-local/hdfs-copy-to-local.h => hdfs-ls/hdfs-ls.h} (66%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-ls/main.cc (52%)
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/{examples/c/cat => tools/hdfs-setrep}/CMakeLists.txt (61%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/hdfs-setrep.cc
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/{hdfs-copy-to-local/hdfs-copy-to-local.h => hdfs-setrep/hdfs-setrep.h} (66%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-setrep/main.cc (51%)
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/{examples/c/cat => tools/hdfs-stat}/CMakeLists.txt (62%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-stat/hdfs-stat.cc
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/{hdfs-copy-to-local/hdfs-copy-to-local.h => hdfs-stat/hdfs-stat.h} (66%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-stat/main.cc (51%)
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/{examples/c/cat => tools/hdfs-tail}/CMakeLists.txt (62%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-tail/hdfs-tail.cc
 copy hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/{hdfs-copy-to-local/hdfs-copy-to-local.h => hdfs-tail/hdfs-tail.h} (64%)
 copy hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs-tail/main.cc (51%)
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_find.cc
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_get.cc
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_ls.cc
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_setrep.cc
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_stat.cc
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/hdfs_tail.cc
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/internal/set-replication-state.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/dev-support/jdiff/Apache_Hadoop_HDFS_3.2.3.xml
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/dev-support/jdiff/Apache_Hadoop_HDFS_3.3.2.xml
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java
 rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpRequestLogAppender.java => hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java (52%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/metrics/ExternalSPSBeanMetrics.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/metrics/ExternalSPSMXBean.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/metrics/package-info.java (67%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShellTouch.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterFactory.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/AbstractManifestData.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/DiagnosticKeys.java (52%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/DirEntry.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/EntryStatus.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/FileEntry.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestSuccessData.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/TaskManifest.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/package-info.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/AuditingIntegration.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestCommitterSupport.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/OutputValidationException.java (55%)
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/package-info.java (66%)
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/package-info.java (59%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java
 rename hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/http/TestHttpRequestLogAppender.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/JobOrTaskStage.java (57%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/LoadManifestsStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/RenameFilesStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveSuccessFileStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SetupJobStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SetupTaskStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageEventCallbacks.java (67%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/TaskAttemptScanDirectoryStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/ValidateRenamedFilesStage.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/package-info.java (67%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_protocol.md
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/CommitterTestBinding.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/StubStoreOperations.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TaggedFileStatus.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCleanupStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestRenameStageFailure.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestTaskManifestFileIO.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TextOutputForTests.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ThreadLeakTracker.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/package-info.java (65%)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/contract/localfs.xml
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/core-site.xml
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractContentSummary.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java (67%)
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
 copy hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java => hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java (67%)
 create mode 100644 hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/RMRunner.java
 create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java
 copy hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/{SLSFairScheduler.java => SLSSchedulerCommons.java} (76%)
 delete mode 100644 hadoop-yarn-project/hadoop-yarn/README
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAMAllocatedToNonExclusivePartition.java


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)

Posted by mt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7d1a2c3e2885f861e4cb2cd4278e2c04baac29bc
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Sat Apr 30 04:17:33 2022 +0530

    HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
    
    Part of HADOOP-18103.
    Introduces fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size
    to configure the minimum seek and the maximum merged read size during a vectored IO
    operation in the S3A connector. These properties define how ranges are merged.
    To disable merging completely, set fs.s3a.vectored.read.max.merged.size to 0.
    
    Contributed By: Mukund Thakur
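
    [Editor's note: for reference, a minimal client-side sketch (not part of the
    patch) of how these options might be set; the bucket and object names are
    placeholders. The property keys and stream accessors are those introduced
    by this commit.]

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class VectoredReadConfigSketch {
          public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Ranges separated by less than this many bytes may be merged into one read.
            conf.set("fs.s3a.vectored.read.min.seek.size", "4K");
            // Upper bound on the size of a merged read; "0" disables merging.
            conf.set("fs.s3a.vectored.read.max.merged.size", "1M");

            // Placeholder bucket/object, not part of the patch.
            Path path = new Path("s3a://example-bucket/data/part-00000");
            try (FileSystem fs = path.getFileSystem(conf);
                 FSDataInputStream in = fs.open(path)) {
              // The effective values are exposed on the open stream,
              // which is what the new ITestS3AContractVectoredRead tests assert.
              System.out.println("min seek for vector reads: " + in.minSeekForVectorReads());
              System.out.println("max merged read size:      " + in.maxReadSizeForVectorReads());
            }
          }
        }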
---
 .../site/markdown/filesystem/fsdatainputstream.md  |  1 +
 .../contract/AbstractContractVectoredReadTest.java | 21 +++---
 .../hadoop/fs/impl/TestVectoredReadUtils.java      | 24 +++++++
 .../java/org/apache/hadoop/test/MoreAsserts.java   | 12 ++++
 .../java/org/apache/hadoop/fs/s3a/Constants.java   | 27 ++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 25 ++++++-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 21 ++++++
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java | 21 +++++-
 .../apache/hadoop/fs/s3a/VectoredIOContext.java    | 78 ++++++++++++++++++++++
 .../site/markdown/tools/hadoop-aws/performance.md  | 30 +++++++++
 .../contract/s3a/ITestS3AContractVectoredRead.java | 54 ++++++++++++++-
 11 files changed, 298 insertions(+), 16 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 0fe1772d266..e4a2830967e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -474,6 +474,7 @@ end of first and start of next range is more than this value.
 
 Maximum number of bytes which can be read in one go after merging the ranges.
 Two ranges won't be merged if the combined data to be read is more than this value.
+Essentially setting this to 0 will disable the merging of ranges.
 
 ## Consistency
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index eee4b11e739..756c3de85cc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.fs.contract;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,6 +33,15 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 
@@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
 
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
-  private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
+  protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
   private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
   private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
@@ -172,6 +172,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     }
   }
 
+  @Test
   public void testSameRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
index f789f361905..cfd366701be 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
@@ -207,6 +207,30 @@ public class TestVectoredReadUtils extends HadoopTestBase {
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
   }
+
+  @Test
+  public void testMaxSizeZeroDisablesMerging() throws Exception {
+    List<FileRange> randomRanges = Arrays.asList(
+            new FileRangeImpl(3000, 110),
+            new FileRangeImpl(3000, 100),
+            new FileRangeImpl(2100, 100)
+    );
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
+  }
+
+  private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
+                                                  int chunkSize,
+                                                  int minimumSeek,
+                                                  int maxSize) {
+    List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
+            .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
+    Assertions.assertThat(combinedFileRanges)
+            .describedAs("Mismatch in number of ranges post merging")
+            .hasSize(inputRanges.size());
+  }
+
   interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
     // nothing
   }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
index f83ef9e63d1..f6e6055d78e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
@@ -86,4 +86,16 @@ public class MoreAsserts {
                     "completed exceptionally")
             .isTrue();
   }
+
+  /**
+   * Assert two same type of values.
+   * @param actual actual value.
+   * @param expected expected value.
+   * @param message error message to print in case of mismatch.
+   */
+  public static <T> void assertEqual(T actual, T expected, String message) {
+    Assertions.assertThat(actual)
+            .describedAs("Mismatch in %s", message)
+            .isEqualTo(expected);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e5369b84883..02b23ba8f36 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1108,4 +1108,31 @@ public final class Constants {
    * Require that all S3 access is made through Access Points.
    */
   public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
+
+  /**
+   * What is the smallest reasonable seek in bytes such
+   * that we group ranges together during vectored read operation.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
+          "fs.s3a.vectored.read.min.seek.size";
+
+  /**
+   * What is the largest merged read size in bytes such
+   * that we group ranges together during vectored read.
+   * Setting this value to 0 will disable merging of ranges.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
+          "fs.s3a.vectored.read.max.merged.size";
+
+  /**
+   * Default minimum seek in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4896; // 4K
+
+  /**
+   * Default maximum read size in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index b18090f3dfe..3558ee5d2d1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -306,6 +306,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * {@code openFile()}.
    */
   private S3AInputPolicy inputPolicy;
+  /** Vectored IO context. */
+  private VectoredIOContext vectoredIOContext;
+
+  private long readAhead;
   private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private volatile boolean isClosed = false;
@@ -577,6 +581,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
                         DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
           inputPolicy);
+      vectoredIOContext = populateVectoredIOContext(conf);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -590,6 +595,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
   }
 
+  /**
+   * Populates the configurations related to vectored IO operation
+   * in the context which has to passed down to input streams.
+   * @param conf configuration object.
+   * @return VectoredIOContext.
+   */
+  private VectoredIOContext populateVectoredIOContext(Configuration conf) {
+    final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+            DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
+    final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
+    return new VectoredIOContext()
+            .setMinSeekForVectoredReads(minSeekVectored)
+            .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
+            .build();
+  }
+
   /**
    * Set the client side encryption gauge to 0 or 1, indicating if CSE is
    * enabled through the gauge or not.
@@ -1532,7 +1554,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         invoker,
         statistics,
         statisticsContext,
-        fileStatus)
+        fileStatus,
+        vectoredIOContext)
         .withAuditSpan(auditSpan);
     openFileHelper.applyDefaultOptions(roc);
     return roc.build();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 05d9c7f9fe9..23f31df1645 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
+  /** Vectored IO context. */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -212,6 +215,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     setReadahead(ctx.getReadahead());
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
     this.unboundedThreadPool = unboundedThreadPool;
+    this.vectoredIOContext = context.getVectoredIOContext();
   }
 
   /**
@@ -859,6 +863,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       sb.append(" remainingInCurrentRequest=")
           .append(remainingInCurrentRequest());
       sb.append(" ").append(changeTracker);
+      sb.append(" ").append(vectoredIOContext);
       sb.append('\n').append(s);
       sb.append('}');
       return sb.toString();
@@ -905,6 +910,22 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
   /**
    * {@inheritDoc}
    * Vectored read implementation for S3AInputStream.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index f416cf9485d..29e3df1af12 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -64,6 +64,12 @@ public class S3AReadOpContext extends S3AOpContext {
    */
   private long asyncDrainThreshold;
 
+  /**
+   * Vectored IO context for vectored read api
+   * in {@code S3AInputStream#readVectored(List, IntFunction)}.
+   */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * Instantiate.
    * @param path path of read
@@ -71,17 +77,19 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param stats Fileystem statistics (may be null)
    * @param instrumentation statistics context
    * @param dstFileStatus target file status
+   * @param vectoredIOContext context for vectored read operation.
    */
   public S3AReadOpContext(
       final Path path,
       Invoker invoker,
       @Nullable FileSystem.Statistics stats,
       S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus) {
-
+      FileStatus dstFileStatus,
+      VectoredIOContext vectoredIOContext) {
     super(invoker, stats, instrumentation,
         dstFileStatus);
     this.path = requireNonNull(path);
+    this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
   }
 
   /**
@@ -145,6 +153,6 @@
   }
 
   /**
    * Set builder value.
    * @param value new value
    * @return the builder
@@ -199,6 +208,14 @@ public class S3AReadOpContext extends S3AOpContext {
     return asyncDrainThreshold;
   }
 
+  /**
+   * Get the vectored IO context for this read op.
+   * @return vectored IO context.
+   */
+  public VectoredIOContext getVectoredIOContext() {
+    return vectoredIOContext;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java
new file mode 100644
index 00000000000..31f0ae4cb55
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.List;
+import java.util.function.IntFunction;
+
+/**
+ * Context related to vectored IO operation.
+ * See {@link S3AInputStream#readVectored(List, IntFunction)}.
+ */
+public class VectoredIOContext {
+
+  /**
+   * What is the smallest reasonable seek that we should group
+   * ranges together during vectored read operation.
+   */
+  private int minSeekForVectorReads;
+
+  /**
+   * What is the largest size that we should group ranges
+   * together during vectored read operation.
+   * Setting this value to 0 will disable merging of ranges.
+   */
+  private int maxReadSizeForVectorReads;
+
+  /**
+   * Default no arg constructor.
+   */
+  public VectoredIOContext() {
+  }
+
+  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
+    this.minSeekForVectorReads = minSeek;
+    return this;
+  }
+
+  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
+    this.maxReadSizeForVectorReads = maxSize;
+    return this;
+  }
+
+  public VectoredIOContext build() {
+    return this;
+  }
+
+  public int getMinSeekForVectorReads() {
+    return minSeekForVectorReads;
+  }
+
+  public int getMaxReadSizeForVectorReads() {
+    return maxReadSizeForVectorReads;
+  }
+
+  @Override
+  public String toString() {
+    return "VectoredIOContext{" +
+            "minSeekForVectorReads=" + minSeekForVectorReads +
+            ", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
+            '}';
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index f398c4cbcbe..06eb137cd9b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptima
 
 To make most efficient use of S3, care is needed.
 
+## <a name="vectoredIO"></a> Improving read performance using Vectored IO
+The S3A FileSystem supports the vectored read API, through which a client
+can provide a list of file ranges to read; a future is returned for the
+read of each range. For the full API specification, please see
+[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
+
+The following properties can be configured to optimise vectored reads based
+on the client requirements.
+
+```xml
+<property>
+  <name>fs.s3a.vectored.read.min.seek.size</name>
+  <value>4K</value>
+  <description>
+     What is the smallest reasonable seek in bytes such
+     that we group ranges together during vectored
+     read operation.
+   </description>
+</property>
+<property>
+<name>fs.s3a.vectored.read.max.merged.size</name>
+<value>1M</value>
+<description>
+   What is the largest merged read size in bytes such
+   that we group ranges together during vectored read.
+   Setting this value to 0 will disable merging of ranges.
+</description>
+</property>
+```
+
 ## <a name="fadvise"></a> Improving data input performance through fadvise
 
 The S3A Filesystem client supports the notion of input policies, similar
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 255cc6501c2..0752e75d247 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
 
@@ -42,7 +50,6 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
   /**
    * Overriding in S3 vectored read api fails fast in case of EOF
    * requested range.
-   * @throws Exception
    */
   @Override
   public void testEOFRanges() throws Exception {
@@ -51,4 +58,45 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
     testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
   }
+
+  @Test
+  public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    final int configuredMinSeek = 2 * 1024;
+    final int configuredMaxSize = 10 * 1024 * 1024;
+    conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
+    conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
+        int newMinSeek = fis.minSeekForVectorReads();
+        int newMaxSize = fis.maxReadSizeForVectorReads();
+        assertEqual(newMinSeek, configuredMinSeek,
+                "configured s3a min seek for vectored reads");
+        assertEqual(newMaxSize, configuredMaxSize,
+                "configured s3a max size for vectored reads");
+      }
+    }
+  }
+
+  @Test
+  public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
+        int minSeek = fis.minSeekForVectorReads();
+        int maxSize = fis.maxReadSizeForVectorReads();
+        assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+                "default s3a min seek for vectored reads");
+        assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+                "default s3a max read size for vectored reads");
+      }
+    }
+  }
 }
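
[Editor's note: a quick illustrative sketch (not part of the patch) of how the
two merge parameters interact, built on the VectoredReadUtils helper that the
new unit test above exercises. The offsets and sizes are made up; the expected
counts follow the merge rules documented in fsdatainputstream.md.]

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileRangeImpl;
    import org.apache.hadoop.fs.impl.CombinedFileRange;
    import org.apache.hadoop.fs.impl.VectoredReadUtils;

    public class MergeBehaviourSketch {
      public static void main(String[] args) {
        // Three made-up ranges: the first two are 1KB apart, the third is far away.
        List<FileRange> ranges = Arrays.asList(
            new FileRangeImpl(0, 1024),
            new FileRangeImpl(2048, 1024),
            new FileRangeImpl(1024 * 1024, 1024));

        // minSeek=4096: a 1KB gap is cheaper to read through than to seek over,
        // so the first two ranges should merge; the distant third stays separate.
        List<CombinedFileRange> merged =
            VectoredReadUtils.sortAndMergeRanges(ranges, 1, 4096, 1024 * 1024);
        System.out.println("with merging:     " + merged.size() + " ranges");

        // maxSize=0 disables merging entirely, as the commit message describes.
        List<CombinedFileRange> unmerged =
            VectoredReadUtils.sortAndMergeRanges(ranges, 1, 4096, 0);
        System.out.println("merging disabled: " + unmerged.size() + " ranges");
      }
    }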


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: HADOOP-11867. Add a high-performance vectored read API. (#3904)

Posted by mt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 8bf45aa5ba654f79c46394798cf24d2737c0db04
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Tue Feb 1 19:52:38 2022 +0530

    HADOOP-11867. Add a high-performance vectored read API. (#3904)
    
    Part of HADOOP-18103.
    Adds support for a multi-range vectored read API in PositionedReadable.
    The default implementation iterates through the ranges and reads each one
    synchronously, but the intent is that FSDataInputStream subclasses can
    provide more efficient readers, especially in object store implementations.

    Also adds an implementation in S3A where smaller ranges are merged and
    sliced byte buffers are returned to the readers. All the merged ranges are
    fetched from S3 asynchronously.
    
    Contributed By: Owen O'Malley and Mukund Thakur
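
    [Editor's note: for orientation, a minimal client-side sketch of the new API
    (not part of the patch). It assumes FileRange exposes its pending result as a
    CompletableFuture via getData(), which is how the contract tests added here
    consume it, and it takes the input path as a command-line argument.]

        import java.nio.ByteBuffer;
        import java.util.ArrayList;
        import java.util.List;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileRange;
        import org.apache.hadoop.fs.FileRangeImpl;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class ReadVectoredSketch {
          public static void main(String[] args) throws Exception {
            // Any FileSystem path will do; the API is defined on PositionedReadable.
            Path path = new Path(args[0]);
            Configuration conf = new Configuration();

            List<FileRange> ranges = new ArrayList<>();
            ranges.add(new FileRangeImpl(0, 4096));      // first 4KB
            ranges.add(new FileRangeImpl(65536, 8192));  // 8KB starting at 64KB

            try (FileSystem fs = path.getFileSystem(conf);
                 FSDataInputStream in = fs.open(path)) {
              // Hand all ranges to the stream in one call; the buffers are
              // filled asynchronously using the supplied allocator.
              in.readVectored(ranges, ByteBuffer::allocate);
              for (FileRange range : ranges) {
                // Each range carries a future holding its buffer once the read completes.
                ByteBuffer data = range.getData().get();
                System.out.println("offset " + range.getOffset()
                    + ": read " + data.remaining() + " bytes");
              }
            }
          }
        }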
---
 dev-support/Jenkinsfile                            |   2 +-
 .../apache/hadoop/fs/BufferedFSInputStream.java    |  27 +-
 .../org/apache/hadoop/fs/ChecksumFileSystem.java   | 208 +++++++++---
 .../org/apache/hadoop/fs/FSDataInputStream.java    |  22 +-
 .../main/java/org/apache/hadoop/fs/FileRange.java  |  55 +++
 .../java/org/apache/hadoop/fs/FileRangeImpl.java   |  69 ++++
 .../org/apache/hadoop/fs/PositionedReadable.java   |  43 ++-
 .../org/apache/hadoop/fs/RawLocalFileSystem.java   | 108 +++++-
 .../apache/hadoop/fs/impl/CombinedFileRange.java   |  71 ++++
 .../apache/hadoop/fs/impl/VectoredReadUtils.java   | 277 +++++++++++++++
 .../site/markdown/filesystem/fsdatainputstream.md  |  31 ++
 .../contract/AbstractContractVectoredReadTest.java | 375 +++++++++++++++++++++
 .../localfs/TestLocalFSContractVectoredRead.java   |  35 ++
 .../rawlocal/TestRawLocalContractVectoredRead.java |  35 ++
 .../hadoop/fs/impl/TestVectoredReadUtils.java      | 344 +++++++++++++++++++
 .../java/org/apache/hadoop/test/MoreAsserts.java   |  37 +-
 hadoop-common-project/pom.xml                      |   1 -
 hadoop-project/pom.xml                             |  11 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  14 +-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 288 ++++++++++++++--
 .../contract/s3a/ITestS3AContractVectoredRead.java |  54 +++
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java     |   3 +-
 .../hadoop-aws/src/test/resources/log4j.properties |   2 +-
 hadoop-tools/hadoop-benchmark/pom.xml              |  94 ++++++
 .../hadoop-benchmark/src/main/assembly/uber.xml    |  33 ++
 .../hadoop-benchmark/src/main/findbugs/exclude.xml |  22 ++
 .../hadoop/benchmark/VectoredReadBenchmark.java    | 245 ++++++++++++++
 .../org/apache/hadoop/benchmark/package-info.java  |  22 ++
 hadoop-tools/pom.xml                               |   1 +
 pom.xml                                            |   1 +
 30 files changed, 2440 insertions(+), 90 deletions(-)

diff --git a/dev-support/Jenkinsfile b/dev-support/Jenkinsfile
index 0ec32e385d2..3f6331b412e 100644
--- a/dev-support/Jenkinsfile
+++ b/dev-support/Jenkinsfile
@@ -47,7 +47,7 @@ pipeline {
 
     options {
         buildDiscarder(logRotator(numToKeepStr: '5'))
-        timeout (time: 24, unit: 'HOURS')
+        timeout (time: 48, unit: 'HOURS')
         timestamps()
         checkoutToSubdirectory('src')
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
index 59345f5d25c..7f3171235c8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,6 +22,9 @@ import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.StringJoiner;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -158,8 +161,24 @@ public class BufferedFSInputStream extends BufferedInputStream
   @Override
   public String toString() {
     return new StringJoiner(", ",
-        BufferedFSInputStream.class.getSimpleName() + "[", "]")
-        .add("in=" + in)
-        .toString();
+            BufferedFSInputStream.class.getSimpleName() + "[", "]")
+            .add("in=" + in)
+            .toString();
+  }
+
+  @Override
+  public int minSeekForVectorReads() {
+    return ((PositionedReadable) in).minSeekForVectorReads();
+  }
+
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return ((PositionedReadable) in).maxReadSizeForVectorReads();
+  }
+
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+    ((PositionedReadable) in).readVectored(ranges, allocate);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 59ffe00bcb2..ad0529a4870 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -22,17 +22,25 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
 import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.IntFunction;
+import java.util.zip.CRC32;
 
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.VectoredReadUtils;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -66,7 +74,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   public static double getApproxChkSumLength(long size) {
     return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
   }
-  
+
   public ChecksumFileSystem(FileSystem fs) {
     super(fs);
   }
@@ -82,7 +90,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
           bytesPerChecksum);
     }
   }
-  
+
   /**
    * Set whether to verify checksum.
    */
@@ -95,7 +103,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   public void setWriteChecksum(boolean writeChecksum) {
     this.writeChecksum = writeChecksum;
   }
-  
+
   /** get the raw file system */
   @Override
   public FileSystem getRawFileSystem() {
@@ -113,7 +121,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     return name.startsWith(".") && name.endsWith(".crc");
   }
 
-  /** Return the length of the checksum file given the size of the 
+  /** Return the length of the checksum file given the size of the
    * actual file.
    **/
   public long getChecksumFileLength(Path file, long fileSize) {
@@ -143,18 +151,18 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     private ChecksumFileSystem fs;
     private FSDataInputStream datas;
     private FSDataInputStream sums;
-    
+
     private static final int HEADER_LENGTH = 8;
-    
+
     private int bytesPerSum = 1;
-    
+
     public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
       throws IOException {
       this(fs, file, fs.getConf().getInt(
-                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
+                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
                        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
     }
-    
+
     public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
       throws IOException {
       super( file, fs.getFileStatus(file).getReplication() );
@@ -170,7 +178,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         if (!Arrays.equals(version, CHECKSUM_VERSION))
           throw new IOException("Not a checksum file: "+sumFile);
         this.bytesPerSum = sums.readInt();
-        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
+        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum,
+            FSInputChecker.CHECKSUM_SIZE);
       } catch (IOException e) {
         // mincing the message is terrible, but java throws permission
         // exceptions as FNF because that's all the method signatures allow!
@@ -182,21 +191,21 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         set(fs.verifyChecksum, null, 1, 0);
       }
     }
-    
+
     private long getChecksumFilePos( long dataPos ) {
-      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
+      return HEADER_LENGTH + FSInputChecker.CHECKSUM_SIZE*(dataPos/bytesPerSum);
     }
-    
+
     @Override
     protected long getChunkPosition( long dataPos ) {
       return dataPos/bytesPerSum*bytesPerSum;
     }
-    
+
     @Override
     public int available() throws IOException {
       return datas.available() + super.available();
     }
-    
+
     @Override
     public int read(long position, byte[] b, int off, int len)
       throws IOException {
@@ -214,7 +223,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       }
       return nread;
     }
-    
+
     @Override
     public void close() throws IOException {
       datas.close();
@@ -223,7 +232,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       }
       set(fs.verifyChecksum, null, 1, 0);
     }
-    
+
 
     @Override
     public boolean seekToNewSource(long targetPos) throws IOException {
@@ -246,7 +255,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         final int checksumsToRead = Math.min(
           len/bytesPerSum, // number of checksums based on len to read
           checksum.length / CHECKSUM_SIZE); // size of checksum buffer
-        long checksumPos = getChecksumFilePos(pos); 
+        long checksumPos = getChecksumFilePos(pos);
         if(checksumPos != sums.getPos()) {
           sums.seek(checksumPos);
         }
@@ -286,8 +295,129 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     public IOStatistics getIOStatistics() {
       return IOStatisticsSupport.retrieveIOStatistics(datas);
     }
+
+    public static long findChecksumOffset(long dataOffset,
+                                          int bytesPerSum) {
+      return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE;
+    }
+
+    /**
+     * Find the checksum ranges that correspond to the given data ranges.
+     * @param dataRanges the input data ranges, which are assumed to be sorted
+     *                   and non-overlapping
+     * @return a list of CombinedFileRange objects that correspond to
+     *         the checksum ranges
+     */
+    public static List<CombinedFileRange> findChecksumRanges(
+        List<? extends FileRange> dataRanges,
+        int bytesPerSum,
+        int minSeek,
+        int maxSize) {
+      List<CombinedFileRange> result = new ArrayList<>();
+      CombinedFileRange currentCrc = null;
+      for(FileRange range: dataRanges) {
+        long crcOffset = findChecksumOffset(range.getOffset(), bytesPerSum);
+        long crcEnd = findChecksumOffset(range.getOffset() + range.getLength() +
+                                             bytesPerSum - 1, bytesPerSum);
+        if (currentCrc == null ||
+                !currentCrc.merge(crcOffset, crcEnd, range, minSeek, maxSize)) {
+          currentCrc = new CombinedFileRange(crcOffset, crcEnd, range);
+          result.add(currentCrc);
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Check the data against the checksums.
+     * @param sumsBytes the checksum data
+     * @param sumsOffset where from the checksum file this buffer started
+     * @param data the file data
+     * @param dataOffset where the file data started (must be a multiple of
+     *                  bytesPerSum)
+     * @param bytesPerSum how many bytes of file data per checksum
+     * @param file the path of the file being checked
+     * @return the data buffer
+     * @throws CompletionException if the checksums don't match
+     */
+    static ByteBuffer checkBytes(ByteBuffer sumsBytes,
+                                 long sumsOffset,
+                                 ByteBuffer data,
+                                 long dataOffset,
+                                 int bytesPerSum,
+                                 Path file) {
+      // determine how many bytes we need to skip at the start of the sums
+      int offset =
+          (int) (findChecksumOffset(dataOffset, bytesPerSum) - sumsOffset);
+      IntBuffer sums = sumsBytes.asIntBuffer();
+      sums.position(offset / FSInputChecker.CHECKSUM_SIZE);
+      ByteBuffer current = data.duplicate();
+      int numChunks = data.remaining() / bytesPerSum;
+      CRC32 crc = new CRC32();
+      // check each chunk to ensure they match
+      for(int c = 0; c < numChunks; ++c) {
+        // set the buffer position and the limit
+        current.limit((c + 1) * bytesPerSum);
+        current.position(c * bytesPerSum);
+        // compute the crc
+        crc.reset();
+        crc.update(current);
+        int expected = sums.get();
+        int calculated = (int) crc.getValue();
+
+        if (calculated != expected) {
+          // cast of c added to silence findbugs
+          long errPosn = dataOffset + (long) c * bytesPerSum;
+          throw new CompletionException(new ChecksumException(
+              "Checksum error: " + file + " at " + errPosn +
+                  " exp: " + expected + " got: " + calculated, errPosn));
+        }
+      }
+      // if everything matches, we return the data
+      return data;
+    }
+
+    @Override
+    public void readVectored(List<? extends FileRange> ranges,
+                             IntFunction<ByteBuffer> allocate) throws IOException {
+      // If the stream doesn't have checksums, just delegate.
+      VectoredReadUtils.validateVectoredReadRanges(ranges);
+      if (sums == null) {
+        datas.readVectored(ranges, allocate);
+        return;
+      }
+      int minSeek = minSeekForVectorReads();
+      int maxSize = maxReadSizeForVectorReads();
+      List<CombinedFileRange> dataRanges =
+          VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum,
+              minSeek, maxReadSizeForVectorReads());
+      List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
+          bytesPerSum, minSeek, maxSize);
+      sums.readVectored(checksumRanges, allocate);
+      datas.readVectored(dataRanges, allocate);
+      // The data read into dataRanges has been verified as correct, but the
+      // test testVectoredReadMultipleRanges still fails for this stream; the
+      // remaining bug is suspected to be in the slicing of the merged data
+      // into the smaller user ranges below.
+      for(CombinedFileRange checksumRange: checksumRanges) {
+        for(FileRange dataRange: checksumRange.getUnderlying()) {
+          // when we have both the ranges, validate the checksum
+          CompletableFuture<ByteBuffer> result =
+              checksumRange.getData().thenCombineAsync(dataRange.getData(),
+                  (sumBuffer, dataBuffer) ->
+                      checkBytes(sumBuffer, checksumRange.getOffset(),
+                          dataBuffer, dataRange.getOffset(), bytesPerSum, file));
+          // Now, slice the read data range to the user's ranges
+          for(FileRange original: ((CombinedFileRange) dataRange).getUnderlying()) {
+            original.setData(result.thenApply(
+                (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
+          }
+        }
+      }
+    }
   }
-  
+
   private static class FSDataBoundedInputStream extends FSDataInputStream {
     private FileSystem fs;
     private Path file;
@@ -298,12 +428,12 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       this.fs = fs;
       this.file = file;
     }
-    
+
     @Override
     public boolean markSupported() {
       return false;
     }
-    
+
     /* Return the file length */
     private long getFileLength() throws IOException {
       if( fileLen==-1L ) {
@@ -311,7 +441,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       }
       return fileLen;
     }
-    
+
     /**
      * Skips over and discards <code>n</code> bytes of data from the
      * input stream.
@@ -335,11 +465,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       }
       return super.skip(n);
     }
-    
+
     /**
      * Seek to the given position in the stream.
      * The next read() will be from that position.
-     * 
+     *
      * <p>This method does not allow seek past the end of the file.
      * This produces IOException.
      *
@@ -404,22 +534,22 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
    */
   public static long getChecksumLength(long size, int bytesPerSum) {
     //the checksum length is equal to size passed divided by bytesPerSum +
-    //bytes written in the beginning of the checksum file.  
-    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
-             CHECKSUM_VERSION.length + 4;  
+    //bytes written in the beginning of the checksum file.
+    return ((size + bytesPerSum - 1) / bytesPerSum) * FSInputChecker.CHECKSUM_SIZE +
+             ChecksumFSInputChecker.HEADER_LENGTH;
   }
 
   /** This class provides an output stream for a checksummed file.
    * It generates checksums for data. */
   private static class ChecksumFSOutputSummer extends FSOutputSummer
       implements IOStatisticsSource, StreamCapabilities {
-    private FSDataOutputStream datas;    
+    private FSDataOutputStream datas;
     private FSDataOutputStream sums;
     private static final float CHKSUM_AS_FRACTION = 0.01f;
     private boolean isClosed = false;
-    
-    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
-                          Path file, 
+
+    ChecksumFSOutputSummer(ChecksumFileSystem fs,
+                          Path file,
                           boolean overwrite,
                           int bufferSize,
                           short replication,
@@ -440,7 +570,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(bytesPerSum);
     }
-    
+
     @Override
     public void close() throws IOException {
       try {
@@ -451,7 +581,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         isClosed = true;
       }
     }
-    
+
     @Override
     protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
         int ckoff, int cklen)
@@ -707,7 +837,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         value = fs.rename(srcCheckFile, dstCheckFile);
       } else if (fs.exists(dstCheckFile)) {
         // no src checksum, so remove dst checksum
-        value = fs.delete(dstCheckFile, true); 
+        value = fs.delete(dstCheckFile, true);
       }
 
       return value;
@@ -739,7 +869,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       return fs.delete(f, true);
     }
   }
-    
+
   final private static PathFilter DEFAULT_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path file) {
@@ -750,7 +880,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
-   * 
+   *
    * @param f
    *          given path
    * @return the statuses of the files/directories in the given path
@@ -771,7 +901,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
-   * 
+   *
    * @param f
    *          given path
    * @return the statuses of the files/directories in the given patch
@@ -782,7 +912,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   throws IOException {
     return fs.listLocatedStatus(f, DEFAULT_FILTER);
   }
-  
+
   @Override
   public boolean mkdirs(Path f) throws IOException {
     return fs.mkdirs(f);
@@ -832,7 +962,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     } else {
       FileStatus[] srcs = listStatus(src);
       for (FileStatus srcFile : srcs) {
-        copyToLocalFile(srcFile.getPath(), 
+        copyToLocalFile(srcFile.getPath(),
                         new Path(dst, srcFile.getPath().getName()), copyCrc);
       }
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index b143a4cb63d..52644402ca4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
+import java.util.List;
+import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -51,7 +53,7 @@ public class FSDataInputStream extends DataInputStream
    */
   private final IdentityHashStore<ByteBuffer, ByteBufferPool>
     extendedReadBuffers
-      = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
+      = new IdentityHashStore<>(0);
 
   public FSDataInputStream(InputStream in) {
     super(in);
@@ -279,4 +281,20 @@ public class FSDataInputStream extends DataInputStream
   public IOStatistics getIOStatistics() {
     return IOStatisticsSupport.retrieveIOStatistics(in);
   }
+
+  @Override
+  public int minSeekForVectorReads() {
+    return ((PositionedReadable) in).minSeekForVectorReads();
+  }
+
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return ((PositionedReadable) in).maxReadSizeForVectorReads();
+  }
+
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+    ((PositionedReadable) in).readVectored(ranges, allocate);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
new file mode 100644
index 00000000000..7388e462cc2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A byte range of a file.
+ * This is used for the asynchronous gather read API of
+ * {@link PositionedReadable#readVectored}.
+ */
+public interface FileRange {
+
+  /**
+   * Get the starting offset of the range.
+   * @return the byte offset of the start
+   */
+  long getOffset();
+
+  /**
+   * Get the length of the range.
+   * @return the number of bytes in the range.
+   */
+  int getLength();
+
+  /**
+   * Get the future data for this range.
+   * @return the future for the {@link ByteBuffer} that contains the data
+   */
+  CompletableFuture<ByteBuffer> getData();
+
+  /**
+   * Set a future for this range's data.
+   * This method is called by {@link PositionedReadable#readVectored} to store the
+   * data for the user to pick up later via {@link #getData}.
+   * @param data the future of the ByteBuffer that will have the data
+   */
+  void setData(CompletableFuture<ByteBuffer> data);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java
new file mode 100644
index 00000000000..ef5851154be
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A range of bytes from a file with an optional buffer to read those bytes
+ * for zero copy.
+ */
+public class FileRangeImpl implements FileRange {
+  private long offset;
+  private int length;
+  private CompletableFuture<ByteBuffer> reader;
+
+  public FileRangeImpl(long offset, int length) {
+    this.offset = offset;
+    this.length = length;
+  }
+
+  @Override
+  public String toString() {
+    return "range[" + offset + "," + (offset + length) + ")";
+  }
+
+  @Override
+  public long getOffset() {
+    return offset;
+  }
+
+  @Override
+  public int getLength() {
+    return length;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+
+  @Override
+  public void setData(CompletableFuture<ByteBuffer> pReader) {
+    this.reader = pReader;
+  }
+
+  @Override
+  public CompletableFuture<ByteBuffer> getData() {
+    return reader;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
index 6744d17a726..7e543ebf226 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,10 +17,15 @@
  */
 package org.apache.hadoop.fs;
 
-import java.io.*;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.impl.VectoredReadUtils;
 
 /**
  * Stream that permits positional reading.
@@ -85,4 +90,38 @@ public interface PositionedReadable {
    * the read operation completed
    */
   void readFully(long position, byte[] buffer) throws IOException;
+
+  /**
+   * What is the smallest reasonable seek?
+   * @return the minimum number of bytes
+   */
+  default int minSeekForVectorReads() {
+    return 4 * 1024;
+  }
+
+  /**
+   * What is the largest size that we should group ranges together as?
+   * @return the number of bytes to read at once
+   */
+  default int maxReadSizeForVectorReads() {
+    return 1024 * 1024;
+  }
+
+  /**
+   * Read fully a list of file ranges asynchronously from this file.
+   * The default iterates through the ranges to read each synchronously, but
+   * the intent is that FSDataInputStream subclasses can make more efficient
+   * readers.
+   * As a result of the call, each range will have FileRange.setData(CompletableFuture)
+   * called with a future that when complete will have a ByteBuffer with the
+   * data from the file's range.
+   * @param ranges the byte ranges to read
+   * @param allocate the function to allocate ByteBuffer
+   * @throws IOException any IOE.
+   */
+  default void readVectored(List<? extends FileRange> ranges,
+                            IntFunction<ByteBuffer> allocate) throws IOException {
+    VectoredReadUtils.readVectored(this, ranges, allocate,  minSeekForVectorReads(),
+        maxReadSizeForVectorReads());
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index edcc4a8b99e..ef0abce4cea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@
 package org.apache.hadoop.fs;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.VectoredReadUtils;
 
 import java.io.BufferedOutputStream;
 import java.io.DataOutput;
@@ -33,8 +34,11 @@ import java.io.OutputStream;
 import java.io.FileDescriptor;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.BasicFileAttributeView;
 import java.nio.file.attribute.FileTime;
@@ -44,6 +48,9 @@ import java.util.Locale;
 import java.util.Optional;
 import java.util.StringTokenizer;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -125,7 +132,9 @@ public class RawLocalFileSystem extends FileSystem {
   class LocalFSFileInputStream extends FSInputStream implements
       HasFileDescriptor, IOStatisticsSource, StreamCapabilities {
     private FileInputStream fis;
+    private final File name;
     private long position;
+    private AsynchronousFileChannel asyncChannel = null;
 
     /**
      * Minimal set of counters.
@@ -143,7 +152,8 @@ public class RawLocalFileSystem extends FileSystem {
     private final AtomicLong bytesRead;
 
     public LocalFSFileInputStream(Path f) throws IOException {
-      fis = new FileInputStream(pathToFile(f));
+      name = pathToFile(f);
+      fis = new FileInputStream(name);
       bytesRead = ioStatistics.getCounterReference(
           STREAM_READ_BYTES);
     }
@@ -174,10 +184,16 @@ public class RawLocalFileSystem extends FileSystem {
     @Override
     public int available() throws IOException { return fis.available(); }
     @Override
-    public void close() throws IOException { fis.close(); }
-    @Override
     public boolean markSupported() { return false; }
-    
+
+    @Override
+    public void close() throws IOException {
+      fis.close();
+      if (asyncChannel != null) {
+        asyncChannel.close();
+      }
+    }
+
     @Override
     public int read() throws IOException {
       try {
@@ -267,8 +283,88 @@ public class RawLocalFileSystem extends FileSystem {
     public IOStatistics getIOStatistics() {
       return ioStatistics;
     }
+
+    AsynchronousFileChannel getAsyncChannel() throws IOException {
+      if (asyncChannel == null) {
+        synchronized (this) {
+          asyncChannel = AsynchronousFileChannel.open(name.toPath(),
+                  StandardOpenOption.READ);
+        }
+      }
+      return asyncChannel;
+    }
+
+    @Override
+    public void readVectored(List<? extends FileRange> ranges,
+                             IntFunction<ByteBuffer> allocate) throws IOException {
+
+      // Set up all of the futures, so that we can use them if things fail
+      for(FileRange range: ranges) {
+        VectoredReadUtils.validateRangeRequest(range);
+        range.setData(new CompletableFuture<>());
+      }
+      try {
+        AsynchronousFileChannel channel = getAsyncChannel();
+        ByteBuffer[] buffers = new ByteBuffer[ranges.size()];
+        AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers);
+        for(int i = 0; i < ranges.size(); ++i) {
+          FileRange range = ranges.get(i);
+          buffers[i] = allocate.apply(range.getLength());
+          channel.read(buffers[i], range.getOffset(), i, asyncHandler);
+        }
+      } catch (IOException ioe) {
+        LOG.debug("Exception occurred during vectored read ", ioe);
+        for(FileRange range: ranges) {
+          range.getData().completeExceptionally(ioe);
+        }
+      }
+    }
   }
-  
+
+  /**
+   * A CompletionHandler that implements readFully and translates back
+   * into the form of CompletionHandler that our users expect.
+   */
+  static class AsyncHandler implements CompletionHandler<Integer, Integer> {
+    private final AsynchronousFileChannel channel;
+    private final List<? extends FileRange> ranges;
+    private final ByteBuffer[] buffers;
+
+    AsyncHandler(AsynchronousFileChannel channel,
+                 List<? extends FileRange> ranges,
+                 ByteBuffer[] buffers) {
+      this.channel = channel;
+      this.ranges = ranges;
+      this.buffers = buffers;
+    }
+
+    @Override
+    public void completed(Integer result, Integer r) {
+      FileRange range = ranges.get(r);
+      ByteBuffer buffer = buffers[r];
+      if (result == -1) {
+        failed(new EOFException("Read past End of File"), r);
+      } else {
+        if (buffer.remaining() > 0) {
+          // issue a read for the rest of the buffer; if that read fails,
+          // this same handler's failed() callback completes the future exceptionally
+          channel.read(buffer, range.getOffset() + buffer.position(), r, this);
+        } else {
+          // flip the buffer so the user sees only the bytes that were read
+          // and cannot read beyond that limit
+          buffer.flip();
+          range.getData().complete(buffer);
+        }
+      }
+    }
+
+    @Override
+    public void failed(Throwable exc, Integer r) {
+      LOG.debug("Failed while reading range {} ", r, exc);
+      ranges.get(r).getData().completeExceptionally(exc);
+    }
+  }
+
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     getFileStatus(f);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
new file mode 100644
index 00000000000..828a50b4f7b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A file range that represents a set of underlying file ranges.
+ * This is used when we combine the user's FileRange objects
+ * together into a single read for efficiency.
+ */
+public class CombinedFileRange extends FileRangeImpl {
+  private ArrayList<FileRange> underlying = new ArrayList<>();
+
+  public CombinedFileRange(long offset, long end, FileRange original) {
+    super(offset, (int) (end - offset));
+    this.underlying.add(original);
+  }
+
+  /**
+   * Get the list of ranges that were merged together to form this one.
+   * @return the list of input ranges
+   */
+  public List<FileRange> getUnderlying() {
+    return underlying;
+  }
+
+  /**
+   * Merge this input range into the current one, if it is compatible.
+   * It is assumed that otherOffset is greater than or equal to the current offset,
+   * which typically happens by sorting the input ranges on offset.
+   * @param otherOffset the offset to consider merging
+   * @param otherEnd the end to consider merging
+   * @param other the underlying FileRange to add if we merge
+   * @param minSeek the minimum distance that we'll seek without merging the
+   *                ranges together
+   * @param maxSize the maximum size that we'll merge into a single range
+   * @return true if we have merged the range into this one
+   */
+  public boolean merge(long otherOffset, long otherEnd, FileRange other,
+                       int minSeek, int maxSize) {
+    long end = this.getOffset() + this.getLength();
+    long newEnd = Math.max(end, otherEnd);
+    if (otherOffset - end >= minSeek || newEnd - this.getOffset() > maxSize) {
+      return false;
+    }
+    this.setLength((int) (newEnd - this.getOffset()));
+    underlying.add(other);
+    return true;
+  }
+}
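
To make the merge() contract above concrete, here is a hypothetical sketch using the default thresholds from PositionedReadable (4 KiB minimum seek, 1 MiB maximum merged size); the classes are the ones added by this patch, the offsets are made up.

    import org.apache.hadoop.fs.FileRangeImpl;
    import org.apache.hadoop.fs.impl.CombinedFileRange;

    public class MergeSketch {
      public static void main(String[] args) {
        int minSeek = 4 * 1024;      // default minSeekForVectorReads()
        int maxSize = 1024 * 1024;   // default maxReadSizeForVectorReads()

        // start with a single 100-byte range at offset 0
        CombinedFileRange combined =
            new CombinedFileRange(0, 100, new FileRangeImpl(0, 100));

        // a 1900-byte gap is below minSeek, so this range is absorbed: returns true
        boolean near = combined.merge(2_000, 2_100,
            new FileRangeImpl(2_000, 100), minSeek, maxSize);

        // a ~96 KiB gap is at least minSeek, so this range is rejected: returns false
        boolean far = combined.merge(100_000, 100_100,
            new FileRangeImpl(100_000, 100), minSeek, maxSize);

        // combined now spans [0, 2100) and carries two underlying ranges
        System.out.println(near + " " + far + " " + combined);
      }
    }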
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java
new file mode 100644
index 00000000000..9a16e6841dd
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.util.Preconditions;
+
+/**
+ * Utility class which implements helper methods used
+ * in vectored IO implementation.
+ */
+public final class VectoredReadUtils {
+
+  /**
+   * Validate a single range.
+   * @param range file range.
+   * @throws EOFException any EOF Exception.
+   */
+  public static void validateRangeRequest(FileRange range)
+          throws EOFException {
+
+    Preconditions.checkArgument(range.getLength() >= 0, "length is negative");
+    if (range.getOffset() < 0) {
+      throw new EOFException("position is negative");
+    }
+  }
+
+  /**
+   * Validate a list of vectored read ranges.
+   * @param ranges list of ranges.
+   * @throws EOFException any EOF exception.
+   */
+  public static void validateVectoredReadRanges(List<? extends FileRange> ranges)
+          throws EOFException {
+    for (FileRange range : ranges) {
+      validateRangeRequest(range);
+    }
+  }
+
+
+
+  /**
+   * Read fully a list of file ranges asynchronously from the given stream.
+   * This default implementation reads each (merged) range synchronously, but
+   * the intent is that stream implementations can provide more efficient readers.
+   * The data or exceptions are pushed into {@link FileRange#getData()}.
+   * @param stream the stream to read the data from
+   * @param ranges the byte ranges to read
+   * @param allocate the byte buffer allocation
+   * @param minimumSeek the minimum number of bytes to seek over
+   * @param maximumRead the largest number of bytes to combine into a single read
+   */
+  public static void readVectored(PositionedReadable stream,
+                                  List<? extends FileRange> ranges,
+                                  IntFunction<ByteBuffer> allocate,
+                                  int minimumSeek,
+                                  int maximumRead) {
+    if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
+      for(FileRange range: ranges) {
+        range.setData(readRangeFrom(stream, range, allocate));
+      }
+    } else {
+      for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
+          maximumRead)) {
+        CompletableFuture<ByteBuffer> read =
+            readRangeFrom(stream, range, allocate);
+        for(FileRange child: range.getUnderlying()) {
+          child.setData(read.thenApply(
+              (b) -> sliceTo(b, range.getOffset(), child)));
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously reads a range from the stream dealing with the combinations
+   * of ByteBuffers buffers and PositionedReadable streams.
+   * @param stream the stream to read from
+   * @param range the range to read
+   * @param allocate the function to allocate ByteBuffers
+   * @return the CompletableFuture that contains the read data
+   */
+  public static CompletableFuture<ByteBuffer> readRangeFrom(PositionedReadable stream,
+                                                            FileRange range,
+                                                            IntFunction<ByteBuffer> allocate) {
+    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+    try {
+      ByteBuffer buffer = allocate.apply(range.getLength());
+      if (stream instanceof ByteBufferPositionedReadable) {
+        ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
+            buffer);
+        buffer.flip();
+      } else {
+        readNonByteBufferPositionedReadable(stream, range, buffer);
+      }
+      result.complete(buffer);
+    } catch (IOException ioe) {
+      result.completeExceptionally(ioe);
+    }
+    return result;
+  }
+
+  private static void readNonByteBufferPositionedReadable(PositionedReadable stream,
+                                                          FileRange range,
+                                                          ByteBuffer buffer) throws IOException {
+    if (buffer.isDirect()) {
+      buffer.put(readInDirectBuffer(stream, range));
+      buffer.flip();
+    } else {
+      stream.readFully(range.getOffset(), buffer.array(),
+              buffer.arrayOffset(), range.getLength());
+    }
+  }
+
+  private static byte[] readInDirectBuffer(PositionedReadable stream,
+                                           FileRange range) throws IOException {
+    // if we need to read data from a direct buffer and the stream doesn't
+    // support it, we allocate a byte array to use.
+    byte[] tmp = new byte[range.getLength()];
+    stream.readFully(range.getOffset(), tmp, 0, tmp.length);
+    return tmp;
+  }
+
+  /**
+   * Is the given input list:
+   * <ul>
+   *   <li>already sorted by offset</li>
+   *   <li>each range is more than minimumSeek apart</li>
+   *   <li>the start and end of each range is a multiple of chunkSize</li>
+   * </ul>
+   *
+   * @param input the list of input ranges.
+   * @param chunkSize the size of the chunks that the offset and end must align to.
+   * @param minimumSeek the minimum distance between ranges.
+   * @return true if we can use the input list as is.
+   */
+  public static boolean isOrderedDisjoint(List<? extends FileRange> input,
+                                          int chunkSize,
+                                          int minimumSeek) {
+    long previous = -minimumSeek;
+    for(FileRange range: input) {
+      long offset = range.getOffset();
+      long end = range.getOffset() + range.getLength();
+      if (offset % chunkSize != 0 ||
+              end % chunkSize != 0 ||
+              (offset - previous < minimumSeek)) {
+        return false;
+      }
+      previous = end;
+    }
+    return true;
+  }
+
+  /**
+   * Calculates floor value of offset based on chunk size.
+   * @param offset file offset.
+   * @param chunkSize file chunk size.
+   * @return  floor value.
+   */
+  public static long roundDown(long offset, int chunkSize) {
+    if (chunkSize > 1) {
+      return offset - (offset % chunkSize);
+    } else {
+      return offset;
+    }
+  }
+
+  /**
+   * Calculates the ceil value of offset based on chunk size.
+   * @param offset file offset.
+   * @param chunkSize file chunk size.
+   * @return ceil value.
+   */
+  public static long roundUp(long offset, int chunkSize) {
+    if (chunkSize > 1) {
+      long next = offset + chunkSize - 1;
+      return next - (next % chunkSize);
+    } else {
+      return offset;
+    }
+  }
+
+  /**
+   * Sort and merge ranges to optimize the access from the underlying file
+   * system.
+   * The motivations are that:
+   * <ul>
+   *   <li>Upper layers want to pass down logical file ranges.</li>
+   *   <li>Fewer reads have better performance.</li>
+   *   <li>Applications want callbacks as ranges are read.</li>
+   *   <li>Some file systems want to round ranges to be at checksum boundaries.</li>
+   * </ul>
+   *
+   * @param input the list of input ranges
+   * @param chunkSize round the start and end points to multiples of chunkSize
+   * @param minimumSeek the smallest gap that we should seek over in bytes
+   * @param maxSize the largest combined file range in bytes
+   * @return the list of sorted CombinedFileRanges that cover the input
+   */
+  public static List<CombinedFileRange> sortAndMergeRanges(List<? extends FileRange> input,
+                                                           int chunkSize,
+                                                           int minimumSeek,
+                                                           int maxSize) {
+    // sort the ranges by offset
+    FileRange[] ranges = input.toArray(new FileRange[0]);
+    Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
+    CombinedFileRange current = null;
+    List<CombinedFileRange> result = new ArrayList<>(ranges.length);
+
+    // now merge together the ones that merge
+    for(FileRange range: ranges) {
+      long start = roundDown(range.getOffset(), chunkSize);
+      long end = roundUp(range.getOffset() + range.getLength(), chunkSize);
+      if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) {
+        current = new CombinedFileRange(start, end, range);
+        result.add(current);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Slice the data that was read to the user's request.
+   * This function assumes that the user's request is completely subsumed by the
+   * read data. This always creates a new buffer pointing to the same underlying
+   * data but with its own mark and position fields such that reading one buffer
+   * can't affect the other's mark and position.
+   * @param readData the buffer with the readData
+   * @param readOffset the offset in the file for the readData
+   * @param request the user's request
+   * @return the readData buffer that is sliced to the user's request
+   */
+  public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
+                                   FileRange request) {
+    int offsetChange = (int) (request.getOffset() - readOffset);
+    int requestLength = request.getLength();
+    readData = readData.slice();
+    readData.position(offsetChange);
+    readData.limit(offsetChange + requestLength);
+    return readData;
+  }
+
+  /**
+   * private constructor.
+   */
+  private VectoredReadUtils() {
+    throw new UnsupportedOperationException();
+  }
+}
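
Similarly, a hypothetical sketch of what sortAndMergeRanges() produces for a small unsorted input, again with the default 4 KiB / 1 MiB thresholds; the offsets are illustrative only.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileRangeImpl;
    import org.apache.hadoop.fs.impl.CombinedFileRange;
    import org.apache.hadoop.fs.impl.VectoredReadUtils;

    public class SortAndMergeSketch {
      public static void main(String[] args) {
        // unsorted input; two of the ranges are close enough to be combined
        List<FileRange> input = new ArrayList<>();
        input.add(new FileRangeImpl(10_000, 100));
        input.add(new FileRangeImpl(0, 100));
        input.add(new FileRangeImpl(200, 100));

        // chunkSize 1 (no rounding), minSeek 4 KiB, maxSize 1 MiB
        List<CombinedFileRange> merged =
            VectoredReadUtils.sortAndMergeRanges(input, 1, 4 * 1024, 1024 * 1024);

        // expect two combined ranges: [0, 300) covering the two nearby ranges,
        // and [10000, 10100) on its own because the 9700-byte gap exceeds minSeek
        for (CombinedFileRange range : merged) {
          System.out.println(range + " underlying=" + range.getUnderlying().size());
        }
      }
    }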
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 090696483be..0fe1772d266 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -443,6 +443,37 @@ The semantics of this are exactly equivalent to
 That is, the buffer is filled entirely with the contents of the input source
 from position `position`
 
+### `default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)`
+
+Read data for a list of ranges asynchronously, filling each range in full. The
+default implementation iterates through the ranges, tries to coalesce them based
+on the values of `minSeekForVectorReads` and `maxReadSizeForVectorReads`, and then
+reads each merged range synchronously. The intent is that subclasses can provide
+a more efficient implementation.
+
+#### Preconditions
+
+For each requested range:
+
+    range.getOffset >= 0 else raise EOFException
+    range.getLength >= 0 else raise IllegalArgumentException
+
+#### Postconditions
+
+For each requested range:
+
+    range.getData() returns a CompletableFuture<ByteBuffer> which will have the data
+    from range.getOffset to range.getOffset + range.getLength.
+
+### `minSeekForVectorReads()`
+
+Smallest reasonable seek. Two ranges won't be merged together if the difference between
+the end of the first and the start of the next range is at least this value.
+
+### `maxReadSizeForVectorReads()`
+
+Maximum number of bytes which can be read in one go after merging the ranges.
+Two ranges won't be merged if the combined data to be read is more than this value.
 
 ## Consistency
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
new file mode 100644
index 00000000000..eee4b11e739
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+
+@RunWith(Parameterized.class)
+public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
+
+  public static final int DATASET_LEN = 64 * 1024;
+  private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+  private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
+  private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
+  private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
+
+  private final IntFunction<ByteBuffer> allocate;
+
+  private final String bufferType;
+
+  @Parameterized.Parameters(name = "Buffer type : {0}")
+  public static List<String> params() {
+    return Arrays.asList("direct", "array");
+  }
+
+  public AbstractContractVectoredReadTest(String bufferType) {
+    this.bufferType = bufferType;
+    this.allocate = "array".equals(bufferType) ?
+            ByteBuffer::allocate : ByteBuffer::allocateDirect;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    Path path = path(VECTORED_READ_FILE_NAME);
+    FileSystem fs = getFileSystem();
+    createFile(fs, path, true, DATASET);
+    Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
+    createFile(fs, bigFile, true, DATASET_MB);
+  }
+
+  @Test
+  public void testVectoredReadMultipleRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      FileRange fileRange = new FileRangeImpl(i * 100, 100);
+      fileRanges.add(fileRange);
+    }
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+      int i = 0;
+      for (FileRange res : fileRanges) {
+        completableFutures[i++] = res.getData();
+      }
+      CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+      combinedFuture.get();
+
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  @Test
+  public void testVectoredReadAndReadFully()  throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(100, 100));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      byte[] readFullRes = new byte[100];
+      in.readFully(100, readFullRes);
+      ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
+      Assertions.assertThat(vecRes)
+              .describedAs("Result from vectored read and readFully must match")
+              .isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4*1024, none of the ranges below
+   * will get merged.
+   */
+  @Test
+  public void testDisjointRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(0, 100));
+    fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100));
+    fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4*1024, all the below ranges
+   * will get merged into one.
+   */
+  @Test
+  public void testAllRangesMergedIntoOne() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(0, 100));
+    fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100));
+    fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4*1024, the first three ranges will be
+   * merged into one and the other two will remain as they are.
+   */
+  @Test
+  public void testSomeRangesMergedSomeUnmerged() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(8*1024, 100));
+    fileRanges.add(new FileRangeImpl(14*1024, 100));
+    fileRanges.add(new FileRangeImpl(10*1024, 100));
+    fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100));
+    fileRanges.add(new FileRangeImpl(40*1024, 1024));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  @Test
+  public void testSameRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(8*1024, 1000));
+    fileRanges.add(new FileRangeImpl(8*1024, 1000));
+    fileRanges.add(new FileRangeImpl(8*1024, 1000));
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  @Test
+  public void testVectoredRead1MBFile()  throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(1293, 25837));
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
+            .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, allocate);
+      ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
+      FileRange resRange = fileRanges.get(0);
+      assertDatasetEquals((int) resRange.getOffset(), "vecRead",
+              vecRes, resRange.getLength(), DATASET_MB);
+    }
+  }
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(0, 1000));
+    fileRanges.add(new FileRangeImpl(90, 900));
+    fileRanges.add(new FileRangeImpl(50, 900));
+    fileRanges.add(new FileRangeImpl(10, 980));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  @Test
+  public void testEOFRanges()  throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
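+      // The requested range starts at EOF, so every future is expected
+      // to complete exceptionally.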
+      for (FileRange res : fileRanges) {
+        CompletableFuture<ByteBuffer> data = res.getData();
+        try {
+          ByteBuffer buffer = data.get();
+          // Shouldn't reach here.
+          Assert.fail("EOFException must be thrown while reading EOF");
+        } catch (ExecutionException ex) {
+          // ignore as expected.
+        } catch (Exception ex) {
+          LOG.error("Exception while running vectored read ", ex);
+          Assert.fail("Exception while running vectored read " + ex);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testNegativeLengthRange()  throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(0, -50));
+    testExceptionalVectoredRead(fs, fileRanges, "Exception is expected");
+  }
+
+  @Test
+  public void testNegativeOffsetRange()  throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(-1, 50));
+    testExceptionalVectoredRead(fs, fileRanges, "Exception is expected");
+  }
+
+  @Test
+  public void testNormalReadAfterVectoredRead() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = createSomeOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      // read starting 200 bytes
+      byte[] res = new byte[200];
+      in.read(res, 0, 200);
+      ByteBuffer buffer = ByteBuffer.wrap(res);
+      assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
+      Assertions.assertThat(in.getPos())
+              .describedAs("Vectored read shouldn't change file pointer.")
+              .isEqualTo(200);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  @Test
+  public void testVectoredReadAfterNormalRead() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = createSomeOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      // read starting 200 bytes
+      byte[] res = new byte[200];
+      in.read(res, 0, 200);
+      ByteBuffer buffer = ByteBuffer.wrap(res);
+      assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
+      Assertions.assertThat(in.getPos())
+              .describedAs("Vectored read shouldn't change file pointer.")
+              .isEqualTo(200);
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges);
+    }
+  }
+
+  @Test
+  public void testMultipleVectoredReads() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges1 = createSomeOverlappingRanges();
+    List<FileRange> fileRanges2 = createSomeOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges1, allocate);
+      in.readVectored(fileRanges2, allocate);
+      validateVectoredReadResult(fileRanges2);
+      validateVectoredReadResult(fileRanges1);
+    }
+  }
+
+  protected List<FileRange> createSomeOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(0, 100));
+    fileRanges.add(new FileRangeImpl(90, 50));
+    return fileRanges;
+  }
+
+  protected void validateVectoredReadResult(List<FileRange> fileRanges)
+          throws ExecutionException, InterruptedException {
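+    // Wait for all ranges to complete, then verify each buffer against the dataset.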
+    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+    int i = 0;
+    for (FileRange res : fileRanges) {
+      completableFutures[i++] = res.getData();
+    }
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+    combinedFuture.get();
+
+    for (FileRange res : fileRanges) {
+      CompletableFuture<ByteBuffer> data = res.getData();
+      try {
+        ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
+        assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
+      } catch (Exception ex) {
+        LOG.error("Exception while running vectored read ", ex);
+        Assert.fail("Exception while running vectored read " + ex);
+      }
+    }
+  }
+
+  protected void testExceptionalVectoredRead(FileSystem fs,
+                                             List<FileRange> fileRanges,
+                                             String s) throws IOException {
+    boolean exRaised = false;
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      // TODO: can we intercept here as is done in the S3A tests?
+      in.readVectored(fileRanges, allocate);
+    } catch (EOFException | IllegalArgumentException ex) {
+      // expected.
+      exRaised = true;
+    }
+    Assertions.assertThat(exRaised)
+            .describedAs(s)
+            .isTrue();
+  }
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   * @param readOffset the offset in the file where the read began.
+   * @param operation  operation name for the assertion.
+   * @param data       data read in.
+   * @param length     length of data to check.
+   * @param originalData the original dataset to compare the buffer against.
+   */
+  private void assertDatasetEquals(
+          final int readOffset, final String operation,
+          final ByteBuffer data,
+          int length, byte[] originalData) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with read offset " + readOffset
+                      + ": data[" + i + "] != DATASET[" + o + "]",
+              originalData[o], data.get());
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
new file mode 100644
index 00000000000..099e3b946d1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.localfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest {
+
+  public TestLocalFSContractVectoredRead(String bufferType) {
+    super(bufferType);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new LocalFSContract(conf);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java
new file mode 100644
index 00000000000..cbb31ffe27a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.rawlocal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestRawLocalContractVectoredRead extends AbstractContractVectoredReadTest {
+
+  public TestRawLocalContractVectoredRead(String bufferType) {
+    super(bufferType);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new RawlocalFSContract(conf);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
new file mode 100644
index 00000000000..f789f361905
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully;
+import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally;
+
+/**
+ * Test behavior of {@link VectoredReadUtils}.
+ */
+public class TestVectoredReadUtils extends HadoopTestBase {
+
+  @Test
+  public void testSliceTo() {
+    final int size = 64 * 1024;
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    // fill the buffer with data
+    IntBuffer intBuffer = buffer.asIntBuffer();
+    for(int i=0; i < size / Integer.BYTES; ++i) {
+      intBuffer.put(i);
+    }
+    // ensure we don't make unnecessary slices
+    ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100,
+        new FileRangeImpl(100, size));
+    Assertions.assertThat(buffer)
+            .describedAs("Slicing on the same offset shouldn't " +
+                    "create a new buffer")
+            .isEqualTo(slice);
+
+    // try slicing a range
+    final int offset = 100;
+    final int sliceStart = 1024;
+    final int sliceLength = 16 * 1024;
+    slice = VectoredReadUtils.sliceTo(buffer, offset,
+        new FileRangeImpl(offset + sliceStart, sliceLength));
+    // make sure they aren't the same, but use the same backing data
+    Assertions.assertThat(buffer)
+            .describedAs("Slicing on new offset should " +
+                    "create a new buffer")
+            .isNotEqualTo(slice);
+    Assertions.assertThat(buffer.array())
+            .describedAs("Slicing should use the same underlying " +
+                    "data")
+            .isEqualTo(slice.array());
+    // test the contents of the slice
+    intBuffer = slice.asIntBuffer();
+    for(int i=0; i < sliceLength / Integer.BYTES; ++i) {
+      assertEquals("i = " + i, i + sliceStart / Integer.BYTES, intBuffer.get());
+    }
+  }
+
+  @Test
+  public void testRounding() {
+    for(int i=5; i < 10; ++i) {
+      assertEquals("i = "+ i, 5, VectoredReadUtils.roundDown(i, 5));
+      assertEquals("i = "+ i, 10, VectoredReadUtils.roundUp(i+1, 5));
+    }
+    assertEquals("Error while roundDown", 13, VectoredReadUtils.roundDown(13, 1));
+    assertEquals("Error while roundUp", 13, VectoredReadUtils.roundUp(13, 1));
+  }
+
+  @Test
+  public void testMerge() {
+    FileRange base = new FileRangeImpl(2000, 1000);
+    CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
+
+    // test when the gap between is too big
+    assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000,
+        new FileRangeImpl(5000, 1000), 2000, 4000));
+    assertEquals("Number of ranges in merged range shouldn't increase",
+            1, mergeBase.getUnderlying().size());
+    assertEquals("post merge offset", 2000, mergeBase.getOffset());
+    assertEquals("post merge length", 1000, mergeBase.getLength());
+
+    // test when the total size gets exceeded
+    assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
+        new FileRangeImpl(5000, 1000), 2001, 3999));
+    assertEquals("Number of ranges in merged range shouldn't increase",
+            1, mergeBase.getUnderlying().size());
+    assertEquals("post merge offset", 2000, mergeBase.getOffset());
+    assertEquals("post merge length", 1000, mergeBase.getLength());
+
+    // test when the merge works
+    assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
+        new FileRangeImpl(5000, 1000), 2001, 4000));
+    assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
+    assertEquals("post merge offset", 2000, mergeBase.getOffset());
+    assertEquals("post merge length", 4000, mergeBase.getLength());
+
+    // reset the mergeBase and test with a 10:1 reduction
+    mergeBase = new CombinedFileRange(200, 300, base);
+    assertEquals(200, mergeBase.getOffset());
+    assertEquals(100, mergeBase.getLength());
+    assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
+        new FileRangeImpl(5000, 1000), 201, 400));
+    assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
+    assertEquals("post merge offset", 200, mergeBase.getOffset());
+    assertEquals("post merge length", 400, mergeBase.getLength());
+  }
+
+  @Test
+  public void testSortAndMerge() {
+    List<FileRange> input = Arrays.asList(
+        new FileRangeImpl(3000, 100),
+        new FileRangeImpl(2100, 100),
+        new FileRangeImpl(1000, 100)
+        );
+    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
+    List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
+        input, 100, 1001, 2500);
+    assertEquals("merged range size", 1, outputList.size());
+    CombinedFileRange output = outputList.get(0);
+    assertEquals("merged range underlying size", 3, output.getUnderlying().size());
+    assertEquals("range[1000,3100)", output.toString());
+    assertTrue("merged output ranges are disjoint",
+            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
+
+    // the minSeek doesn't allow the first two to merge
+    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
+    outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100);
+    assertEquals("merged range size", 2, outputList.size());
+    assertEquals("range[1000,1100)", outputList.get(0).toString());
+    assertEquals("range[2100,3100)", outputList.get(1).toString());
+    assertTrue("merged output ranges are disjoint",
+            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
+
+    // the maxSize doesn't allow the third range to merge
+    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
+    outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099);
+    assertEquals("merged range size", 2, outputList.size());
+    assertEquals("range[1000,2200)", outputList.get(0).toString());
+    assertEquals("range[3000,3100)", outputList.get(1).toString());
+    assertTrue("merged output ranges are disjoint",
+            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
+
+    // test the round up and round down (the maxSize doesn't allow any merges)
+    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
+    outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100);
+    assertEquals("merged range size", 3, outputList.size());
+    assertEquals("range[992,1104)", outputList.get(0).toString());
+    assertEquals("range[2096,2208)", outputList.get(1).toString());
+    assertEquals("range[2992,3104)", outputList.get(2).toString());
+    assertTrue("merged output ranges are disjoint",
+            VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
+  }
+
+  @Test
+  public void testSortAndMergeMoreCases() throws Exception {
+    List<FileRange> input = Arrays.asList(
+            new FileRangeImpl(3000, 110),
+            new FileRangeImpl(3000, 100),
+            new FileRangeImpl(2100, 100),
+            new FileRangeImpl(1000, 100)
+    );
+    assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
+    List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
+            input, 1, 1001, 2500);
+    assertEquals("merged range size", 1, outputList.size());
+    CombinedFileRange output = outputList.get(0);
+    assertEquals("merged range underlying size", 4, output.getUnderlying().size());
+    assertEquals("range[1000,3110)", output.toString());
+    assertTrue("merged output ranges are disjoint",
+            VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
+
+    outputList = VectoredReadUtils.sortAndMergeRanges(
+            input, 100, 1001, 2500);
+    assertEquals("merged range size", 1, outputList.size());
+    output = outputList.get(0);
+    assertEquals("merged range underlying size", 4, output.getUnderlying().size());
+    assertEquals("range[1000,3200)", output.toString());
+    assertTrue("merged output ranges are disjoint",
+            VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
+
+  }
+  interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
+    // nothing
+  }
+
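+  /** Fill the buffer with sequentially increasing byte values so reads can be validated. */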
+  static void fillBuffer(ByteBuffer buffer) {
+    byte b = 0;
+    while (buffer.remaining() > 0) {
+      buffer.put(b++);
+    }
+  }
+
+  @Test
+  public void testReadRangeFromByteBufferPositionedReadable() throws Exception {
+    Stream stream = Mockito.mock(Stream.class);
+    Mockito.doAnswer(invocation -> {
+      fillBuffer(invocation.getArgument(1));
+      return null;
+    }).when(stream).readFully(ArgumentMatchers.anyLong(),
+                              ArgumentMatchers.any(ByteBuffer.class));
+    CompletableFuture<ByteBuffer> result =
+        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+        ByteBuffer::allocate);
+    assertFutureCompletedSuccessfully(result);
+    ByteBuffer buffer = result.get();
+    assertEquals("Size of result buffer", 100, buffer.remaining());
+    byte b = 0;
+    while (buffer.remaining() > 0) {
+      assertEquals("remain = " + buffer.remaining(), b++, buffer.get());
+    }
+
+    // test an IOException
+    Mockito.reset(stream);
+    Mockito.doThrow(new IOException("foo"))
+        .when(stream).readFully(ArgumentMatchers.anyLong(),
+                                ArgumentMatchers.any(ByteBuffer.class));
+    result =
+        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+            ByteBuffer::allocate);
+    assertFutureFailedExceptionally(result);
+  }
+
+  static void runReadRangeFromPositionedReadable(IntFunction<ByteBuffer> allocate)
+          throws Exception {
+    PositionedReadable stream = Mockito.mock(PositionedReadable.class);
+    Mockito.doAnswer(invocation -> {
+      byte b=0;
+      byte[] buffer = invocation.getArgument(1);
+      for(int i=0; i < buffer.length; ++i) {
+        buffer[i] = b++;
+      }
+      return null;
+    }).when(stream).readFully(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
+        ArgumentMatchers.anyInt());
+    CompletableFuture<ByteBuffer> result =
+        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+            allocate);
+    assertFutureCompletedSuccessfully(result);
+    ByteBuffer buffer = result.get();
+    assertEquals("Size of result buffer", 100, buffer.remaining());
+    byte b = 0;
+    while (buffer.remaining() > 0) {
+      assertEquals("remain = " + buffer.remaining(), b++, buffer.get());
+    }
+
+    // test an IOException
+    Mockito.reset(stream);
+    Mockito.doThrow(new IOException("foo"))
+        .when(stream).readFully(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
+        ArgumentMatchers.anyInt());
+    result =
+        VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100),
+            ByteBuffer::allocate);
+    assertFutureFailedExceptionally(result);
+  }
+
+  @Test
+  public void testReadRangeArray() throws Exception {
+    runReadRangeFromPositionedReadable(ByteBuffer::allocate);
+  }
+
+  @Test
+  public void testReadRangeDirect() throws Exception {
+    runReadRangeFromPositionedReadable(ByteBuffer::allocateDirect);
+  }
+
+  static void validateBuffer(String message, ByteBuffer buffer, int start) {
+    byte expected = (byte) start;
+    while (buffer.remaining() > 0) {
+      assertEquals(message + " remain: " + buffer.remaining(), expected++,
+          buffer.get());
+    }
+  }
+
+  @Test
+  public void testReadVectored() throws Exception {
+    List<FileRange> input = Arrays.asList(new FileRangeImpl(0, 100),
+        new FileRangeImpl(100_000, 100),
+        new FileRangeImpl(200_000, 100));
+    Stream stream = Mockito.mock(Stream.class);
+    Mockito.doAnswer(invocation -> {
+      fillBuffer(invocation.getArgument(1));
+      return null;
+    }).when(stream).readFully(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(ByteBuffer.class));
+    // should not merge the ranges
+    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100);
+    Mockito.verify(stream, Mockito.times(3))
+        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
+    for(int b=0; b < input.size(); ++b) {
+      validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
+    }
+  }
+
+  @Test
+  public void testReadVectoredMerge() throws Exception {
+    List<FileRange> input = Arrays.asList(new FileRangeImpl(2000, 100),
+        new FileRangeImpl(1000, 100),
+        new FileRangeImpl(0, 100));
+    Stream stream = Mockito.mock(Stream.class);
+    Mockito.doAnswer(invocation -> {
+      fillBuffer(invocation.getArgument(1));
+      return null;
+    }).when(stream).readFully(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(ByteBuffer.class));
+    // should merge the ranges into a single read
+    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100);
+    Mockito.verify(stream, Mockito.times(1))
+        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
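+    // The input ranges were supplied in descending offset order, so range b starts
+    // (2 - b) * 1000 bytes into the single merged buffer populated by fillBuffer().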
+    for(int b=0; b < input.size(); ++b) {
+      validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000);
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
index 142669b7868..f83ef9e63d1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.test;
 
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 
 /**
@@ -28,17 +31,18 @@ public class MoreAsserts {
 
   /**
    * Assert equivalence for array and iterable
-   * @param <T> the type of the elements
-   * @param s the name/message for the collection
-   * @param expected  the expected array of elements
-   * @param actual    the actual iterable of elements
+   *
+   * @param <T>      the type of the elements
+   * @param s        the name/message for the collection
+   * @param expected the expected array of elements
+   * @param actual   the actual iterable of elements
    */
   public static <T> void assertEquals(String s, T[] expected,
                                       Iterable<T> actual) {
     Iterator<T> it = actual.iterator();
     int i = 0;
     for (; i < expected.length && it.hasNext(); ++i) {
-      Assert.assertEquals("Element "+ i +" for "+ s, expected[i], it.next());
+      Assert.assertEquals("Element " + i + " for " + s, expected[i], it.next());
     }
     Assert.assertTrue("Expected more elements", i == expected.length);
     Assert.assertTrue("Expected less elements", !it.hasNext());
@@ -46,7 +50,8 @@ public class MoreAsserts {
 
   /**
    * Assert equality for two iterables
-   * @param <T> the type of the elements
+   *
+   * @param <T>      the type of the elements
    * @param s
    * @param expected
    * @param actual
@@ -57,10 +62,28 @@ public class MoreAsserts {
     Iterator<T> ita = actual.iterator();
     int i = 0;
     while (ite.hasNext() && ita.hasNext()) {
-      Assert.assertEquals("Element "+ i +" for "+s, ite.next(), ita.next());
+      Assert.assertEquals("Element " + i + " for " + s, ite.next(), ita.next());
     }
     Assert.assertTrue("Expected more elements", !ite.hasNext());
     Assert.assertTrue("Expected less elements", !ita.hasNext());
   }
 
+
+  public static <T> void assertFutureCompletedSuccessfully(CompletableFuture<T> future) {
+    Assertions.assertThat(future.isDone())
+            .describedAs("This future is supposed to be " +
+                    "completed successfully")
+            .isTrue();
+    Assertions.assertThat(future.isCompletedExceptionally())
+            .describedAs("This future is supposed to be " +
+                    "completed successfully")
+            .isFalse();
+  }
+
+  public static <T> void assertFutureFailedExceptionally(CompletableFuture<T> future) {
+    Assertions.assertThat(future.isCompletedExceptionally())
+            .describedAs("This future is supposed to be " +
+                    "completed exceptionally")
+            .isTrue();
+  }
 }
diff --git a/hadoop-common-project/pom.xml b/hadoop-common-project/pom.xml
index b36dbf30610..f167a079a9b 100644
--- a/hadoop-common-project/pom.xml
+++ b/hadoop-common-project/pom.xml
@@ -56,5 +56,4 @@
       </plugin>
     </plugins>
   </build>
-
 </project>
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 76ba2eeaa02..52f651b80c2 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -217,6 +217,7 @@
     <nodejs.version>v12.22.1</nodejs.version>
     <yarnpkg.version>v1.22.5</yarnpkg.version>
     <apache-ant.version>1.10.11</apache-ant.version>
+    <jmh.version>1.20</jmh.version>
   </properties>
 
   <dependencyManagement>
@@ -1561,6 +1562,16 @@
          </exclusion>
        </exclusions>
      </dependency>
+      <dependency>
+        <groupId>org.openjdk.jmh</groupId>
+        <artifactId>jmh-core</artifactId>
+        <version>${jmh.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.openjdk.jmh</groupId>
+        <artifactId>jmh-generator-annprocess</artifactId>
+        <version>${jmh.version}</version>
+      </dependency>
      <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 15e240f9018..b18090f3dfe 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1443,11 +1443,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
     return new FSDataInputStream(
-        new S3AInputStream(
-            readContext.build(),
-            createObjectAttributes(path, fileStatus),
-            createInputStreamCallbacks(auditSpan),
-            inputStreamStats));
+            new S3AInputStream(
+                  readContext.build(),
+                  createObjectAttributes(path, fileStatus),
+                  createInputStreamCallbacks(auditSpan),
+                  inputStreamStats,
+                  unboundedThreadPool));
   }
 
   /**
@@ -4795,9 +4796,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /**
    * This is a proof of concept of a select API.
    * @param source path to source data
-   * @param expression select expression
    * @param options request configuration from the builder.
-   * @param providedStatus any passed in status
+   * @param fileInformation any passed in information.
    * @return the stream of the results
    * @throws IOException IO failure
    */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 6beeb2891ee..05d9c7f9fe9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -19,38 +19,49 @@
 package org.apache.hadoop.fs.s3a;
 
 import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.IntFunction;
 
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
+import org.apache.hadoop.fs.impl.VectoredReadUtils;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
-import org.apache.hadoop.fs.PathIOException;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.concurrent.CompletableFuture;
-
 import static java.util.Objects.requireNonNull;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint;
+import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo;
+import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges;
 import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
@@ -88,6 +99,13 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * size of a buffer to create when draining the stream.
    */
   private static final int DRAIN_BUFFER_SIZE = 16384;
+  /**
+   * This is the maximum temporary buffer size we use while
+   * populating data into direct byte buffers during a vectored IO
+   * operation. It ensures that a request for a large range of data
+   * into a direct byte buffer does not lead to OOM errors.
+   */
+  private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
 
   /**
    * This is the public position; the one set in {@link #seek(long)}
@@ -111,6 +129,11 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private S3ObjectInputStream wrappedStream;
   private final S3AReadOpContext context;
   private final InputStreamCallbacks client;
+
+  /**
+   * Thread pool used for vectored IO operations.
+   */
+  private final ThreadPoolExecutor unboundedThreadPool;
   private final String bucket;
   private final String key;
   private final String pathStr;
@@ -160,12 +183,13 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * @param ctx operation context
    * @param s3Attributes object attributes
    * @param client S3 client to use
   * @param streamStatistics statistics for this stream
+   * @param unboundedThreadPool thread pool to use.
    */
   public S3AInputStream(S3AReadOpContext ctx,
-      S3ObjectAttributes s3Attributes,
-      InputStreamCallbacks client,
-      S3AInputStreamStatistics streamStatistics) {
+                        S3ObjectAttributes s3Attributes,
+                        InputStreamCallbacks client,
+                        S3AInputStreamStatistics streamStatistics,
+                        ThreadPoolExecutor unboundedThreadPool) {
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
         "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -187,6 +211,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     setInputPolicy(ctx.getInputPolicy());
     setReadahead(ctx.getReadahead());
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
+    this.unboundedThreadPool = unboundedThreadPool;
   }
 
   /**
@@ -880,6 +905,231 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for S3AInputStream.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+
+    LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
+    checkNotClosed();
+    for (FileRange range : ranges) {
+      validateRangeRequest(range);
+      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+      range.setData(result);
+    }
+
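+    // If the ranges are already sorted and disjoint with respect to the minimum
+    // seek distance, read each range independently; otherwise sort and merge
+    // nearby ranges into combined reads before submitting them to the pool.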
+    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+      LOG.debug("Not merging the ranges as they are disjoint");
+      for(FileRange range: ranges) {
+        ByteBuffer buffer = allocate.apply(range.getLength());
+        unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
+      }
+    } else {
+      LOG.debug("Trying to merge the ranges as they are not disjoint");
+      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+              1, minSeekForVectorReads(),
+              maxReadSizeForVectorReads());
+      LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
+              ranges.size(), combinedFileRanges.size());
+      for(CombinedFileRange combinedFileRange: combinedFileRanges) {
+        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
+        combinedFileRange.setData(result);
+        unboundedThreadPool.submit(
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer));
+      }
+    }
+    LOG.debug("Finished submitting vectored read to threadpool" +
+            " on path {} for ranges {} ", pathStr, ranges);
+  }
+
+  /**
+   * Read data in the combinedFileRange and update data in buffers
+   * of all underlying ranges.
+   * @param combinedFileRange combined range.
+   * @param buffer combined buffer.
+   */
+  private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
+                                                  ByteBuffer buffer) {
+    // The readSingleRange() call is deliberately outside the try block: any
+    // exception it records is raised by the awaitFuture() call below when
+    // the combined buffer is fetched.
+    readSingleRange(combinedFileRange, buffer);
+    try {
+      // If there is a single underlying range we complete it with the original
+      // byte buffer, otherwise each child range gets a slice of the combined buffer.
+      ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData());
+      if (combinedFileRange.getUnderlying().size() == 1) {
+        combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
+      } else {
+        for (FileRange child : combinedFileRange.getUnderlying()) {
+          updateOriginalRange(child, combinedBuffer, combinedFileRange);
+        }
+      }
+    } catch (Exception ex) {
+      LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex);
+      for(FileRange child : combinedFileRange.getUnderlying()) {
+        child.getData().completeExceptionally(ex);
+      }
+    }
+  }
+
+  /**
+   * Update data in child range from combined range.
+   * @param child child range.
+   * @param combinedBuffer combined buffer.
+   * @param combinedFileRange combined range.
+   */
+  private void updateOriginalRange(FileRange child,
+                                   ByteBuffer combinedBuffer,
+                                   CombinedFileRange combinedFileRange) {
+    LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ",
+            child.getOffset(), child.getLength(),
+            combinedFileRange.getOffset(), combinedFileRange.getLength());
+    ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child);
+    child.getData().complete(childBuffer);
+    LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ",
+            child.getOffset(), child.getLength(),
+            combinedFileRange.getOffset(), combinedFileRange.getLength());
+  }
+
+  /**
+   * Validates range parameters against the known content length.
+   * TODO: check whether the contentLength returned by the HTTP GET request can be used here.
+   * @param range requested range.
+   * @throws EOFException if the requested range extends beyond the end of the file.
+   */
+  private void validateRangeRequest(FileRange range) throws EOFException {
+    VectoredReadUtils.validateRangeRequest(range);
+    if (range.getOffset() + range.getLength() > contentLength) {
+      LOG.warn("Requested range [{}, {}) is beyond EOF for path {}",
+              range.getOffset(), range.getLength(), pathStr);
+      throw new EOFException("Requested range [" + range.getOffset() + ", "
+              + range.getLength() + ") is beyond EOF for path " + pathStr);
+    }
+  }
+
+  /**
+   * Read data from S3 for this range and populate the buffer.
+   * TODO: should a retry be added around client.getObject()? Retries are already
+   * done in the top layer for normal reads, but retrying here after an error while
+   * filling the buffer could overwrite offsets already written into heap buffers.
+   * Retries are therefore best confined to
+   * {@link S3AInputStream#getS3Object}.
+   * @param range range of data to read.
+   * @param buffer buffer to fill.
+   */
+  private void readSingleRange(FileRange range, ByteBuffer buffer) {
+    LOG.debug("Start reading range {} from path {} ", range, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
+    try {
+      long position = range.getOffset();
+      int length = range.getLength();
+      final String operationName = "readRange";
+      objectRange = getS3Object(operationName, position, length);
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
+      }
+      populateBuffer(length, buffer, objectContent);
+      range.getData().complete(buffer);
+    } catch (Exception ex) {
+      LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex);
+      range.getData().completeExceptionally(ex);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", range, pathStr);
+  }
+
+  /**
+   * Populates the buffer with data from objectContent
+   * till length. Handles both direct and heap byte buffers.
+   * @param length length of data to populate.
+   * @param buffer buffer to fill.
+   * @param objectContent result retrieved from S3 store.
+   * @throws IOException any IOE.
+   */
+  private void populateBuffer(int length,
+                              ByteBuffer buffer,
+                              S3ObjectInputStream objectContent) throws IOException {
+    if (buffer.isDirect()) {
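+      // Direct buffers have no backing array, so copy the data through a
+      // bounded temporary heap buffer rather than allocating the whole range on the heap.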
+      int readBytes = 0;
+      byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
+      while (readBytes < length) {
+        int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
+                TMP_BUFFER_MAX_SIZE
+                : length - readBytes;
+        readByteArray(objectContent, tmp, 0, currentLength);
+        buffer.put(tmp, 0, currentLength);
+        readBytes = readBytes + currentLength;
+      }
+      buffer.flip();
+    } else {
+      readByteArray(objectContent, buffer.array(), 0, length);
+    }
+  }
+
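+  /**
+   * Read exactly {@code length} bytes from the object stream into the
+   * destination array, throwing an EOFException if the stream ends first.
+   * @param objectContent stream to read from.
+   * @param dest destination byte array.
+   * @param offset offset in the destination array.
+   * @param length number of bytes to read.
+   * @throws IOException if the read fails or EOF is reached early.
+   */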
+  public void readByteArray(S3ObjectInputStream objectContent,
+                            byte[] dest,
+                            int offset,
+                            int length) throws IOException {
+    int readBytes = 0;
+    while (readBytes < length) {
+      int readBytesCurr = objectContent.read(dest,
+              offset + readBytes,
+              length - readBytes);
+      readBytes += readBytesCurr;
+      if (readBytesCurr < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+    }
+  }
+
+  /**
+   * Read data from S3 using an HTTP request.
+   * This also handles the case where the file has been changed while
+   * the HTTP call is in flight; if the file has changed, a
+   * RemoteFileChangedException is thrown.
+   * @param operationName name of the operation for which get object on S3 is called.
+   * @param position position of the object to be read from S3.
+   * @param length length from position of the object to be read from S3.
+   * @return S3Object
+   * @throws IOException exception if any.
+   */
+  private S3Object getS3Object(String operationName, long position,
+                               int length) throws IOException {
+    final GetObjectRequest request = client.newGetRequest(key)
+            .withRange(position, position + length - 1);
+    changeTracker.maybeApplyConstraint(request);
+    DurationTracker tracker = streamStatistics.initiateGetRequest();
+    S3Object objectRange;
+    Invoker invoker = context.getReadInvoker();
+    try {
+      objectRange = invoker.retry(operationName, pathStr, true,
+          () -> client.getObject(request));
+    } catch (IOException ex) {
+      tracker.failed();
+      throw ex;
+    } finally {
+      tracker.close();
+    }
+    changeTracker.processResponse(objectRange, operationName,
+            position);
+    return objectRange;
+  }
+
   /**
    * Access the input stream statistics.
    * This is for internal testing and may be removed without warning.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
new file mode 100644
index 00000000000..255cc6501c2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
+
+  public ITestS3AContractVectoredRead(String bufferType) {
+    super(bufferType);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Overriding as the S3A vectored read API fails fast when a range
+   * beyond EOF is requested.
+   * @throws Exception on any failure.
+   */
+  @Override
+  public void testEOFRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
+    testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index 62f5bff35c4..c62bf5daca3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -111,7 +111,8 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
         s3AReadOpContext,
         s3ObjectAttributes,
         getMockedInputStreamCallback(),
-        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics());
+        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
+            null);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index fc287e9845c..c831999008f 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -52,7 +52,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN
 
 # for debugging low level S3a operations, uncomment these lines
 # Log all S3A classes
-#log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
+log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
 #log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO
 #log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO
 
diff --git a/hadoop-tools/hadoop-benchmark/pom.xml b/hadoop-tools/hadoop-benchmark/pom.xml
new file mode 100644
index 00000000000..3d742fab5c8
--- /dev/null
+++ b/hadoop-tools/hadoop-benchmark/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.4.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project/pom.xml</relativePath>
+  </parent>
+  <artifactId>hadoop-benchmark</artifactId>
+  <version>3.4.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>Apache Hadoop Common Benchmark</name>
+  <description>Apache Hadoop Common Benchmark</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.hadoop.benchmark.VectoredReadBenchmark</mainClass>
+            </manifest>
+          </archive>
+          <descriptors>
+            <descriptor>src/main/assembly/uber.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id> <!-- this is used for inheritance merges -->
+            <phase>package</phase> <!-- bind to the packaging phase -->
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+          <configuration>
+            <excludeFilterFile>${basedir}/src/main/findbugs/exclude.xml</excludeFilterFile>
+          </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/src/main/findbugs/exclude.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml b/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml
new file mode 100644
index 00000000000..014eab951b3
--- /dev/null
+++ b/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml
@@ -0,0 +1,33 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+  <id>uber</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/</outputDirectory>
+      <useProjectArtifact>true</useProjectArtifact>
+      <unpack>true</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+  <containerDescriptorHandlers>
+    <containerDescriptorHandler>
+      <handlerName>metaInf-services</handlerName>
+    </containerDescriptorHandler>
+  </containerDescriptorHandlers>
+</assembly>
diff --git a/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml b/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml
new file mode 100644
index 00000000000..05f2a067cf0
--- /dev/null
+++ b/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<FindBugsFilter>
+  <Match>
+    <Class name="~org\.apache\.hadoop\.benchmark\.generated.*"/>
+  </Match>
+  <Match>
+    <Class name="~org\.openjdk\.jmh\.infra\.generated.*"/>
+  </Match>
+</FindBugsFilter>
diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
new file mode 100644
index 00000000000..aaee951d72c
--- /dev/null
+++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.benchmark;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.FileSystems;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
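+/**
+ * JMH benchmark comparing the vectored read API against a direct
+ * AsynchronousFileChannel implementation and plain sequential positioned
+ * reads, parameterized over local/raw filesystems and direct/heap buffers.
+ */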
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class VectoredReadBenchmark {
+
+  static final Path DATA_PATH = getTestDataPath();
+  static final String DATA_PATH_PROPERTY = "bench.data";
+  static final int READ_SIZE = 64 * 1024;
+  static final long SEEK_SIZE = 1024L * 1024;
+
+
+  static Path getTestDataPath() {
+    String value = System.getProperty(DATA_PATH_PROPERTY);
+    return new Path(value == null ? "/tmp/taxi.orc" : value);
+  }
+
+  @State(Scope.Thread)
+  public static class FileSystemChoice {
+
+    @Param({"local", "raw"})
+    private String fileSystemKind;
+
+    private Configuration conf;
+    private FileSystem fs;
+
+    @Setup(Level.Trial)
+    public void setup() {
+      conf = new Configuration();
+      try {
+        LocalFileSystem local = FileSystem.getLocal(conf);
+        fs = "raw".equals(fileSystemKind) ? local.getRaw() : local;
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Can't get filesystem", e);
+      }
+    }
+  }
+
+  @State(Scope.Thread)
+  public static class BufferChoice {
+    @Param({"direct", "array"})
+    private String bufferKind;
+
+    private IntFunction<ByteBuffer> allocate;
+    @Setup(Level.Trial)
+    public void setup() {
+      allocate = "array".equals(bufferKind)
+                     ? ByteBuffer::allocate : ByteBuffer::allocateDirect;
+    }
+  }
+
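+  /**
+   * Issue 100 ranged reads through the vectored read API in a single call,
+   * then consume each buffer as its future completes.
+   */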
+  @Benchmark
+  public void asyncRead(FileSystemChoice fsChoice,
+                        BufferChoice bufferChoice,
+                        Blackhole blackhole) throws Exception {
+    FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
+    List<FileRange> ranges = new ArrayList<>();
+    for(int m=0; m < 100; ++m) {
+      FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE);
+      ranges.add(range);
+    }
+    stream.readVectored(ranges, bufferChoice.allocate);
+    for(FileRange range: ranges) {
+      blackhole.consume(range.getData().get());
+    }
+    stream.close();
+  }
+
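+  /**
+   * Collects the results of a fixed number of asynchronous reads and lets
+   * the caller block in join() until all complete or any one fails.
+   */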
+  static class Joiner implements CompletionHandler<ByteBuffer, FileRange> {
+    private int remaining;
+    private final ByteBuffer[] result;
+    private Throwable exception = null;
+
+    Joiner(int total) {
+      remaining = total;
+      result = new ByteBuffer[total];
+    }
+
+    synchronized void finish() {
+      remaining -= 1;
+      if (remaining == 0) {
+        notify();
+      }
+    }
+
+    synchronized ByteBuffer[] join() throws InterruptedException, IOException {
+      while (remaining > 0 && exception == null) {
+        wait();
+      }
+      if (exception != null) {
+        throw new IOException("problem reading", exception);
+      }
+      return result;
+    }
+
+
+    @Override
+    public synchronized void completed(ByteBuffer buffer, FileRange attachment) {
+      result[--remaining] = buffer;
+      if (remaining == 0) {
+        notify();
+      }
+    }
+
+    @Override
+    public synchronized void failed(Throwable exc, FileRange attachment) {
+      this.exception = exc;
+      notify();
+    }
+  }
+
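+  /**
+   * A file range which drives its own AsynchronousFileChannel reads,
+   * re-issuing the read until the full length is in the buffer and then
+   * notifying the shared Joiner of completion or failure.
+   */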
+  static class FileRangeCallback extends FileRangeImpl implements
+          CompletionHandler<Integer, FileRangeCallback> {
+    private final AsynchronousFileChannel channel;
+    private final ByteBuffer buffer;
+    private int completed = 0;
+    private final Joiner joiner;
+
+    FileRangeCallback(AsynchronousFileChannel channel, long offset,
+                      int length, Joiner joiner, ByteBuffer buffer) {
+      super(offset, length);
+      this.channel = channel;
+      this.joiner = joiner;
+      this.buffer = buffer;
+    }
+
+    @Override
+    public void completed(Integer result, FileRangeCallback attachment) {
+      final int bytes = result;
+      if (bytes == -1) {
+        // Hit EOF before the requested range was fully read: report the
+        // failure and stop, rather than adding -1 to the completed count.
+        failed(new EOFException("Read past end of file"), this);
+        return;
+      }
+      completed += bytes;
+      if (completed < this.getLength()) {
+        channel.read(buffer, this.getOffset() + completed, this, this);
+      } else {
+        buffer.flip();
+        joiner.finish();
+      }
+    }
+
+    @Override
+    public void failed(Throwable exc, FileRangeCallback attachment) {
+      joiner.failed(exc, this);
+    }
+  }
+
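+  /**
+   * Baseline using java.nio AsynchronousFileChannel directly: one in-flight
+   * read per range, joined once all 100 ranges have completed.
+   */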
+  @Benchmark
+  public void asyncFileChanArray(BufferChoice bufferChoice,
+                                 Blackhole blackhole) throws Exception {
+    java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString());
+    AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
+    List<FileRangeImpl> ranges = new ArrayList<>();
+    Joiner joiner = new Joiner(100);
+    for(int m=0; m < 100; ++m) {
+      ByteBuffer buffer = bufferChoice.allocate.apply(READ_SIZE);
+      FileRangeCallback range = new FileRangeCallback(channel, m * SEEK_SIZE,
+          READ_SIZE, joiner, buffer);
+      ranges.add(range);
+      channel.read(buffer, range.getOffset(), range, range);
+    }
+    joiner.join();
+    channel.close();
+    blackhole.consume(ranges);
+  }
+
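+  /**
+   * Baseline using sequential positioned reads: readFully() into a fresh
+   * byte array for each of the 100 ranges.
+   */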
+  @Benchmark
+  public void syncRead(FileSystemChoice fsChoice,
+                       Blackhole blackhole) throws Exception {
+    FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
+    List<byte[]> result = new ArrayList<>();
+    for(int m=0; m < 100; ++m) {
+      byte[] buffer = new byte[READ_SIZE];
+      stream.readFully(m * SEEK_SIZE, buffer);
+      result.add(buffer);
+    }
+    blackhole.consume(result);
+    stream.close();
+  }
+
+  /**
+   * Run the benchmarks.
+   * @param args the pathname of a 100MB data file
+   * @param args the pathname of a 100MB data file to read
+   * @throws Exception any failure while setting up or running the benchmarks.
+  public static void main(String[] args) throws Exception {
+    OptionsBuilder opts = new OptionsBuilder();
+    opts.include("VectoredReadBenchmark");
+    opts.jvmArgs("-server", "-Xms256m", "-Xmx2g",
+        "-D" + DATA_PATH_PROPERTY + "=" + args[0]);
+    opts.forks(1);
+    new Runner(opts.build()).run();
+  }
+}
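
For readers skimming the diff, the API exercised by asyncRead() above is the
FSDataInputStream.readVectored() call, which takes a list of FileRange objects
and a ByteBuffer allocator, with each range exposing its result through a
future. A minimal stand-alone sketch of that call, outside JMH, might look as
follows; it is illustrative only and not part of this commit, the class name
VectoredReadSketch is hypothetical, and it assumes the FileRangeImpl helper and
the readVectored() usage shown in the benchmark plus a local file path passed
as the first argument.

    // Illustrative sketch only, not part of this commit: read two ranges of a
    // local file through the vectored read API exercised by asyncRead().
    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileRangeImpl;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class VectoredReadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        List<FileRange> ranges = Arrays.asList(
            new FileRangeImpl(0, 64 * 1024),              // 64KB at offset 0
            new FileRangeImpl(1024L * 1024, 64 * 1024));  // 64KB at offset 1MB
        try (FSDataInputStream in = fs.open(new Path(args[0]))) {
          // Issue all range reads in one call; buffers come from the allocator.
          in.readVectored(ranges, ByteBuffer::allocate);
          for (FileRange range : ranges) {
            ByteBuffer data = range.getData().get();  // wait for this range
            System.out.println("read " + data.remaining()
                + " bytes at offset " + range.getOffset());
          }
        }
      }
    }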
diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/package-info.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/package-info.java
new file mode 100644
index 00000000000..95d6977e3ab
--- /dev/null
+++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Benchmark for Vectored Read IO operations.
+ */
+package org.apache.hadoop.benchmark;
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index f026bc261e0..4e934cd101f 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -51,6 +51,7 @@
     <module>hadoop-azure-datalake</module>
     <module>hadoop-aliyun</module>
     <module>hadoop-fs2img</module>
+    <module>hadoop-benchmark</module>
   </modules>
 
   <build>
diff --git a/pom.xml b/pom.xml
index 1f6aa885666..8f8ebcdceee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -544,6 +544,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
             <exclude>licenses-binary/**</exclude>
             <exclude>dev-support/docker/pkg-resolver/packages.json</exclude>
             <exclude>dev-support/docker/pkg-resolver/platforms.json</exclude>
+            <exclude>**/target/**</exclude>
          </excludes>
        </configuration>
       </plugin>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org