Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/03/17 11:28:37 UTC

[hadoop] 03/03: HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 8294bd5a37c0de15af576700c6cba46791eddd07
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Wed Mar 16 15:41:03 2022 +0000

    HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341
    
    Follow-on patch to MAPREDUCE-7341, adding ABFS support and tests
    
    * resilient rename
    * tests for job commit through the manifest committer.
    
    contains
    - HADOOP-17976. ABFS etag extraction inconsistent between LIST and HEAD calls
    - HADOOP-16204. ABFS tests to include terasort
    
    Contributed by Steve Loughran.
    
    Change-Id: I0a7d4043bdf19bcb00c033fc389730109b93b77f
---
 hadoop-project/pom.xml                             |   7 +
 hadoop-tools/hadoop-azure/pom.xml                  |  81 ++++-
 .../src/config/checkstyle-suppressions.xml         |   3 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   9 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    | 197 +++++++++---
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  34 +-
 .../commit/AbfsManifestStoreOperations.java        | 130 ++++++++
 .../commit/AzureManifestCommitterFactory.java      |  58 ++++
 .../azurebfs/commit/ResilientCommitByRename.java   | 101 ++++++
 .../hadoop/fs/azurebfs/commit/package-info.java    |  28 ++
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   3 +
 .../constants/FileSystemConfigurations.java        |   5 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  99 +++++-
 .../fs/azurebfs/services/SimpleKeyProvider.java    |  10 +-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |  11 +-
 .../fs/azurebfs/ITestAbfsReadWriteAndSeek.java     |  13 +-
 .../ITestAzureBlobFileSystemDelegationSAS.java     |   5 +-
 .../fs/azurebfs/ITestCustomerProvidedKey.java      |   3 +-
 .../fs/azurebfs/commit/AbfsCommitTestHelper.java   |  49 +++
 .../azurebfs/commit/AbstractAbfsClusterITest.java  | 260 +++++++++++++++
 .../fs/azurebfs/commit/ITestAbfsCleanupStage.java  |  54 ++++
 .../azurebfs/commit/ITestAbfsCommitTaskStage.java  |  54 ++++
 .../ITestAbfsCreateOutputDirectoriesStage.java     |  54 ++++
 .../ITestAbfsJobThroughManifestCommitter.java      | 101 ++++++
 .../commit/ITestAbfsLoadManifestsStage.java        |  55 ++++
 .../commit/ITestAbfsManifestCommitProtocol.java    |  62 ++++
 .../commit/ITestAbfsManifestStoreOperations.java   | 175 ++++++++++
 .../commit/ITestAbfsRenameStageFailure.java        |  69 ++++
 .../commit/ITestAbfsTaskManifestFileIO.java        |  54 ++++
 .../fs/azurebfs/commit/ITestAbfsTerasort.java      | 353 +++++++++++++++++++++
 .../hadoop/fs/azurebfs/commit/package-info.java    |  24 ++
 .../azurebfs/contract/AbfsFileSystemContract.java  |   2 +-
 .../hadoop-azure/src/test/resources/core-site.xml  |  25 ++
 33 files changed, 2106 insertions(+), 82 deletions(-)

diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 193f898..80e07f0 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -386,6 +386,13 @@
 
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <version>${hadoop.version}</version>
+        <type>test-jar</type>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
         <version>${hadoop.version}</version>
       </dependency>
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index a9f58e3..40aeec0 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -219,6 +219,67 @@
       <scope>test</scope>
     </dependency>
 
+    <!--
+    The mapreduce-client-core module is compiled against for
+    manifest committer support. It is not needed on the classpath
+    except when using this committer, so it is only tagged as
+    "provided".
+    It is not exported as a transitive dependency of the JAR.
+    Applications which wish to use the manifest committer
+    will need to explicitly add the mapreduce JAR to their classpath.
+    This is already done by MapReduce and Spark.
+    -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <!--
+    These are only added to the classpath for tests,
+    and are not exported by the test JAR as transitive
+    dependencies.
+    -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-hs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-examples</artifactId>
+      <scope>test</scope>
+      <type>jar</type>
+    </dependency>
+    <!-- artifacts needed to bring up a Mini MR Yarn cluster-->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-distcp</artifactId>
@@ -319,7 +380,7 @@
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -350,7 +411,7 @@
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -392,7 +453,7 @@
                     <!-- surefire.forkNumber won't do the parameter -->
                     <!-- substitution.  Putting a prefix in front of it like -->
                     <!-- "fork-" makes it work. -->
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <!-- Propagate scale parameters -->
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
@@ -482,7 +543,7 @@
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -526,7 +587,7 @@
                     <!-- surefire.forkNumber won't do the parameter -->
                     <!-- substitution.  Putting a prefix in front of it like -->
                     <!-- "fork-" makes it work. -->
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <!-- Propagate scale parameters -->
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
@@ -544,6 +605,7 @@
                     <exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
                     <exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
                     <exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
+                    <exclude>**/azurebfs/commit/*.java</exclude>
                   </excludes>
 
                 </configuration>
@@ -572,7 +634,7 @@
                     <!-- surefire.forkNumber won't do the parameter -->
                     <!-- substitution.  Putting a prefix in front of it like -->
                     <!-- "fork-" makes it work. -->
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <!-- Propagate scale parameters -->
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
@@ -585,6 +647,7 @@
                     <include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
                     <include>**/azurebfs/ITestSmallWriteOptimization.java</include>
                     <include>**/azurebfs/services/ITestReadBufferManager.java</include>
+                    <include>**/azurebfs/commit/*.java</include>
                   </includes>
                 </configuration>
               </execution>
@@ -634,7 +697,7 @@
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -664,7 +727,7 @@
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -706,7 +769,7 @@
                     <!-- surefire.forkNumber won't do the parameter -->
                     <!-- substitution.  Putting a prefix in front of it like -->
                     <!-- "fork-" makes it work. -->
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <!-- Propagate scale parameters -->
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index 070c8c1..fd2a7c2 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -48,4 +48,7 @@
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
     <suppress checks="ParameterNumber|VisibilityModifier"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
+    <!-- allow tests to use _ for ordering. -->
+    <suppress checks="MethodName"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
 </suppressions>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 9719da7..fafc303 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -260,6 +260,11 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
   private boolean enableAutoThrottling;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
+      MinValue = 0,
+      DefaultValue = RATE_LIMIT_DEFAULT)
+  private int rateLimit;
+
   @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
       DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX)
   private String userAgentId;
@@ -726,6 +731,10 @@ public class AbfsConfiguration{
     return this.enableAutoThrottling;
   }
 
+  public int getRateLimit() {
+    return rateLimit;
+  }
+
   public String getCustomUserAgentPrefix() {
     return this.userAgentId;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index ae70b8d..46141e7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -27,6 +27,7 @@ import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.AccessDeniedException;
+import java.time.Duration;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.ArrayList;
@@ -42,13 +43,17 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.hadoop.io.IOUtils;
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
 import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
@@ -94,9 +99,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.RateLimitingFactory;
 import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.LambdaUtils;
@@ -143,6 +151,9 @@ public class AzureBlobFileSystem extends FileSystem
   /** Maximum Active blocks per OutputStream. */
   private int blockOutputActiveBlocks;
 
+  /** Rate limiting for operations which use it to throttle their IO. */
+  private RateLimiting rateLimiting;
+
   @Override
   public void initialize(URI uri, Configuration configuration)
       throws IOException {
@@ -215,7 +226,7 @@ public class AzureBlobFileSystem extends FileSystem
     }
 
     AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
-
+    rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
     LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
   }
 
@@ -261,7 +272,7 @@ public class AzureBlobFileSystem extends FileSystem
       InputStream inputStream = abfsStore
           .openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
       return new FSDataInputStream(inputStream);
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       checkException(path, ex);
       return null;
     }
@@ -290,8 +301,13 @@ public class AzureBlobFileSystem extends FileSystem
   }
 
   @Override
-  public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize,
-      final short replication, final long blockSize, final Progressable progress) throws IOException {
+  public FSDataOutputStream create(final Path f,
+      final FsPermission permission,
+      final boolean overwrite,
+      final int bufferSize,
+      final short replication,
+      final long blockSize,
+      final Progressable progress) throws IOException {
     LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}",
         f,
         permission,
@@ -311,7 +327,7 @@ public class AzureBlobFileSystem extends FileSystem
           FsPermission.getUMask(getConf()), tracingContext);
       statIncrement(FILES_CREATED);
       return new FSDataOutputStream(outputStream, statistics);
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       checkException(f, ex);
       return null;
     }
@@ -340,8 +356,12 @@ public class AzureBlobFileSystem extends FileSystem
 
   @Override
   @SuppressWarnings("deprecation")
-  public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission,
-      final EnumSet<CreateFlag> flags, final int bufferSize, final short replication, final long blockSize,
+  public FSDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission,
+      final EnumSet<CreateFlag> flags,
+      final int bufferSize,
+      final short replication,
+      final long blockSize,
       final Progressable progress) throws IOException {
 
     // Check if file should be appended or overwritten. Assume that the file
@@ -365,7 +385,8 @@ public class AzureBlobFileSystem extends FileSystem
   }
 
   @Override
-  public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
+  public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress)
+      throws IOException {
     LOG.debug(
         "AzureBlobFileSystem.append path: {} bufferSize: {}",
         f.toString(),
@@ -380,7 +401,7 @@ public class AzureBlobFileSystem extends FileSystem
       OutputStream outputStream = abfsStore
           .openFileForWrite(qualifiedPath, statistics, false, tracingContext);
       return new FSDataOutputStream(outputStream, statistics);
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       checkException(f, ex);
       return null;
     }
@@ -403,7 +424,7 @@ public class AzureBlobFileSystem extends FileSystem
         fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
         listener);
     // rename under same folder;
-    if(makeQualified(parentFolder).equals(qualifiedDstPath)) {
+    if (makeQualified(parentFolder).equals(qualifiedDstPath)) {
       return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null;
     }
 
@@ -438,24 +459,99 @@ public class AzureBlobFileSystem extends FileSystem
 
       qualifiedDstPath = makeQualified(adjustedDst);
 
-      abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext);
+      abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
       return true;
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       LOG.debug("Rename operation failed. ", ex);
       checkException(
-              src,
-              ex,
-              AzureServiceErrorCode.PATH_ALREADY_EXISTS,
-              AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
-              AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
-              AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
-              AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
-              AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
+          src,
+          ex,
+          AzureServiceErrorCode.PATH_ALREADY_EXISTS,
+          AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
+          AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
+          AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
+          AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
+          AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
       return false;
     }
 
   }
 
+  /**
+   * Private method to create resilient commit support.
+   * @param path destination path
+   * @return a new instance
+   * @throws IOException problem probing store capabilities
+   * @throws UnsupportedOperationException if the store lacks this support
+   */
+  @InterfaceAudience.Private
+  public ResilientCommitByRename createResilientCommitSupport(final Path path)
+      throws IOException {
+
+    if (!hasPathCapability(path,
+        CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) {
+      throw new UnsupportedOperationException(
+          "Resilient commit support not available for " + path);
+    }
+    return new ResilientCommitByRenameImpl();
+  }
+
+  /**
+   * Resilient commit support.
+   * Provided as a nested class to avoid contaminating the
+   * FS instance with too many private methods which end up
+   * being used widely (as has happened to the S3A FS).
+   */
+  public class ResilientCommitByRenameImpl implements ResilientCommitByRename {
+
+    /**
+     * Perform the rename.
+     * This will be rate limited, as well as able to recover
+     * from rename errors if the etag was passed in.
+     * @param source path to source file
+     * @param dest destination of rename.
+     * @param sourceEtag etag of source file. may be null or empty
+     * @return the outcome of the operation
+     * @throws IOException any rename failure which was not recovered from.
+     */
+    public Pair<Boolean, Duration> commitSingleFileByRename(
+        final Path source,
+        final Path dest,
+        @Nullable final String sourceEtag) throws IOException {
+
+      LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag);
+      statIncrement(CALL_RENAME);
+
+      trailingPeriodCheck(dest);
+      Path qualifiedSrcPath = makeQualified(source);
+      Path qualifiedDstPath = makeQualified(dest);
+
+      TracingContext tracingContext = new TracingContext(clientCorrelationId,
+          fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
+          listener);
+
+      if (qualifiedSrcPath.equals(qualifiedDstPath)) {
+        // rename to itself is forbidden
+        throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self");
+      }
+
+      // acquire one IO permit
+      final Duration waitTime = rateLimiting.acquire(1);
+
+      try {
+        final boolean recovered = abfsStore.rename(qualifiedSrcPath,
+            qualifiedDstPath, tracingContext, sourceEtag);
+        return Pair.of(recovered, waitTime);
+      } catch (AzureBlobFileSystemException ex) {
+        LOG.debug("Rename operation failed. ", ex);
+        checkException(source, ex);
+        // never reached
+        return null;
+      }
+
+    }
+  }
+
   @Override
   public boolean delete(final Path f, final boolean recursive) throws IOException {
     LOG.debug(
@@ -533,7 +629,7 @@ public class AzureBlobFileSystem extends FileSystem
    * @throws IllegalArgumentException if the path has a trailing period (.)
    */
   private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
-    while (!path.isRoot()){
+    while (!path.isRoot()) {
       String pathToString = path.toString();
       if (pathToString.length() != 0) {
         if (pathToString.charAt(pathToString.length() - 1) == '.') {
@@ -541,8 +637,7 @@ public class AzureBlobFileSystem extends FileSystem
               "ABFS does not allow files or directories to end with a dot.");
         }
         path = path.getParent();
-      }
-      else {
+      } else {
         break;
       }
     }
@@ -601,10 +696,10 @@ public class AzureBlobFileSystem extends FileSystem
 
   @Override
   public FileStatus getFileStatus(final Path f) throws IOException {
-      TracingContext tracingContext = new TracingContext(clientCorrelationId,
-          fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
-          listener);
-      return getFileStatus(f, tracingContext);
+    TracingContext tracingContext = new TracingContext(clientCorrelationId,
+        fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
+        listener);
+    return getFileStatus(f, tracingContext);
   }
 
   private FileStatus getFileStatus(final Path path,
@@ -615,7 +710,7 @@ public class AzureBlobFileSystem extends FileSystem
 
     try {
       return abfsStore.getFileStatus(qualifiedPath, tracingContext);
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       checkException(path, ex);
       return null;
     }
@@ -639,7 +734,7 @@ public class AzureBlobFileSystem extends FileSystem
           fileSystemId, FSOperationType.BREAK_LEASE, tracingHeaderFormat,
           listener);
       abfsStore.breakLease(qualifiedPath, tracingContext);
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       checkException(f, ex);
     }
   }
@@ -666,7 +761,6 @@ public class AzureBlobFileSystem extends FileSystem
     return super.makeQualified(path);
   }
 
-
   @Override
   public Path getWorkingDirectory() {
     return this.workingDir;
@@ -689,8 +783,8 @@ public class AzureBlobFileSystem extends FileSystem
   @Override
   public Path getHomeDirectory() {
     return makeQualified(new Path(
-            FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
-                + "/" + abfsStore.getUser()));
+        FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
+            + "/" + abfsStore.getUser()));
   }
 
   /**
@@ -714,8 +808,8 @@ public class AzureBlobFileSystem extends FileSystem
     }
     final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost();
 
-    final String[] name = { blobLocationHost };
-    final String[] host = { blobLocationHost };
+    final String[] name = {blobLocationHost};
+    final String[] host = {blobLocationHost};
     long blockSize = file.getBlockSize();
     if (blockSize <= 0) {
       throw new IllegalArgumentException(
@@ -790,15 +884,14 @@ public class AzureBlobFileSystem extends FileSystem
           }
         });
       }
-    }
-    finally {
+    } finally {
       executorService.shutdownNow();
     }
 
     return true;
   }
 
-   /**
+  /**
    * Set owner of a path (i.e. a file or a directory).
    * The parameters owner and group cannot both be null.
    *
@@ -828,9 +921,9 @@ public class AzureBlobFileSystem extends FileSystem
 
     try {
       abfsStore.setOwner(qualifiedPath,
-              owner,
-              group,
-              tracingContext);
+          owner,
+          group,
+          tracingContext);
     } catch (AzureBlobFileSystemException ex) {
       checkException(path, ex);
     }
@@ -847,7 +940,10 @@ public class AzureBlobFileSystem extends FileSystem
    * @throws IllegalArgumentException If name is null or empty or if value is null
    */
   @Override
-  public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
+  public void setXAttr(final Path path,
+      final String name,
+      final byte[] value,
+      final EnumSet<XAttrSetFlag> flag)
       throws IOException {
     LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
 
@@ -971,7 +1067,7 @@ public class AzureBlobFileSystem extends FileSystem
     if (!getIsNamespaceEnabled(tracingContext)) {
       throw new UnsupportedOperationException(
           "modifyAclEntries is only supported by storage accounts with the "
-          + "hierarchical namespace enabled.");
+              + "hierarchical namespace enabled.");
     }
 
     if (aclSpec == null || aclSpec.isEmpty()) {
@@ -1006,7 +1102,7 @@ public class AzureBlobFileSystem extends FileSystem
     if (!getIsNamespaceEnabled(tracingContext)) {
       throw new UnsupportedOperationException(
           "removeAclEntries is only supported by storage accounts with the "
-          + "hierarchical namespace enabled.");
+              + "hierarchical namespace enabled.");
     }
 
     if (aclSpec == null || aclSpec.isEmpty()) {
@@ -1038,7 +1134,7 @@ public class AzureBlobFileSystem extends FileSystem
     if (!getIsNamespaceEnabled(tracingContext)) {
       throw new UnsupportedOperationException(
           "removeDefaultAcl is only supported by storage accounts with the "
-          + "hierarchical namespace enabled.");
+              + "hierarchical namespace enabled.");
     }
 
     Path qualifiedPath = makeQualified(path);
@@ -1068,7 +1164,7 @@ public class AzureBlobFileSystem extends FileSystem
     if (!getIsNamespaceEnabled(tracingContext)) {
       throw new UnsupportedOperationException(
           "removeAcl is only supported by storage accounts with the "
-          + "hierarchical namespace enabled.");
+              + "hierarchical namespace enabled.");
     }
 
     Path qualifiedPath = makeQualified(path);
@@ -1101,7 +1197,7 @@ public class AzureBlobFileSystem extends FileSystem
     if (!getIsNamespaceEnabled(tracingContext)) {
       throw new UnsupportedOperationException(
           "setAcl is only supported by storage accounts with the hierarchical "
-          + "namespace enabled.");
+              + "namespace enabled.");
     }
 
     if (aclSpec == null || aclSpec.size() == 0) {
@@ -1133,7 +1229,7 @@ public class AzureBlobFileSystem extends FileSystem
     if (!getIsNamespaceEnabled(tracingContext)) {
       throw new UnsupportedOperationException(
           "getAclStatus is only supported by storage account with the "
-          + "hierarchical namespace enabled.");
+              + "hierarchical namespace enabled.");
     }
 
     Path qualifiedPath = makeQualified(path);
@@ -1243,7 +1339,7 @@ public class AzureBlobFileSystem extends FileSystem
 
   private boolean fileSystemExists() throws IOException {
     LOG.debug(
-            "AzureBlobFileSystem.fileSystemExists uri: {}", uri);
+        "AzureBlobFileSystem.fileSystemExists uri: {}", uri);
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
           fileSystemId, FSOperationType.TEST_OP, tracingHeaderFormat, listener);
@@ -1534,8 +1630,9 @@ public class AzureBlobFileSystem extends FileSystem
     case CommonPathCapabilities.FS_PERMISSIONS:
     case CommonPathCapabilities.FS_APPEND:
     case CommonPathCapabilities.ETAGS_AVAILABLE:
-    case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
       return true;
+
+    case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
     case CommonPathCapabilities.FS_ACLS:
       return getIsNamespaceEnabled(
           new TracingContext(clientCorrelationId, fileSystemId,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index f4f8959..046f9f0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Listenable
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -878,7 +879,22 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     client.breakLease(getRelativePath(path), tracingContext);
   }
 
-  public void rename(final Path source, final Path destination, TracingContext tracingContext) throws
+  /**
+   * Rename a file or directory.
+   * If a source etag is passed in, the operation will attempt to recover
+   * from a missing source file by probing the destination for
+   * existence and comparing etags.
+   * @param source path to source file
+   * @param destination destination of rename.
+   * @param tracingContext trace context
+   * @param sourceEtag etag of source file. may be null or empty
+   * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
+   * @return true if recovery was needed and succeeded.
+   */
+  public boolean rename(final Path source,
+      final Path destination,
+      final TracingContext tracingContext,
+      final String sourceEtag) throws
           AzureBlobFileSystemException {
     final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
     long countAggregate = 0;
@@ -898,23 +914,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
 
     String sourceRelativePath = getRelativePath(source);
     String destinationRelativePath = getRelativePath(destination);
+    // was any operation recovered from?
+    boolean recovered = false;
 
     do {
       try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
-        AbfsRestOperation op = client
-            .renamePath(sourceRelativePath, destinationRelativePath,
-                continuation, tracingContext);
+        final Pair<AbfsRestOperation, Boolean> pair =
+            client.renamePath(sourceRelativePath, destinationRelativePath,
+                continuation, tracingContext, sourceEtag);
+
+        AbfsRestOperation op = pair.getLeft();
         perfInfo.registerResult(op.getResult());
         continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
         perfInfo.registerSuccess(true);
         countAggregate++;
         shouldContinue = continuation != null && !continuation.isEmpty();
-
+        // update the recovery flag.
+        recovered |= pair.getRight();
         if (!shouldContinue) {
           perfInfo.registerAggregates(startAggregate, countAggregate);
         }
       }
     } while (shouldContinue);
+    return recovered;
   }
 
   public void delete(final Path path, final boolean recursive,
@@ -1932,7 +1954,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
    * @param result response to process.
    * @return the quote-unwrapped etag.
    */
-  private static String extractEtagHeader(AbfsHttpOperation result) {
+  public static String extractEtagHeader(AbfsHttpOperation result) {
     String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
     if (etag != null) {
       // strip out any wrapper "" quotes which come back, for consistency with
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
new file mode 100644
index 0000000..efba924
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
+
+/**
+ * Extension of ManifestStoreOperationsThroughFileSystem with ABFS awareness.
+ * Purely for use by jobs committing work through the manifest committer.
+ * The {@link AzureManifestCommitterFactory} will configure
+ * this as the binding to the FS.
+ *
+ * ADLS Gen2 stores support etag-recovery on renames; WASB
+ * stores do not.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public class AbfsManifestStoreOperations extends
+    ManifestStoreOperationsThroughFileSystem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsManifestStoreOperations.class);
+
+  /**
+   * Classname, which can be declared in job configurations.
+   */
+  public static final String NAME = AbfsManifestStoreOperations.class.getName();
+
+  /**
+   * Resilient rename calls; only available on an ADLS Gen2 store.
+   * Will be null after binding if the FS isn't compatible.
+   */
+  private ResilientCommitByRename resilientCommitByRename;
+
+  @Override
+  public AzureBlobFileSystem getFileSystem() {
+    return (AzureBlobFileSystem) super.getFileSystem();
+  }
+
+  /**
+   * Bind to the store.
+   *
+   * @param filesystem FS.
+   * @param path path to work under
+   * @throws IOException binding problems.
+   */
+  @Override
+  public void bindToFileSystem(FileSystem filesystem, Path path) throws IOException {
+    if (!(filesystem instanceof AzureBlobFileSystem)) {
+      throw new PathIOException(path.toString(),
+          "Not an abfs filesystem: " + filesystem.getClass());
+    }
+    super.bindToFileSystem(filesystem, path);
+    try {
+      resilientCommitByRename = getFileSystem().createResilientCommitSupport(path);
+      LOG.debug("Bonded to filesystem with resilient commits under path {}", path);
+    } catch (UnsupportedOperationException e) {
+      LOG.debug("No resilient commit support under path {}", path);
+    }
+  }
+
+  @Override
+  public boolean storePreservesEtagsThroughRenames(final Path path) {
+    return true;
+  }
+
+  /**
+   * Resilient commits available on hierarchical stores.
+   * @return true if the FS can use etags on renames.
+   */
+  @Override
+  public boolean storeSupportsResilientCommit() {
+    return resilientCommitByRename != null;
+  }
+
+  /**
+   * Commit a file through an internal ABFS operation.
+   * If resilient commit is unavailable, invokes the superclass, which
+   * will raise an UnsupportedOperationException.
+   * @param entry entry to commit
+   * @return the outcome
+   * @throws IOException any failure in resilient commit.
+   * @throws UnsupportedOperationException if not available.
+   */
+  @Override
+  public CommitFileResult commitFile(final FileEntry entry) throws IOException {
+
+    if (resilientCommitByRename != null) {
+      final Pair<Boolean, Duration> result =
+          resilientCommitByRename.commitSingleFileByRename(
+              entry.getSourcePath(),
+              entry.getDestPath(),
+              entry.getEtag());
+      return CommitFileResult.fromResilientCommit(result.getLeft(),
+          result.getRight());
+    } else {
+      return super.commitFile(entry);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
new file mode 100644
index 0000000..b760fa7
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * A committer factory for the manifest committer which performs all the
+ * bindings needed to work best with abfs.
+ * This includes, at a minimum, switching to the abfs-specific manifest store operations.
+ *
+ * This classname is referenced in configurations, so MUST NOT change.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AzureManifestCommitterFactory extends ManifestCommitterFactory {
+
+  /**
+   * Classname, which can be declared in job configurations.
+   */
+  public static final String NAME = AzureManifestCommitterFactory.class.getName();
+
+  @Override
+  public ManifestCommitter createOutputCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    final Configuration conf = context.getConfiguration();
+    // use ABFS Store operations
+    conf.set(OPT_STORE_OPERATIONS_CLASS,
+        AbfsManifestStoreOperations.NAME);
+    return super.createOutputCommitter(outputPath, context);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
new file mode 100644
index 0000000..2e91392
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * API exclusively for committing files.
+ *
+ * This is only for use by {@link AbfsManifestStoreOperations},
+ * and is intended to be implemented by ABFS.
+ * To ensure that there is no need to add mapreduce JARs to the
+ * classpath just to work with ABFS, this interface
+ * MUST NOT refer to anything in the
+ * {@code org.apache.hadoop.mapreduce} package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ResilientCommitByRename extends IOStatisticsSource {
+
+  /**
+   * Rename source file to dest path *Exactly*; no subdirectory games here.
+   * If the method does not raise an exception, then
+   * the data at dest is the data which was at source.
+   *
+   * Requirements
+   *
+   * <pre>
+   *   exists(FS, source) else raise FileNotFoundException
+   *   source != dest else raise PathIOException
+   *   not exists(FS, dest)
+   *   isDir(FS, dest.getParent)
+   * </pre>
+   * <ol>
+   *   <li>source != dest else raise PathIOException</li>
+   *   <li>source must exist else raise FileNotFoundException</li>
+   *   <li>source must exist and be a file</li>
+   *   <li>dest must not exist; </li>
+   *   <li>dest.getParent() must be a dir</li>
+   *   <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
+   * </ol>
+   *
+   * The outcome of the operation is undefined if source is not a file, dest exists,
+   * dest.getParent() doesn't exist/is a file.
+   * That is: implementations SHOULD assume that the code calling this method has
+   * set up the destination directory tree and is only invoking this call on a file.
+   * Accordingly: <i>implementations MAY skip validation checks</i>
+   *
+   * Post Conditions on a successful operation:
+   * <pre>
+   * FS' where:
+   *     not exists(FS', source)
+   *     and exists(FS', dest)
+   *     and data(FS', dest) == data (FS, source)
+   * </pre>
+   * This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
+   * are met. This API call simply restricts the operation to file rename with strict
+   * conditions (no need to be 'clever' about dest path calculation) and the ability
+   * to pass in etags, modtimes and file status values.
+   *
+   * @param source path to source file
+   * @param dest destination of rename.
+   * @param sourceEtag etag of source file. may be null or empty
+   * @return true if recovery was needed.
+   * @throws FileNotFoundException source file not found
+   * @throws PathIOException failure, including source and dest being the same path
+   * @throws IOException any other exception
+   */
+  Pair<Boolean, Duration> commitSingleFileByRename(
+      Path source,
+      Path dest,
+      @Nullable String sourceEtag) throws IOException;
+
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
new file mode 100644
index 0000000..3567377
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for the manifest committer.
+ * Unless otherwise stated: classes are private.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 12beb5a..9d3b2d5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -220,6 +220,9 @@ public final class ConfigurationKeys {
   /** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
   public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
 
+  /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
+  public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
+
   public static String accountProperty(String property, String account) {
     return property + "." + account;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f58c61e..63d62a3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -133,5 +133,10 @@ public final class FileSystemConfigurations {
   public static final String DATA_BLOCKS_BUFFER_DEFAULT =
       DATA_BLOCKS_BUFFER_DISK;
 
+  /**
+   * IO rate limit. Value: {@value}
+   */
+  public static final int RATE_LIMIT_DEFAULT = 10_000;
+
   private FileSystemConfigurations() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 08142a1..b701037 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFact
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -67,6 +68,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -478,8 +481,30 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
-  public AbfsRestOperation renamePath(String source, final String destination,
-      final String continuation, TracingContext tracingContext)
+
+  /**
+   * Rename a file or directory.
+   * If a source etag is passed in, the operation will attempt to recover
+   * from a missing source file by probing the destination for
+   * existence and comparing etags.
+   * The second value in the result will be true to indicate that this
+   * took place.
+   * As rename recovery is only attempted if the source etag is non-empty,
+   * in normal rename operations rename recovery will never happen.
+   * @param source path to source file
+   * @param destination destination of rename.
+   * @param continuation continuation.
+   * @param tracingContext trace context
+   * @param sourceEtag etag of source file. may be null or empty
+   * @return pair of (the rename operation, flag indicating recovery took place)
+   * @throws AzureBlobFileSystemException failure; suppressed if etag-based
+   * recovery establishes that the rename succeeded.
+   */
+  public Pair<AbfsRestOperation, Boolean> renamePath(
+      final String source,
+      final String destination,
+      final String continuation,
+      final TracingContext tracingContext,
+      final String sourceEtag)
       throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
@@ -505,9 +530,73 @@ public class AbfsClient implements Closeable {
             HTTP_METHOD_PUT,
             url,
             requestHeaders);
-    // no attempt at recovery using timestamps as it was not reliable.
-    op.execute(tracingContext);
-    return op;
+    try {
+      op.execute(tracingContext);
+      return Pair.of(op, false);
+    } catch (AzureBlobFileSystemException e) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw e;
+      }
+      boolean etagCheckSucceeded = renameIdempotencyCheckOp(
+          source,
+          sourceEtag, op, destination, tracingContext);
+      if (!etagCheckSucceeded) {
+        // the etag probe did not establish that the rename succeeded;
+        // rethrow the original exception
+        throw e;
+      }
+      return Pair.of(op, true);
+    }
+  }
+
+  /**
+   * Check if the rename request failure is post a retry and if earlier rename
+   * request might have succeeded at back-end.
+   *
+   * If a source etag was passed in, and the error was 404, get the
+   * etag of any file at the destination.
+   * If it matches the source etag, then the rename is considered
+   * a success.
+   * Exceptions raised in the probe of the destination are swallowed,
+   * so that they do not interfere with the original rename failures.
+   * @param source source path
+   * @param sourceEtag etag of source file. may be null or empty
+   * @param op Rename request REST operation response with non-null HTTP response
+   * @param destination rename destination path
+   * @param tracingContext Tracks identifiers for request header
+   * @return true if the rename is treated as having already succeeded
+   */
+  public boolean renameIdempotencyCheckOp(
+      final String source,
+      final String sourceEtag,
+      final AbfsRestOperation op,
+      final String destination,
+      TracingContext tracingContext) {
+    Preconditions.checkArgument(op.hasResult(), "Operation has null HTTP response");
+
+    if ((op.isARetriedRequest())
+        && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
+        && isNotEmpty(sourceEtag)) {
+
+      // Server has returned HTTP 404, which means rename source no longer
+      // exists. Check on destination status and if its etag matches
+      // that of the source, consider it to be a success.
+      LOG.debug("rename {} to {} failed, checking etag of destination",
+          source, destination);
+
+      try {
+        final AbfsRestOperation destStatusOp = getPathStatus(destination,
+            false, tracingContext);
+        final AbfsHttpOperation result = destStatusOp.getResult();
+
+        return result.getStatusCode() == HttpURLConnection.HTTP_OK
+            && sourceEtag.equals(extractEtagHeader(result));
+      } catch (AzureBlobFileSystemException ignored) {
+        // GetFileStatus on the destination failed, the rename did not take place
+      }
+    }
+    return false;
   }
 
   public AbfsRestOperation append(final String path, final byte[] buffer,
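
A sketch of how a caller consumes the new renamePath() contract; the paths are
invented, and the client, tracing context and etag arguments are assumed to be
supplied by the caller:

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
    import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
    import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
    import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

    final class ResilientRenameSketch {
      /** Returns true iff the rename was recovered via the etag probe. */
      static boolean renameWithRecovery(AbfsClient client,
          TracingContext tracingContext, String sourceEtag)
          throws AzureBlobFileSystemException {
        Pair<AbfsRestOperation, Boolean> result = client.renamePath(
            "/job/_temporary/task1", "/job/part-0001",
            null /* continuation */, tracingContext, sourceEtag);
        // the right side is true only when the request failed after a retry
        // and the destination's etag matched sourceEtag
        return result.getRight();
      }
    }

Because recovery is only attempted when a non-empty source etag is supplied,
existing callers which pass null (as the updated tests below do) keep the old
fail-fast semantics.
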
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
index bb1ec9e..e3adc59 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
@@ -48,9 +48,15 @@ public class SimpleKeyProvider implements KeyProvider {
       // Validating the key.
       validateStorageAccountKey(key);
     } catch (IllegalAccessException | InvalidConfigurationValueException e) {
-      throw new KeyProviderException("Failure to initialize configuration", e);
+      LOG.debug("Failure to retrieve storage account key for {}", accountName,
+          e);
+      throw new KeyProviderException("Failure to initialize configuration for "
+          + accountName
+          + " key =\"" + key + "\""
+          + ": " + e, e);
     } catch(IOException ioe) {
-      LOG.warn("Unable to get key from credential providers. {}", ioe);
+      LOG.warn("Unable to get key for {} from credential providers. {}",
+          accountName, ioe, ioe);
     }
 
     return key;
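
The IOException branch above covers key lookup through Hadoop credential
providers rather than cleartext configuration; a sketch of wiring one up, with
the keystore path invented:

    import org.apache.hadoop.conf.Configuration;

    public class CredentialProviderSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // resolve the fs.azure.account.key.* entries through a JCEKS store
        // instead of keeping the account key in cleartext in core-site.xml
        conf.set("hadoop.security.credential.provider.path",
            "jceks://file/etc/hadoop/conf/abfs.jceks");
      }
    }
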
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 56d5538..4a55075 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -271,11 +271,12 @@ public abstract class AbstractAbfsIntegrationTest extends
     // The SAS tests do not have permission to create a filesystem
     // so first create temporary instance of the filesystem using SharedKey
     // then re-use the filesystem it creates with SAS auth instead of SharedKey.
-    AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
-    ContractTestUtils.assertPathExists(tempFs, "This path should exist",
-        new Path("/"));
-    abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
-    usingFilesystemForSASTests = true;
+    try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)) {
+      ContractTestUtils.assertPathExists(tempFs, "This path should exist",
+          new Path("/"));
+      abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
+      usingFilesystemForSASTests = true;
+    }
   }
 
   public AzureBlobFileSystem getFileSystem() throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
index b0e8244..5bd6eaf 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
@@ -76,13 +79,19 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     new Random().nextBytes(b);
 
     Path testPath = path(TEST_PATH);
-    try (FSDataOutputStream stream = fs.create(testPath)) {
+    FSDataOutputStream stream = fs.create(testPath);
+    try {
       stream.write(b);
+    } finally {
+      stream.close();
     }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
 
     final byte[] readBuffer = new byte[2 * bufferSize];
     int result;
+    IOStatisticsSource statisticsSource = null;
     try (FSDataInputStream inputStream = fs.open(testPath)) {
+      statisticsSource = inputStream;
       ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
           new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
               fs.getFileSystemId(), FSOperationType.READ, true, 0,
@@ -100,6 +109,8 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
       inputStream.seek(0);
       result = inputStream.read(readBuffer, 0, bufferSize);
     }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+
     assertNotEquals("data read in final read()", -1, result);
     assertArrayEquals(readBuffer, b);
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
index ea9fba6..965e02a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
@@ -401,7 +401,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
     fs.create(new Path(src)).close();
     AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
         .renamePath(src, "/testABC" + "/abc.txt", null,
-            getTestTracingContext(fs, false));
+            getTestTracingContext(fs, false), null)
+        .getLeft();
     AbfsHttpOperation result = abfsHttpRestOperation.getResult();
     String url = result.getMaskedUrl();
     String encodedUrl = result.getMaskedEncodedUrl();
@@ -418,7 +419,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
     intercept(IOException.class, "sig=XXXX",
         () -> getFileSystem().getAbfsClient()
             .renamePath("testABC/test.xt", "testABC/abc.txt", null,
-                getTestTracingContext(getFileSystem(), false)));
+                getTestTracingContext(getFileSystem(), false), null));
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
index 2198a6a..0226031 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
@@ -526,7 +526,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
     AbfsClient abfsClient = fs.getAbfsClient();
     AbfsRestOperation abfsRestOperation = abfsClient
         .renamePath(testFileName, newName, null,
-            getTestTracingContext(fs, false));
+            getTestTracingContext(fs, false), null)
+        .getLeft();
     assertCPKHeaders(abfsRestOperation, false);
     assertNoCPKResponseHeadersPresent(abfsRestOperation);
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
new file mode 100644
index 0000000..8160cdc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * Helper methods for committer tests on ABFS.
+ */
+final class AbfsCommitTestHelper {
+  private AbfsCommitTestHelper() {
+  }
+
+  /**
+   * Prepare the test configuration.
+   * @param contractTestBinding test binding
+   * @return an extracted and patched configuration.
+   */
+  static Configuration prepareTestConfiguration(
+      ABFSContractTestBinding contractTestBinding) {
+    final Configuration conf =
+        contractTestBinding.getRawConfiguration();
+
+    // use ABFS Store operations
+    conf.set(OPT_STORE_OPERATIONS_CLASS,
+        AbfsManifestStoreOperations.NAME);
+
+    return conf;
+  }
+}
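
Outside the test suite, a production job would combine this store-operations
binding with the committer factory added by this patch; a sketch, assuming the
standard mapreduce.outputcommitter.factory.scheme.<scheme> factory key applies
to the abfs scheme:

    import org.apache.hadoop.conf.Configuration;

    public class CommitterBindingSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // route output committed to abfs:// destinations through the
        // manifest committer factory for ABFS
        conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
            "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory");
      }
    }
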
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
new file mode 100644
index 0000000..5575205
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.DurationInfo;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+import static org.apache.hadoop.io.IOUtils.closeStream;
+
+/**
+ * Tests which create a yarn minicluster.
+ * These are all considered scale tests; the probe for
+ * scale tests being enabled is executed before the cluster
+ * is set up to avoid wasting time on non-scale runs.
+ */
+public abstract class AbstractAbfsClusterITest extends
+    AbstractManifestCommitterTest {
+
+  public static final int NO_OF_NODEMANAGERS = 2;
+
+  private final ABFSContractTestBinding binding;
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  protected AbstractAbfsClusterITest() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  protected int getTestTimeoutMillis() {
+    return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+    requireScaleTestsEnabled();
+    if (getClusterBinding() == null) {
+      clusterBinding = demandCreateClusterBinding();
+    }
+    assertNotNull("cluster is not bound", getClusterBinding());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    terminateCluster(clusterBinding);
+    clusterBinding = null;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  /**
+   * This is the cluster binding which every subclass must create.
+   */
+  protected static final class ClusterBinding {
+
+    private String clusterName;
+
+    private final MiniMRYarnCluster yarn;
+
+    public ClusterBinding(
+        final String clusterName,
+        final MiniMRYarnCluster yarn) {
+      this.clusterName = clusterName;
+      this.yarn = requireNonNull(yarn);
+    }
+
+    /**
+     * Get the cluster FS; this is the local FS, as no HDFS cluster is brought up.
+     * @return a filesystem.
+     * @throws IOException failure
+     */
+    public FileSystem getClusterFS() throws IOException {
+      return FileSystem.getLocal(yarn.getConfig());
+    }
+
+    public MiniMRYarnCluster getYarn() {
+      return yarn;
+    }
+
+    public Configuration getConf() {
+      return getYarn().getConfig();
+    }
+
+    public String getClusterName() {
+      return clusterName;
+    }
+
+    public void terminate() {
+      closeStream(getYarn());
+    }
+  }
+
+  /**
+   * Create the cluster binding.
+   * The configuration is patched to turn off unwanted
+   * YARN features such as the MR history cleaner.
+   *
+   * No HDFS cluster is brought up; the local filesystem
+   * is used as the cluster FS.
+   * @param conf configuration to start with.
+   * @return the cluster binding.
+   * @throws IOException failure.
+   */
+  protected static ClusterBinding createCluster(
+      final JobConf conf) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
+      conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+      // create a unique cluster name based on the current time in millis.
+      String timestamp = LocalDateTime.now().format(
+          DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
+      String clusterName = "yarn-" + timestamp;
+      MiniMRYarnCluster yarnCluster =
+          new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+      return new ClusterBinding(clusterName, yarnCluster);
+    }
+  }
+
+  /**
+   * Terminate the cluster if it is not null.
+   * @param cluster the cluster
+   */
+  protected static void terminateCluster(ClusterBinding cluster) {
+    if (cluster != null) {
+      cluster.terminate();
+    }
+  }
+
+  /**
+   * Get the cluster binding for this subclass.
+   * @return the cluster binding
+   */
+  protected ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
+
+  protected MiniMRYarnCluster getYarn() {
+    return getClusterBinding().getYarn();
+  }
+
+  /**
+   * We stage work into a temporary directory rather than directly under
+   * the user's home directory, as that is often rejected by CI test
+   * runners.
+   */
+  @Rule
+  public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
+
+  /**
+   * Create the cluster binding on demand rather than in a
+   * BeforeClass static method.
+   * Subclasses can override this to change the binding options.
+   * @return the cluster binding
+   */
+  protected ClusterBinding demandCreateClusterBinding() throws Exception {
+    return createCluster(new JobConf());
+  }
+
+  /**
+   * Create a job configuration.
+   * This creates a new job conf from the yarn
+   * cluster configuration then calls
+   * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
+   * @return the new job configuration.
+   * @throws IOException failure
+   */
+  protected JobConf newJobConf() throws IOException {
+    JobConf jobConf = new JobConf(getYarn().getConfig());
+    jobConf.addResource(getConfiguration());
+    applyCustomConfigOptions(jobConf);
+    return jobConf;
+  }
+
+  /**
+   * Patch the (job) configuration for this committer.
+   * @param jobConf configuration to patch
+   * @return the patched configuration.
+   */
+  protected Configuration patchConfigurationForCommitter(
+      final Configuration jobConf) {
+    enableManifestCommitter(jobConf);
+    return jobConf;
+  }
+
+  /**
+   * Override point to let implementations tune the MR Job conf.
+   * @param jobConf configuration
+   * @throws IOException failure.
+   */
+  protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
+
+  }
+
+  /**
+   * Assume that scale tests are enabled.
+   */
+  protected void requireScaleTestsEnabled() {
+    assumeScaleTestsEnabled(getConfiguration());
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
new file mode 100644
index 0000000..a597c35
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCleanupStage;
+
+/**
+ * Cleanup logic on ABFS.
+ */
+public class ITestAbfsCleanupStage extends TestCleanupStage {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsCleanupStage() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
new file mode 100644
index 0000000..a0aaec8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCommitTaskStage;
+
+/**
+ * ABFS storage test of task committer.
+ */
+public class ITestAbfsCommitTaskStage extends TestCommitTaskStage {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsCommitTaskStage() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
new file mode 100644
index 0000000..6621b80
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCreateOutputDirectoriesStage;
+
+/**
+ * ABFS storage test of directory creation.
+ */
+public class ITestAbfsCreateOutputDirectoriesStage extends TestCreateOutputDirectoriesStage {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsCreateOutputDirectoriesStage() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
new file mode 100644
index 0000000..4e4c4f5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.FixMethodOrder;
+import org.junit.runners.MethodSorters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestJobThroughManifestCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+
+/**
+ * Test the Manifest committer stages against ABFS.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestAbfsJobThroughManifestCommitter
+    extends TestJobThroughManifestCommitter {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsJobThroughManifestCommitter() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return enableManifestCommitter(prepareTestConfiguration(binding));
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  @Override
+  protected boolean shouldDeleteTestRootAtEndOfTestRun() {
+    return true;
+  }
+
+  /**
+   * Add read of manifest and validate of output's etags.
+   * @param attemptId attempt ID
+   * @param files files which were created.
+   * @param manifest manifest
+   * @throws IOException failure
+   */
+  @Override
+  protected void validateTaskAttemptManifest(String attemptId,
+      List<Path> files,
+      TaskManifest manifest) throws IOException {
+    super.validateTaskAttemptManifest(attemptId, files, manifest);
+    final List<FileEntry> commit = manifest.getFilesToCommit();
+    final ManifestStoreOperations operations = getStoreOperations();
+    for (FileEntry entry : commit) {
+      Assertions.assertThat(entry.getEtag())
+          .describedAs("Etag of %s", entry)
+          .isNotEmpty();
+      final FileStatus sourceStatus = operations.getFileStatus(entry.getSourcePath());
+      final String etag = ManifestCommitterSupport.getEtag(sourceStatus);
+      Assertions.assertThat(etag)
+          .describedAs("Etag of %s", sourceStatus)
+          .isEqualTo(entry.getEtag());
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
new file mode 100644
index 0000000..acd693e3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage;
+
+/**
+ * ABFS storage test of saving and loading a large number
+ * of manifests.
+ */
+public class ITestAbfsLoadManifestsStage extends TestLoadManifestsStage {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsLoadManifestsStage() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
new file mode 100644
index 0000000..aac06f9
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol;
+
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+
+/**
+ * Test the Manifest protocol against ABFS.
+ */
+public class ITestAbfsManifestCommitProtocol extends
+    TestManifestCommitProtocol {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsManifestCommitProtocol() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return enableManifestCommitter(prepareTestConfiguration(binding));
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  @Override
+  protected String suitename() {
+    return "ITestAbfsManifestCommitProtocol";
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
new file mode 100644
index 0000000..922782d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test {@link AbfsManifestStoreOperations}.
+ * As this looks at etag handling through FS operations, it is really testing
+ * how etags work in ABFS (preservation across renames) and in the client
+ * (consistency between LIST and HEAD calls).
+ *
+ * Skipped when tested against wasb-compatible stores.
+ */
+public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class);
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsManifestStoreOperations() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+
+    // skip tests against stores which do not preserve etags across renames
+    assumeTrue("Resilient rename not available",
+        getFileSystem().hasPathCapability(getContract().getTestPath(),
+            ETAGS_PRESERVED_IN_RENAME));
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return enableManifestCommitter(prepareTestConfiguration(binding));
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  /**
+   * Etags must be non-empty and consistent across LIST and HEAD operations.
+   */
+  @Test
+  public void testEtagConsistencyAcrossListAndHead() throws Throwable {
+    describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
+    final Path path = methodPath();
+    final FileSystem fs = getFileSystem();
+    ContractTestUtils.touch(fs, path);
+    final ManifestStoreOperations operations = createManifestStoreOperations();
+    Assertions.assertThat(operations)
+        .describedAs("Store operations class loaded via Configuration")
+        .isInstanceOf(AbfsManifestStoreOperations.class);
+
+    final FileStatus st = operations.getFileStatus(path);
+    final String etag = operations.getEtag(st);
+    Assertions.assertThat(etag)
+        .describedAs("Etag of %s", st)
+        .isNotBlank();
+    LOG.info("etag of empty file is \"{}\"", etag);
+
+    final FileStatus[] statuses = fs.listStatus(path);
+    Assertions.assertThat(statuses)
+        .describedAs("List(%s)", path)
+        .hasSize(1);
+    final FileStatus lsStatus = statuses[0];
+    Assertions.assertThat(operations.getEtag(lsStatus))
+        .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
+        .isEqualTo(etag);
+  }
+
+  @Test
+  public void testEtagsOfDifferentDataDifferent() throws Throwable {
+    describe("Verify that two different blocks of data written have different tags");
+
+    final Path path = methodPath();
+    final FileSystem fs = getFileSystem();
+    Path src = new Path(path, "src");
+
+    ContractTestUtils.createFile(fs, src, true,
+        "data1234".getBytes(StandardCharsets.UTF_8));
+    final ManifestStoreOperations operations = createManifestStoreOperations();
+    final FileStatus srcStatus = operations.getFileStatus(src);
+    final String srcTag = operations.getEtag(srcStatus);
+    LOG.info("etag of file 1 is \"{}\"", srcTag);
+
+    // now overwrite with data of the same length
+    // (to ensure that neither path nor length alone determines the tag)
+    ContractTestUtils.createFile(fs, src, true,
+        "1234data".getBytes(StandardCharsets.UTF_8));
+
+    // validate
+    final String tag2 = operations.getEtag(operations.getFileStatus(src));
+    LOG.info("etag of file 2 is \"{}\"", tag2);
+
+    Assertions.assertThat(tag2)
+        .describedAs("etag of updated file")
+        .isNotEqualTo(srcTag);
+  }
+
+  @Test
+  public void testEtagConsistencyAcrossRename() throws Throwable {
+    describe("Verify that when a file is renamed, the etag remains unchanged");
+    final Path path = methodPath();
+    final FileSystem fs = getFileSystem();
+    Path src = new Path(path, "src");
+    Path dest = new Path(path, "dest");
+
+    ContractTestUtils.createFile(fs, src, true,
+        "sample data".getBytes(StandardCharsets.UTF_8));
+    final ManifestStoreOperations operations = createManifestStoreOperations();
+    final FileStatus srcStatus = operations.getFileStatus(src);
+    final String srcTag = operations.getEtag(srcStatus);
+    LOG.info("etag of short file is \"{}\"", srcTag);
+
+    Assertions.assertThat(srcTag)
+        .describedAs("Etag of %s", srcStatus)
+        .isNotBlank();
+
+    // rename
+    operations.commitFile(new FileEntry(src, dest, 0, srcTag));
+
+    // validate
+    FileStatus destStatus = operations.getFileStatus(dest);
+    final String destTag = operations.getEtag(destStatus);
+    Assertions.assertThat(destTag)
+        .describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus)
+        .isEqualTo(srcTag);
+  }
+
+}
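
Applications can make the same probe the test setup uses before relying on
etag-based rename recovery; a sketch, with the store URI invented:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;

    public class EtagCapabilitySketch {
      public static void main(String[] args) throws Exception {
        Path path = new Path("abfs://container@account.dfs.core.windows.net/data");
        FileSystem fs = path.getFileSystem(new Configuration());
        // true only on stores where etags survive rename (HNS-enabled accounts)
        boolean resilient = fs.hasPathCapability(path, ETAGS_PRESERVED_IN_RENAME);
        System.out.println("etags preserved in rename: " + resilient);
      }
    }
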
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
new file mode 100644
index 0000000..5547d08
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure;
+
+/**
+ * Rename failure logic on ABFS.
+ * This will go through the resilient rename operation.
+ */
+public class ITestAbfsRenameStageFailure extends TestRenameStageFailure {
+
+  /**
+   * How many files to create.
+   */
+  private static final int FILES_TO_CREATE = 20;
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsRenameStageFailure() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  @Override
+  protected boolean requireRenameResilience() {
+    return true;
+  }
+
+  @Override
+  protected int filesToCreate() {
+    return FILES_TO_CREATE;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
new file mode 100644
index 0000000..d2fe9de
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestTaskManifestFileIO;
+
+/**
+ * Test Reading/writing manifest file through ABFS.
+ */
+public class ITestAbfsTaskManifestFileIO extends TestTaskManifestFileIO {
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsTaskManifestFileIO() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
new file mode 100644
index 0000000..4b21b83
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import static java.util.Optional.empty;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
+
+/**
+ * Runs Terasort against ABFS using the manifest committer.
+ * The tests run in sequence, so each operation is isolated.
+ * Scale test only (it is big and slow).
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@SuppressWarnings({"StaticNonFinalField", "OptionalUsedAsFieldOrParameterType"})
+public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsTerasort.class);
+
+  public static final int EXPECTED_PARTITION_COUNT = 10;
+
+  public static final int PARTITION_SAMPLE_SIZE = 1000;
+
+  public static final int ROW_COUNT = 1000;
+
+  /**
+   * This has to be common across all test methods.
+   */
+  private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort");
+
+  /**
+   * Duration tracker created in the first of the test cases and closed
+   * in {@link #test_140_teracomplete()}.
+   */
+  private static Optional<DurationInfo> terasortDuration = empty();
+
+  /**
+   * Tracker of which stages are completed and how long they took.
+   */
+  private static final Map<String, DurationInfo> COMPLETED_STAGES = new HashMap<>();
+
+  /**
+   * FileSystem statistics are collected from the _SUCCESS markers.
+   */
+  protected static final IOStatisticsSnapshot JOB_IOSTATS =
+      snapshotIOStatistics();
+
+  /** Base path for all the terasort input and output paths. */
+  private Path terasortPath;
+
+  /** Input (teragen) path. */
+  private Path sortInput;
+
+  /** Path where sorted data goes. */
+  private Path sortOutput;
+
+  /** Path for validated job's output. */
+  private Path sortValidate;
+
+  public ITestAbfsTerasort() throws Exception {
+  }
+
+  @Override
+  public void setup() throws Exception {
+    // superclass calls requireScaleTestsEnabled();
+    super.setup();
+    prepareToTerasort();
+  }
+
+  /**
+   * Set up the job conf with the options for terasort chosen by the scale
+   * options.
+   * @param conf configuration
+   */
+  @Override
+  protected void applyCustomConfigOptions(JobConf conf) {
+    // small sample size for faster runs
+    conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
+        getSampleSizeForEachPartition());
+    conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
+        getExpectedPartitionCount());
+    conf.setBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+        false);
+  }
+
+  private int getExpectedPartitionCount() {
+    return EXPECTED_PARTITION_COUNT;
+  }
+
+  private int getSampleSizeForEachPartition() {
+    return PARTITION_SAMPLE_SIZE;
+  }
+
+  protected int getRowCount() {
+    return ROW_COUNT;
+  }
+
+  /**
+   * Set up the terasort by initializing the path variables.
+   * The paths used must be common across all test cases in the suite,
+   * as the cases execute in sequence against the same output tree.
+   */
+  private void prepareToTerasort() {
+    terasortPath = getFileSystem().makeQualified(TERASORT_PATH);
+    sortInput = new Path(terasortPath, "sortin");
+    sortOutput = new Path(terasortPath, "sortout");
+    sortValidate = new Path(terasortPath, "validate");
+  }
+
+  /**
+   * Declare that a stage has completed.
+   * @param stage stage name/key in the map
+   * @param d duration.
+   */
+  private static void completedStage(final String stage,
+      final DurationInfo d) {
+    COMPLETED_STAGES.put(stage, d);
+  }
+
+  /**
+   * Declare a stage which is required for this test case; if the stage
+   * has not completed, the test case is skipped (via a JUnit assumption)
+   * rather than failed.
+   * @param stage stage name
+   */
+  private static void requireStage(final String stage) {
+    Assume.assumeTrue(
+        "Required stage was not completed: " + stage,
+        COMPLETED_STAGES.get(stage) != null);
+  }
+
+  /**
+   * Execute a single stage in the terasort.
+   * Updates the completed stages map with the stage duration, if successful.
+   * @param stage Stage name for the stages map.
+   * @param jobConf job conf
+   * @param dest destination directory; the _SUCCESS file is expected there.
+   * @param tool tool to run.
+   * @param args args for the tool.
+   * @param minimumFileCount minimum number of files to have been created
+   * @throws Exception any failure
+   */
+  private void executeStage(
+      final String stage,
+      final JobConf jobConf,
+      final Path dest,
+      final Tool tool,
+      final String[] args,
+      final int minimumFileCount) throws Exception {
+    int result;
+
+    // the duration info is created outside a try-with-resources
+    // clause as it is used later.
+    DurationInfo d = new DurationInfo(LOG, stage);
+    try {
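+      // run the tool via ToolRunner; the returned value is the tool's exit code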
+      result = ToolRunner.run(jobConf, tool, args);
+    } finally {
+      d.close();
+    }
+    dumpOutputTree(dest);
+    assertEquals(stage
+        + "(" + StringUtils.join(", ", args) + ")"
+        + " failed", 0, result);
+    final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest,
+        minimumFileCount, "");
+    JOB_IOSTATS.aggregate(successFile.getIOStatistics());
+
+    completedStage(stage, d);
+  }
+
+  /**
+   * Set up terasort by cleaning out the destination, and note the initial
+   * time before any of the jobs are executed.
+   *
+   * This is executed first <i>for each parameterized run</i>;
+   * any variable which must be reset for each run is reset here.
+   */
+  @Test
+  public void test_100_terasort_setup() throws Throwable {
+    describe("Setting up for a terasort");
+
+    getFileSystem().delete(terasortPath, true);
+    terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
+  }
+
+  @Test
+  public void test_110_teragen() throws Throwable {
+    describe("Teragen to %s", sortInput);
+    getFileSystem().delete(sortInput, true);
+
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
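+    // TeraGen arguments: <row count> <output directory>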
+    executeStage("teragen",
+        jobConf,
+        sortInput,
+        new TeraGen(),
+        new String[]{Integer.toString(getRowCount()), sortInput.toString()},
+        1);
+  }
+
+  @Test
+  public void test_120_terasort() throws Throwable {
+    describe("Terasort from %s to %s", sortInput, sortOutput);
+    requireStage("teragen");
+    getFileSystem().delete(sortOutput, true);
+
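+    // fail fast if the teragen _SUCCESS file is missing or unreadable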
+    loadSuccessFile(getFileSystem(), sortInput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    executeStage("terasort",
+        jobConf,
+        sortOutput,
+        new TeraSort(),
+        new String[]{sortInput.toString(), sortOutput.toString()},
+        1);
+  }
+
+  @Test
+  public void test_130_teravalidate() throws Throwable {
+    describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+    requireStage("terasort");
+    getFileSystem().delete(sortValidate, true);
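+    // fail fast if the terasort _SUCCESS file is missing or unreadable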
+    loadSuccessFile(getFileSystem(), sortOutput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    executeStage("teravalidate",
+        jobConf,
+        sortValidate,
+        new TeraValidate(),
+        new String[]{sortOutput.toString(), sortValidate.toString()},
+        1);
+  }
+
+  /**
+   * Print the stage durations and save them as a CSV file in the local
+   * temp directory, where successive runs are easy to list and compare.
+   */
+  @Test
+  public void test_140_teracomplete() throws Throwable {
+    terasortDuration.ifPresent(d -> {
+      d.close();
+      completedStage("overall", d);
+    });
+
+    // IO Statistics
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, JOB_IOSTATS);
+
+    // and the summary
+    final StringBuilder results = new StringBuilder();
+    results.append("\"Operation\"\t\"Duration\"\n");
+
+    // build a function inline for use later in the method;
+    // this works because no IOExceptions are raised in this sequence.
+    Consumer<String> stage = (s) -> {
+      DurationInfo duration = COMPLETED_STAGES.get(s);
+      results.append(String.format("\"%s\"\t\"%s\"\n",
+          s,
+          duration == null ? "" : duration));
+    };
+
+    stage.accept("teragen");
+    stage.accept("terasort");
+    stage.accept("teravalidate");
+    stage.accept("overall");
+    String text = results.toString();
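+    // the summary goes to a file in java.io.tmpdir; its path is logged below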
+    File resultsFile = File.createTempFile("results", ".csv");
+    FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
+    LOG.info("Results are in {}\n{}", resultsFile, text);
+  }
+
+  /**
+   * Reset the duration so that, if two committer test suites run
+   * sequentially, the total execution time is not reported as spanning
+   * from the start of the first suite to the end of the second.
+   */
+  @Test
+  public void test_150_teracleanup() throws Throwable {
+    terasortDuration = Optional.empty();
+  }
+
+  @Test
+  public void test_200_directory_deletion() throws Throwable {
+    getFileSystem().delete(terasortPath, true);
+  }
+
+  /**
+   * Dump the files under a path, without failing if the path is absent.
+   * @param path path to dump
+   * @throws Exception any failure.
+   */
+  protected void dumpOutputTree(Path path) throws Exception {
+    LOG.info("Files under output directory {}", path);
+    try {
+      RemoteIterators.foreach(getFileSystem().listFiles(path, true),
+          (status) -> LOG.info("{}", status));
+    } catch (FileNotFoundException e) {
+      LOG.info("Output directory {} not found", path);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
new file mode 100644
index 0000000..3d49d62
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit and integration tests for the manifest committer.
+ * JSON job reports will be saved to
+ * {@code target/reports}.
+ */
+package org.apache.hadoop.fs.azurebfs.commit;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
index 62bcca1..1319ea4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
@@ -34,7 +34,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract {
   public static final String CONTRACT_XML = "abfs.xml";
   private final boolean isSecure;
 
-  protected AbfsFileSystemContract(final Configuration conf, boolean secure) {
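+  // public so that test suites in other packages (e.g. the commit tests)
+  // can create the contract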
+  public AbfsFileSystemContract(final Configuration conf, boolean secure) {
     super(conf);
     //insert the base features
     addConfResource(CONTRACT_XML);
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
new file mode 100644
index 0000000..7d2d11c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
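+  <!-- XInclude pulls in the per-developer azure-test.xml settings;
+       the empty fallback element keeps this configuration valid
+       when that file is absent. -->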
+  <include xmlns="http://www.w3.org/2001/XInclude" href="azure-test.xml">
+    <fallback/>
+  </include>
+</configuration>
