You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/10/04 17:55:01 UTC

[hadoop] branch trunk updated: HADOOP-16570. S3A committers encounter scale issues.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6574f27  HADOOP-16570. S3A committers encounter scale issues.
6574f27 is described below

commit 6574f27fa348542411bff888b184cd7ce34e5d9e
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Fri Oct 4 18:53:53 2019 +0100

    HADOOP-16570. S3A committers encounter scale issues.
    
    Contributed by Steve Loughran.
    
    This addresses two scale issues which has surfaced in large scale benchmarks
    of the S3A Committers.
    
    * Thread pools are not cleaned up.
      This now happens, with tests.
    
    * OOM on job commit for jobs with many thousands of tasks,
      each generating tens of (very large) files.
    
    Instead of loading all pending commits into memory as a single list, the list
    of files to load is the sole list which is passed around; .pendingset files are
    loaded and processed in isolation -and reloaded if necessary for any
    abort/rollback operation.
    
    The parallel commit/abort/revert operations now work at the .pendingset level,
    rather than that of individual pending commit files. The existing parallelized
    Tasks API is still used to commit those files, but with a null thread pool, so
    as to serialize the operations.
    
    Change-Id: I5c8240cd31800eaa83d112358770ca0eb2bca797
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   6 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  11 +-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |   3 +
 .../hadoop/fs/s3a/commit/AbstractS3ACommitter.java | 466 ++++++++++++++++++---
 .../fs/s3a/commit/AbstractS3ACommitterFactory.java |   2 +-
 .../hadoop/fs/s3a/commit/CommitConstants.java      |   6 +
 .../hadoop/fs/s3a/commit/files/SuccessData.java    |   6 +
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java |   8 +-
 .../commit/staging/DirectoryStagingCommitter.java  |  17 +-
 .../staging/PartitionedStagingCommitter.java       | 101 ++++-
 .../fs/s3a/commit/staging/StagingCommitter.java    |  43 +-
 .../org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java |  18 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  42 +-
 .../fs/s3a/commit/AbstractITCommitProtocol.java    |  74 +++-
 .../org/apache/hadoop/fs/s3a/commit/TestTasks.java |   2 +-
 .../commit/integration/ITestS3ACommitterMRJob.java |   6 +-
 .../fs/s3a/commit/staging/StagingTestBase.java     |  66 ++-
 .../staging/TestDirectoryCommitterScale.java       | 314 ++++++++++++++
 .../s3a/commit/staging/TestStagingCommitter.java   |  29 +-
 .../TestStagingDirectoryOutputCommitter.java       |  10 +-
 .../staging/TestStagingPartitionedJobCommit.java   |  43 +-
 .../staging/TestStagingPartitionedTaskCommit.java  |  46 +-
 22 files changed, 1123 insertions(+), 196 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 014a494..fdbdf37 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -837,4 +837,10 @@ public final class Constants {
   public static final String AWS_SERVICE_IDENTIFIER_S3 = "S3";
   public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
   public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
+
+  /**
+   * How long to wait for the thread pool to terminate when cleaning up.
+   * Value: {@value} seconds.
+   */
+  public static final int THREAD_POOL_SHUTDOWN_DELAY_SECONDS = 30;
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9431884..26f16a7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -154,6 +154,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -3062,6 +3063,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         transfers.shutdownNow(true);
         transfers = null;
       }
+      HadoopExecutors.shutdown(boundedThreadPool, LOG,
+          THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+      boundedThreadPool = null;
+      HadoopExecutors.shutdown(unboundedThreadPool, LOG,
+          THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+      unboundedThreadPool = null;
       S3AUtils.closeAll(LOG, metadataStore, instrumentation);
       metadataStore = null;
       instrumentation = null;
@@ -4064,7 +4071,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   @Retries.OnceRaw
   void abortMultipartUpload(String destKey, String uploadId) {
-    LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
+    LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
     getAmazonS3Client().abortMultipartUpload(
         new AbortMultipartUploadRequest(getBucket(),
             destKey,
@@ -4084,7 +4091,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     uploadId = upload.getUploadId();
     if (LOG.isInfoEnabled()) {
       DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-      LOG.info("Aborting multipart upload {} to {} initiated by {} on {}",
+      LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
           uploadId, destKey, upload.getInitiator(),
           df.format(upload.getInitiated()));
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index e9ed972..15f7390 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -611,11 +611,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
 
   public void close() {
     synchronized (metricsSystemLock) {
+      // it is critical to close each quantile, as they start a scheduled
+      // task in a shared thread pool.
       putLatencyQuantile.stop();
       throttleRateQuantile.stop();
       metricsSystem.unregisterSource(metricsSourceName);
       int activeSources = --metricsSourceActiveCounter;
       if (activeSources == 0) {
+        LOG.debug("Shutting down metrics publisher");
         metricsSystem.publishMetricsNow();
         metricsSystem.shutdown();
         metricsSystem = null;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index a49ab52..e82fbda 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -18,18 +18,18 @@
 
 package org.apache.hadoop.fs.s3a.commit;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.MultipartUpload;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
+import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS;
 import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -66,11 +68,28 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
  * to handle the creation of a committer when the destination is unknown.
  *
  * Requiring an output directory simplifies coding and testing.
+ *
+ * The original implementation loaded all .pendingset files
+ * before attempting any commit/abort operations.
+ * While straightforward and guaranteeing that no changes were made to the
+ * destination until all files had successfully been loaded -it didn't scale;
+ * the list grew until it exceeded heap size.
+ *
+ * The second iteration builds up an {@link ActiveCommit} class with the
+ * list of .pendingset files to load and then commit; that can be done
+ * incrementally and in parallel.
+ * As a side effect of this change, unless/until changed,
+ * the commit/abort/revert of all files uploaded by a single task will be
+ * serialized. This may slow down these operations if there are many files
+ * created by a few tasks, <i>and</i> the HTTP connection pool in the S3A
+ * committer was large enough for more all the parallel POST requests.
  */
 public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractS3ACommitter.class);
 
+  public static final String THREAD_PREFIX = "s3a-committer-pool-";
+
   /**
    * Thread pool for task execution.
    */
@@ -349,16 +368,11 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * @throws IOException IO failure
    */
   protected void maybeCreateSuccessMarkerFromCommits(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+      ActiveCommit pending) throws IOException {
     List<String> filenames = new ArrayList<>(pending.size());
-    for (SinglePendingCommit commit : pending) {
-      String key = commit.getDestinationKey();
-      if (!key.startsWith("/")) {
-        // fix up so that FS.makeQualified() sets up the path OK
-        key = "/" + key;
-      }
-      filenames.add(key);
-    }
+    // The list of committed objects in pending is size limited in
+    // ActiveCommit.uploadCommitted.
+    filenames.addAll(pending.committedObjects);
     maybeCreateSuccessMarker(context, filenames);
   }
 
@@ -390,22 +404,25 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
-   * Base job setup deletes the success marker.
-   * TODO: Do we need this?
+   * Base job setup (optionally) deletes the success marker and
+   * always creates the destination directory.
+   * When objects are committed that dest dir marker will inevitably
+   * be deleted; creating it now ensures there is something at the end
+   * while the job is in progress -and if nothing is created, that
+   * it is still there.
    * @param context context
    * @throws IOException IO failure
    */
-/*
 
   @Override
   public void setupJob(JobContext context) throws IOException {
-    if (createJobMarker) {
-      try (DurationInfo d = new DurationInfo("Deleting _SUCCESS marker")) {
+    try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
+      if (createJobMarker){
         commitOperations.deleteSuccessMarker(getOutputPath());
       }
+      getDestFS().mkdirs(getOutputPath());
     }
   }
-*/
 
   @Override
   public void setupTask(TaskAttemptContext context) throws IOException {
@@ -430,28 +447,152 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
-   * Commit a list of pending uploads.
+   * Commit all the pending uploads.
+   * Each file listed in the ActiveCommit instance is queued for processing
+   * in a separate thread; its contents are loaded and then (sequentially)
+   * committed.
+   * On a failure or abort of a single file's commit, all its uploads are
+   * aborted.
+   * The revert operation lists the files already committed and deletes them.
    * @param context job context
-   * @param pending list of pending uploads
+   * @param pending  pending uploads
    * @throws IOException on any failure
    */
-  protected void commitPendingUploads(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+  protected void commitPendingUploads(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
     if (pending.isEmpty()) {
       LOG.warn("{}: No pending uploads to commit", getRole());
     }
-    LOG.debug("{}: committing the output of {} task(s)",
-        getRole(), pending.size());
-    try(CommitOperations.CommitContext commitContext
+    try (DurationInfo ignored = new DurationInfo(LOG,
+        "committing the output of %s task(s)", pending.size());
+        CommitOperations.CommitContext commitContext
             = initiateCommitOperation()) {
-      Tasks.foreach(pending)
+
+      Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
+          .suppressExceptions(false)
           .executeWith(buildThreadPool(context))
+          .abortWith(path ->
+              loadAndAbort(commitContext, pending, path, true, false))
+          .revertWith(path ->
+              loadAndRevert(commitContext, pending, path))
+          .run(path ->
+              loadAndCommit(commitContext, pending, path));
+    }
+  }
+
+  /**
+   * Run a precommit check that all files are loadable.
+   * This check avoids the situation where the inability to read
+   * a file only surfaces partway through the job commit, so
+   * results in the destination being tainted.
+   * @param context job context
+   * @param pending the pending operations
+   * @throws IOException any failure
+   */
+  protected void precommitCheckPendingFiles(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
+
+    FileSystem sourceFS = pending.getSourceFS();
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, "Preflight Load of pending files")) {
+
+      Tasks.foreach(pending.getSourceFiles())
+          .stopOnFailure()
+          .suppressExceptions(false)
+          .executeWith(buildThreadPool(context))
+          .run(path -> PendingSet.load(sourceFS, path));
+    }
+  }
+
+  /**
+   * Load a pendingset file and commit all of its contents.
+   * @param commitContext context to commit through
+   * @param activeCommit commit state
+   * @param path path to load
+   * @throws IOException failure
+   */
+  private void loadAndCommit(
+      final CommitOperations.CommitContext commitContext,
+      final ActiveCommit activeCommit,
+      final Path path) throws IOException {
+
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, false, "Committing %s", path)) {
+      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+      Tasks.foreach(pendingSet.getCommits())
+          .stopOnFailure()
+          .suppressExceptions(false)
+          .executeWith(singleCommitThreadPool())
           .onFailure((commit, exception) ->
               commitContext.abortSingleCommit(commit))
           .abortWith(commitContext::abortSingleCommit)
           .revertWith(commitContext::revertCommit)
-          .run(commitContext::commitOrFail);
+          .run(commit -> {
+            commitContext.commitOrFail(commit);
+            activeCommit.uploadCommitted(
+                commit.getDestinationKey(), commit.getLength());
+          });
+    }
+  }
+
+  /**
+   * Load a pendingset file and revert all of its contents.
+   * @param commitContext context to commit through
+   * @param activeCommit commit state
+   * @param path path to load
+   * @throws IOException failure
+   */
+  private void loadAndRevert(
+      final CommitOperations.CommitContext commitContext,
+      final ActiveCommit activeCommit,
+      final Path path) throws IOException {
+
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, false, "Committing %s", path)) {
+      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+      Tasks.foreach(pendingSet.getCommits())
+          .suppressExceptions(true)
+          .run(commitContext::revertCommit);
+    }
+  }
+
+  /**
+   * Load a pendingset file and abort all of its contents.
+   * @param commitContext context to commit through
+   * @param activeCommit commit state
+   * @param path path to load
+   * @param deleteRemoteFiles should remote files be deleted?
+   * @throws IOException failure
+   */
+  private void loadAndAbort(
+      final CommitOperations.CommitContext commitContext,
+      final ActiveCommit activeCommit,
+      final Path path,
+      final boolean suppressExceptions,
+      final boolean deleteRemoteFiles) throws IOException {
+
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, false, "Aborting %s", path)) {
+      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
+          path);
+      FileSystem fs = getDestFS();
+      Tasks.foreach(pendingSet.getCommits())
+          .executeWith(singleCommitThreadPool())
+          .suppressExceptions(suppressExceptions)
+          .run(commit -> {
+            try {
+              commitContext.abortSingleCommit(commit);
+            } catch (FileNotFoundException e) {
+              // Commit ID was not known; file may exist.
+              // delete it if instructed to do so.
+              if (deleteRemoteFiles) {
+                fs.delete(commit.destinationPath(), false);
+              }
+            }
+          });
     }
   }
 
@@ -466,43 +607,14 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
-   * Try to read every pendingset file and build a list of them/
-   * In the case of a failure to read the file, exceptions are held until all
-   * reads have been attempted.
-   * @param context job context
-   * @param suppressExceptions whether to suppress exceptions.
-   * @param fs job attempt fs
-   * @param pendingCommitFiles list of files found in the listing scan
-   * @return the list of commits
-   * @throws IOException on a failure when suppressExceptions is false.
-   */
-  protected List<SinglePendingCommit> loadPendingsetFiles(
-      JobContext context,
-      boolean suppressExceptions,
-      FileSystem fs,
-      Iterable<? extends FileStatus> pendingCommitFiles) throws IOException {
-
-    final List<SinglePendingCommit> pending = Collections.synchronizedList(
-        Lists.newArrayList());
-    Tasks.foreach(pendingCommitFiles)
-        .suppressExceptions(suppressExceptions)
-        .executeWith(buildThreadPool(context))
-        .run(pendingCommitFile ->
-          pending.addAll(
-              PendingSet.load(fs, pendingCommitFile.getPath()).getCommits())
-      );
-    return pending;
-  }
-
-  /**
    * Internal Job commit operation: where the S3 requests are made
    * (potentially in parallel).
    * @param context job context
-   * @param pending pending request
+   * @param pending pending commits
    * @throws IOException any failure
    */
   protected void commitJobInternal(JobContext context,
-      List<SinglePendingCommit> pending)
+      ActiveCommit pending)
       throws IOException {
 
     commitPendingUploads(context, pending);
@@ -523,6 +635,9 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * This must clean up operations; it is called when a commit fails, as
    * well as in an {@link #abortJob(JobContext, JobStatus.State)} call.
    * The base implementation calls {@link #cleanup(JobContext, boolean)}
+   * so cleans up the filesystems and destroys the thread pool.
+   * Subclasses must always invoke this superclass method after their
+   * own operations.
    * @param context job context
    * @param suppressExceptions should exceptions be suppressed?
    * @throws IOException any IO problem raised when suppressExceptions is false.
@@ -536,13 +651,15 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   /**
    * Abort all pending uploads to the destination directory during
    * job cleanup operations.
+   * Note: this instantiates the thread pool if required -so
+   * {@link #destroyThreadPool()} must be called after this.
    * @param suppressExceptions should exceptions be suppressed
    * @throws IOException IO problem
    */
   protected void abortPendingUploadsInCleanup(
       boolean suppressExceptions) throws IOException {
     Path dest = getOutputPath();
-    try (DurationInfo d =
+    try (DurationInfo ignored =
              new DurationInfo(LOG, "Aborting all pending commits under %s",
                  dest);
          CommitOperations.CommitContext commitContext
@@ -565,13 +682,18 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
-   * Subclass-specific pre commit actions.
+   * Subclass-specific pre-Job-commit actions.
+   * The staging committers all load the pending files to verify that
+   * they can be loaded.
+   * The Magic committer does not, because of the overhead of reading files
+   * from S3 makes it too expensive.
    * @param context job context
    * @param pending the pending operations
    * @throws IOException any failure
    */
-  protected void preCommitJob(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+  @VisibleForTesting
+  public void preCommitJob(JobContext context,
+      ActiveCommit pending) throws IOException {
   }
 
   /**
@@ -584,7 +706,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * <p>
    * Commit internal: do the final commit sequence.
    * <p>
-   * The final commit action is to build the {@code __SUCCESS} file entry.
+   * The final commit action is to build the {@code _SUCCESS} file entry.
    * </p>
    * @param context job context
    * @throws IOException any failure
@@ -594,7 +716,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
     String id = jobIdString(context);
     try (DurationInfo d = new DurationInfo(LOG,
         "%s: commitJob(%s)", getRole(), id)) {
-      List<SinglePendingCommit> pending
+      ActiveCommit pending
           = listPendingUploadsToCommit(context);
       preCommitJob(context, pending);
       commitJobInternal(context, pending);
@@ -629,12 +751,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * @return a list of pending uploads.
    * @throws IOException Any IO failure
    */
-  protected abstract List<SinglePendingCommit> listPendingUploadsToCommit(
+  protected abstract ActiveCommit listPendingUploadsToCommit(
       JobContext context)
       throws IOException;
 
   /**
-   * Cleanup the job context, including aborting anything pending.
+   * Cleanup the job context, including aborting anything pending
+   * and destroying the thread pool.
    * @param context job context
    * @param suppressExceptions should exceptions be suppressed?
    * @throws IOException any failure if exceptions were not suppressed.
@@ -645,6 +768,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
         "Cleanup job %s", jobIdString(context))) {
       abortPendingUploadsInCleanup(suppressExceptions);
     } finally {
+      destroyThreadPool();
       cleanupStagingDirs();
     }
   }
@@ -715,7 +839,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
 
   /**
    * Returns an {@link ExecutorService} for parallel tasks. The number of
-   * threads in the thread-pool is set by s3.multipart.committer.num-threads.
+   * threads in the thread-pool is set by fs.s3a.committer.threads.
    * If num-threads is 0, this will return null;
    *
    * @param context the JobContext for this commit
@@ -730,10 +854,10 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
           DEFAULT_COMMITTER_THREADS);
       LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
       if (numThreads > 0) {
-        threadPool = Executors.newFixedThreadPool(numThreads,
+        threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
             new ThreadFactoryBuilder()
                 .setDaemon(true)
-                .setNameFormat("s3-committer-pool-%d")
+                .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
                 .build());
       } else {
         return null;
@@ -743,6 +867,40 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
+   * Destroy any thread pools; wait for that to finish,
+   * but don't overreact if it doesn't finish in time.
+   */
+  protected synchronized void destroyThreadPool() {
+    if (threadPool != null) {
+      LOG.debug("Destroying thread pool");
+      HadoopExecutors.shutdown(threadPool, LOG,
+          THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+      threadPool = null;
+    }
+  }
+
+  /**
+   * Get the thread pool for executing the single file commit/revert
+   * within the commit of all uploads of a single task.
+   * This is currently null; it is here to allow the Tasks class to
+   * provide the logic for execute/revert.
+   * Why not use the existing thread pool? Too much fear of deadlocking,
+   * and tasks are being committed in parallel anyway.
+   * @return null. always.
+   */
+  protected final synchronized ExecutorService singleCommitThreadPool() {
+    return null;
+  }
+
+  /**
+   * Does this committer have a thread pool?
+   * @return true if a thread pool exists.
+   */
+  public synchronized boolean hasThreadPool() {
+    return threadPool != null;
+  }
+
+  /**
    * Delete the task attempt path without raising any errors.
    * @param context task context
    */
@@ -755,6 +913,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
 
   /**
    * Abort all pending uploads in the list.
+   * This operation is used by the magic committer as part of its
+   * rollback after a failure during task commit.
    * @param context job context
    * @param pending pending uploads
    * @param suppressExceptions should exceptions be suppressed
@@ -779,4 +939,172 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
     }
   }
 
+  /**
+   * Abort all pending uploads in the list.
+   * @param context job context
+   * @param pending pending uploads
+   * @param suppressExceptions should exceptions be suppressed?
+   * @param deleteRemoteFiles should remote files be deleted?
+   * @throws IOException any exception raised
+   */
+  protected void abortPendingUploads(
+      final JobContext context,
+      final ActiveCommit pending,
+      final boolean suppressExceptions,
+      final boolean deleteRemoteFiles) throws IOException {
+
+    if (pending.isEmpty()) {
+      LOG.info("{}: no pending commits to abort", getRole());
+    } else {
+      try (DurationInfo d = new DurationInfo(LOG,
+          "Aborting %s uploads", pending.size());
+           CommitOperations.CommitContext commitContext
+               = initiateCommitOperation()) {
+        Tasks.foreach(pending.getSourceFiles())
+            .executeWith(buildThreadPool(context))
+            .suppressExceptions(suppressExceptions)
+            .run(path ->
+                loadAndAbort(commitContext,
+                    pending,
+                    path,
+                    suppressExceptions,
+                    deleteRemoteFiles));
+      }
+    }
+  }
+
+  /**
+   * State of the active commit operation.
+   *
+   * It contains a list of all pendingset files to load as the source
+   * of outstanding commits to complete/abort,
+   * and tracks the files uploaded.
+   *
+   * To avoid running out of heap by loading all the source files
+   * simultaneously:
+   * <ol>
+   *   <li>
+   *     The list of files to load is passed round but
+   *     the contents are only loaded on demand.
+   *   </li>
+   *   <li>
+   *     The number of written files tracked for logging in
+   *     the _SUCCESS file are limited to a small amount -enough
+   *     for testing only.
+   *   </li>
+   * </ol>
+   */
+  public static class ActiveCommit {
+
+    private static final AbstractS3ACommitter.ActiveCommit EMPTY
+        = new ActiveCommit(null, new ArrayList<>());
+
+    /** All pendingset files to iterate through. */
+    private final List<Path> sourceFiles;
+
+    /**
+     * Filesystem for the source files.
+     */
+    private final FileSystem sourceFS;
+
+    /**
+     * List of committed objects; only built up until the commit limit is
+     * reached.
+     */
+    private final List<String> committedObjects = new ArrayList<>();
+
+    /**
+     * The total number of committed objects.
+     */
+    private int committedObjectCount;
+
+    /**
+     * Total number of bytes committed.
+     */
+    private long committedBytes;
+
+    /**
+     * Construct from a source FS and list of files.
+     * @param sourceFS filesystem containing the list of pending files
+     * @param sourceFiles .pendingset files to load and commit.
+     */
+    public ActiveCommit(
+        final FileSystem sourceFS,
+        final List<Path> sourceFiles) {
+      this.sourceFiles = sourceFiles;
+      this.sourceFS = sourceFS;
+    }
+
+    /**
+     * Create an active commit of the given pending files.
+     * @param pendingFS source filesystem.
+     * @param statuses list of file status or subclass to use.
+     * @return the commit
+     */
+    public static ActiveCommit fromStatusList(
+        final FileSystem pendingFS,
+        final List<? extends FileStatus> statuses) {
+      return new ActiveCommit(pendingFS,
+          statuses.stream()
+              .map(FileStatus::getPath)
+              .collect(Collectors.toList()));
+    }
+
+    /**
+     * Get the empty entry.
+     * @return an active commit with no pending files.
+     */
+    public static ActiveCommit empty() {
+      return EMPTY;
+    }
+
+    public List<Path> getSourceFiles() {
+      return sourceFiles;
+    }
+
+    public FileSystem getSourceFS() {
+      return sourceFS;
+    }
+
+    /**
+     * Note that a file was committed.
+     * Increase the counter of files and total size.
+     * If there is room in the committedFiles list, the file
+     * will be added to the list and so end up in the _SUCCESS file.
+     * @param key key of the committed object.
+     * @param size size in bytes.
+     */
+    public synchronized void uploadCommitted(String key, long size) {
+      if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) {
+        committedObjects.add(
+            key.startsWith("/") ? key : ("/" + key));
+      }
+      committedObjectCount++;
+      committedBytes += size;
+    }
+
+    public synchronized List<String> getCommittedObjects() {
+      return committedObjects;
+    }
+
+    public synchronized int getCommittedFileCount() {
+      return committedObjectCount;
+    }
+
+    public synchronized long getCommittedBytes() {
+      return committedBytes;
+    }
+
+    public int size() {
+      return sourceFiles.size();
+    }
+
+    public boolean isEmpty() {
+      return sourceFiles.isEmpty();
+    }
+
+    public void add(Path path) {
+      sourceFiles.add(path);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
index b3bcca1..6e7a99f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
@@ -51,7 +51,7 @@ public abstract class AbstractS3ACommitterFactory
       throw new PathCommitException(outputPath,
           "Filesystem not supported by this committer");
     }
-    LOG.info("Using Commmitter {} for {}",
+    LOG.info("Using Committer {} for {}",
         outputCommitter,
         outputPath);
     return outputCommitter;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index c9b0337..3e28a5d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -255,4 +255,10 @@ public final class CommitConstants {
   public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
       "fs.s3a.committer.staging.abort.pending.uploads";
 
+  /**
+   * The limit to the number of committed objects tracked during
+   * job commits and saved to the _SUCCESS file.
+   */
+  public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
index cf84cb3..e0273fa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
@@ -52,6 +52,12 @@ import org.apache.hadoop.util.JsonSerialization;
  * Applications reading this data should use/check the {@link #name} field
  * to differentiate from any other JSON-based manifest and to identify
  * changes in the output format.
+ *
+ * Note: to deal with scale issues, the S3A committers do not include any
+ * more than the number of objects listed in
+ * {@link org.apache.hadoop.fs.s3a.commit.CommitConstants#SUCCESS_MARKER_FILE_LIMIT}.
+ * This is intended to suffice for basic integration tests.
+ * Larger tests should examine the generated files themselves.
  */
 @SuppressWarnings("unused")
 @InterfaceAudience.Private
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 969286e..9912173 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -109,11 +109,11 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
    * @return a list of pending commits.
    * @throws IOException Any IO failure
    */
-  protected List<SinglePendingCommit> listPendingUploadsToCommit(
+  protected ActiveCommit listPendingUploadsToCommit(
       JobContext context)
       throws IOException {
     FileSystem fs = getDestFS();
-    return loadPendingsetFiles(context, false, fs,
+    return ActiveCommit.fromStatusList(fs,
         listAndFilter(fs, getJobAttemptPath(context), false,
             CommitOperations.PENDINGSET_FILTER));
   }
@@ -174,6 +174,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
     } finally {
       // delete the task attempt so there's no possibility of a second attempt
       deleteTaskAttemptPathQuietly(context);
+      destroyThreadPool();
     }
     getCommitOperations().taskCompleted(true);
   }
@@ -181,7 +182,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
   /**
    * Inner routine for committing a task.
    * The list of pending commits is loaded and then saved to the job attempt
-   * dir.
+   * dir in a single pendingset file.
    * Failure to load any file or save the final file triggers an abort of
    * all known pending commits.
    * @param context context
@@ -250,6 +251,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
       deleteQuietly(
           attemptPath.getFileSystem(context.getConfiguration()),
           attemptPath, true);
+      destroyThreadPool();
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
index 32642c9..1a5a63c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a.commit.staging;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathExistsException;
 import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
-import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -66,7 +64,6 @@ public class DirectoryStagingCommitter extends StagingCommitter {
 
   @Override
   public void setupJob(JobContext context) throws IOException {
-    super.setupJob(context);
     Path outputPath = getOutputPath();
     FileSystem fs = getDestFS();
     ConflictResolution conflictResolution = getConflictResolutionMode(
@@ -91,10 +88,10 @@ public class DirectoryStagingCommitter extends StagingCommitter {
       }
     } catch (FileNotFoundException ignored) {
       // there is no destination path, hence, no conflict.
-      // make the parent directory, which also triggers a recursive directory
-      // creation operation
-      fs.mkdirs(outputPath);
     }
+    // make the parent directory, which also triggers a recursive directory
+    // creation operation
+    super.setupJob(context);
   }
 
   /**
@@ -106,8 +103,12 @@ public class DirectoryStagingCommitter extends StagingCommitter {
    * @throws IOException any failure
    */
   @Override
-  protected void preCommitJob(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+  public void preCommitJob(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
+
+    // see if the files can be loaded.
+    super.preCommitJob(context, pending);
     Path outputPath = getOutputPath();
     FileSystem fs = getDestFS();
     Configuration fsConf = fs.getConf();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index b51bcb5..20aca3c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.fs.s3a.commit.staging;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 
-import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,11 +33,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.COMMITTER_NAME_PARTITIONED;
 
 /**
  * Partitioned committer.
@@ -52,6 +56,9 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
  *   <li>REPLACE: delete the destination partition in the job commit
  *   (i.e. after and only if all tasks have succeeded.</li>
  * </ul>
+ * To determine the paths, the precommit process actually has to read
+ * in all source files, independently of the final commit phase.
+ * This is inefficient, though some parallelization here helps.
  */
 public class PartitionedStagingCommitter extends StagingCommitter {
 
@@ -107,6 +114,7 @@ public class PartitionedStagingCommitter extends StagingCommitter {
   }
 
   /**
+   * All
    * Job-side conflict resolution.
    * The partition path conflict resolution actions are:
    * <ol>
@@ -119,13 +127,15 @@ public class PartitionedStagingCommitter extends StagingCommitter {
    * @throws IOException any failure
    */
   @Override
-  protected void preCommitJob(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+  public void preCommitJob(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
 
     FileSystem fs = getDestFS();
 
     // enforce conflict resolution
     Configuration fsConf = fs.getConf();
+    boolean shouldPrecheckPendingFiles = true;
     switch (getConflictResolutionMode(context, fsConf)) {
     case FAIL:
       // FAIL checking is done on the task side, so this does nothing
@@ -134,21 +144,84 @@ public class PartitionedStagingCommitter extends StagingCommitter {
       // no check is needed because the output may exist for appending
       break;
     case REPLACE:
-      Set<Path> partitions = pending.stream()
-          .map(SinglePendingCommit::destinationPath)
-          .map(Path::getParent)
-          .collect(Collectors.toCollection(Sets::newLinkedHashSet));
-      for (Path partitionPath : partitions) {
-        LOG.debug("{}: removing partition path to be replaced: " +
-            getRole(), partitionPath);
-        fs.delete(partitionPath, true);
-      }
+      // identify and replace the destination partitions
+      replacePartitions(context, pending);
+      // and so there is no need to do another check.
+      shouldPrecheckPendingFiles = false;
       break;
     default:
       throw new PathCommitException("",
           getRole() + ": unknown conflict resolution mode: "
           + getConflictResolutionMode(context, fsConf));
     }
+    if (shouldPrecheckPendingFiles) {
+      precommitCheckPendingFiles(context, pending);
+    }
+  }
+
+  /**
+   * Identify all partitions which need to be replaced and then delete them.
+   * The original implementation relied on all the pending commits to be
+   * loaded so could simply enumerate them.
+   * This iteration does not do that; it has to reload all the files
+   * to build the set, after which it initiates the delete process.
+   * This is done in parallel.
+   * <pre>
+   *   Set<Path> partitions = pending.stream()
+   *     .map(Path::getParent)
+   *     .collect(Collectors.toCollection(Sets::newLinkedHashSet));
+   *   for (Path partitionPath : partitions) {
+   *     LOG.debug("{}: removing partition path to be replaced: " +
+   *     getRole(), partitionPath);
+   *     fs.delete(partitionPath, true);
+   *   }
+   * </pre>
+   *
+   * @param context job context
+   * @param pending the pending operations
+   * @throws IOException any failure
+   */
+  private void replacePartitions(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
+
+    Map<Path, String> partitions = new ConcurrentHashMap<>();
+    FileSystem sourceFS = pending.getSourceFS();
+    ExecutorService pool = buildThreadPool(context);
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, "Replacing partitions")) {
+
+      // the parent directories are saved to a concurrent hash map.
+      // for a marginal optimisation, the previous parent is tracked, so
+      // if a task writes many files to the same dir, the synchronized map
+      // is updated only once.
+      Tasks.foreach(pending.getSourceFiles())
+          .stopOnFailure()
+          .suppressExceptions(false)
+          .executeWith(pool)
+          .run(path -> {
+            PendingSet pendingSet = PendingSet.load(sourceFS, path);
+            Path lastParent = null;
+            for (SinglePendingCommit commit : pendingSet.getCommits()) {
+              Path parent = commit.destinationPath().getParent();
+              if (parent != null && !parent.equals(lastParent)) {
+                partitions.put(parent, "");
+                lastParent = parent;
+              }
+            }
+          });
+    }
+    // now do the deletes
+    FileSystem fs = getDestFS();
+    Tasks.foreach(partitions.keySet())
+        .stopOnFailure()
+        .suppressExceptions(false)
+        .executeWith(pool)
+        .run(partitionPath -> {
+          LOG.debug("{}: removing partition path to be replaced: " +
+              getRole(), partitionPath);
+          fs.delete(partitionPath, true);
+        });
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 833edd4..6cc9e48 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a.commit.staging;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Queue;
@@ -457,6 +456,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
     context.getConfiguration().set(
         InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid);
     wrappedCommitter.setupJob(context);
+    super.setupJob(context);
   }
 
   /**
@@ -466,7 +466,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
    * @throws IOException Any IO failure
    */
   @Override
-  protected List<SinglePendingCommit> listPendingUploadsToCommit(
+  protected ActiveCommit listPendingUploadsToCommit(
       JobContext context)
       throws IOException {
     return listPendingUploads(context, false);
@@ -480,7 +480,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
    * then this may not match the actual set of pending operations
    * @throws IOException shouldn't be raised, but retained for the compiler
    */
-  protected List<SinglePendingCommit> listPendingUploadsToAbort(
+  protected ActiveCommit listPendingUploadsToAbort(
       JobContext context) throws IOException {
     return listPendingUploads(context, true);
   }
@@ -493,13 +493,14 @@ public class StagingCommitter extends AbstractS3ACommitter {
    * then this may not match the actual set of pending operations
    * @throws IOException Any IO failure which wasn't swallowed.
    */
-  protected List<SinglePendingCommit> listPendingUploads(
+  protected ActiveCommit listPendingUploads(
       JobContext context, boolean suppressExceptions) throws IOException {
-    try {
-      Path wrappedJobAttemptPath = wrappedCommitter.getJobAttemptPath(context);
+    try (DurationInfo ignored = new DurationInfo(LOG,
+        "Listing pending uploads")) {
+      Path wrappedJobAttemptPath = getJobAttemptPath(context);
       final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(
           context.getConfiguration());
-      return loadPendingsetFiles(context, suppressExceptions, attemptFS,
+      return ActiveCommit.fromStatusList(attemptFS,
           listAndFilter(attemptFS,
               wrappedJobAttemptPath, false,
               HIDDEN_FILE_FILTER));
@@ -512,7 +513,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
       maybeIgnore(suppressExceptions, "Listing pending uploads", e);
     }
     // reached iff an IOE was caught and swallowed
-    return new ArrayList<>(0);
+    return ActiveCommit.empty();
   }
 
   @Override
@@ -558,8 +559,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
     boolean failed = false;
     try (DurationInfo d = new DurationInfo(LOG,
         "%s: aborting job in state %s ", r, jobIdString(context))) {
-      List<SinglePendingCommit> pending = listPendingUploadsToAbort(context);
-      abortPendingUploads(context, pending, suppressExceptions);
+      ActiveCommit pending = listPendingUploadsToAbort(context);
+      abortPendingUploads(context, pending, suppressExceptions, true);
     } catch (FileNotFoundException e) {
       // nothing to list
       LOG.debug("No job directory to read uploads from");
@@ -571,6 +572,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
     }
   }
 
+
   /**
    * Delete the working paths of a job.
    * <ol>
@@ -646,6 +648,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
           getRole(), context.getTaskAttemptID(), e);
       getCommitOperations().taskCompleted(false);
       throw e;
+    } finally {
+      destroyThreadPool();
     }
     getCommitOperations().taskCompleted(true);
   }
@@ -694,6 +698,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
       try {
         Tasks.foreach(taskOutput)
             .stopOnFailure()
+            .suppressExceptions(false)
             .executeWith(buildThreadPool(context))
             .run(stat -> {
               Path path = stat.getPath();
@@ -779,6 +784,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
       LOG.error("{}: exception when aborting task {}",
           getRole(), context.getTaskAttemptID(), e);
       throw e;
+    } finally {
+      destroyThreadPool();
     }
   }
 
@@ -901,4 +908,20 @@ public class StagingCommitter extends AbstractS3ACommitter {
         defVal).toUpperCase(Locale.ENGLISH);
   }
 
+  /**
+   * Pre-commit actions for a job.
+   * Loads all the pending files to verify they can be loaded
+   * and parsed.
+   * @param context job context
+   * @param pending pending commits
+   * @throws IOException any failure
+   */
+  @Override
+  public void preCommitJob(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
+
+    // see if the files can be loaded.
+    precommitCheckPendingFiles(context, pending);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
index 6e81452..79772ec 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
@@ -19,17 +19,21 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
+import java.util.Set;
 
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.Path;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getCurrentThreadNames;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.listInitialThreadsForLifecycleChecks;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED;
 
 /**
- * Tests of the S3A FileSystem which is closed; just make sure
- * that that basic file Ops fail meaningfully.
+ * Tests of the S3A FileSystem which is closed.
  */
 public class ITestS3AClosedFS extends AbstractS3ATestBase {
 
@@ -47,6 +51,16 @@ public class ITestS3AClosedFS extends AbstractS3ATestBase {
     // no op, as the FS is closed
   }
 
+  private static final Set<String> THREAD_SET =
+      listInitialThreadsForLifecycleChecks();
+
+  @AfterClass
+  public static void checkForThreadLeakage() {
+    Assertions.assertThat(getCurrentThreadNames())
+        .describedAs("The threads at the end of the test run")
+        .isSubsetOf(THREAD_SET);
+  }
+
   @Test
   public void testClosedGetFileStatus() throws Exception {
     intercept(IOException.class, E_FS_CLOSED,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 1889c05..bd38b40 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -63,7 +63,10 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
@@ -598,7 +601,7 @@ public final class S3ATestUtils {
     String tmpDir = conf.get(HADOOP_TMP_DIR, "target/build/test");
     if (testUniqueForkId != null) {
       // patch temp dir for the specific branch
-      tmpDir = tmpDir + File.pathSeparatorChar + testUniqueForkId;
+      tmpDir = tmpDir + File.separator + testUniqueForkId;
       conf.set(HADOOP_TMP_DIR, tmpDir);
     }
     conf.set(BUFFER_DIR, tmpDir);
@@ -1346,4 +1349,41 @@ public final class S3ATestUtils {
         STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
         () -> fs.getFileStatus(testFilePath));
   }
+
+  /**
+   * This creates a set containing all current threads and some well-known
+   * thread names whose existence should not fail test runs.
+   * They are generally static cleaner threads created by various classes
+   * on instantiation.
+   * @return a set of threads to use in later assertions.
+   */
+  public static Set<String> listInitialThreadsForLifecycleChecks() {
+    Set<String> threadSet = getCurrentThreadNames();
+    // static filesystem statistics cleaner
+    threadSet.add(
+        "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner");
+    // AWS progress callbacks
+    threadSet.add("java-sdk-progress-listener-callback-thread");
+    // another AWS thread
+    threadSet.add("java-sdk-http-connection-reaper");
+    // java.lang.UNIXProcess. maybe if chmod is called?
+    threadSet.add("process reaper");
+    // once a quantile has been scheduled, the mutable quantile thread pool
+    // is initialized; it has a minimum thread size of 1.
+    threadSet.add("MutableQuantiles-0");
+    // IDE?
+    threadSet.add("Attach Listener");
+    return threadSet;
+  }
+
+  /**
+   * Get a set containing the names of all active threads.
+   * @return the current set of threads.
+   */
+  public static Set<String> getCurrentThreadNames() {
+    return Thread.getAllStackTraces().keySet()
+        .stream()
+        .map(Thread::getName)
+        .collect(Collectors.toCollection(TreeSet::new));
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 822e361..cacd54d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -25,7 +25,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,6 +182,20 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
   }
 
   /**
+   * This only looks for leakage of committer thread pools,
+   * and not any other leaked threads, such as those from S3A FS instances.
+   */
+  @AfterClass
+  public static void checkForThreadLeakage() {
+    List<String> committerThreads = getCurrentThreadNames().stream()
+        .filter(n -> n.startsWith(AbstractS3ACommitter.THREAD_PREFIX))
+        .collect(Collectors.toList());
+    Assertions.assertThat(committerThreads)
+        .describedAs("Outstanding committer threads")
+        .isEmpty();
+  }
+
+  /**
    * Add the specified job to the current list of jobs to abort in teardown.
    * @param jobData job data.
    */
@@ -518,6 +535,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
       describe("\ncommitting job");
       committer.commitJob(jContext);
       describe("commit complete\n");
+      verifyCommitterHasNoThreads(committer);
     }
   }
 
@@ -574,7 +592,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     // Commit the task. This will promote data and metadata to where
     // job commits will pick it up on commit or abort.
-    committer.commitTask(tContext);
+    commitTask(committer, tContext);
     assertTaskAttemptPathDoesNotExist(committer, tContext);
 
     Configuration conf2 = jobData.job.getConfiguration();
@@ -600,6 +618,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     committer2.abortJob(jContext2, JobStatus.State.KILLED);
     // now, state of system may still have pending data
     assertNoMultipartUploadsPending(outDir);
+    verifyCommitterHasNoThreads(committer2);
   }
 
   protected void assertTaskAttemptPathDoesNotExist(
@@ -742,7 +761,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     describe("2. Committing task");
     assertTrue("No files to commit were found by " + committer,
         committer.needsTaskCommit(tContext));
-    committer.commitTask(tContext);
+    commitTask(committer, tContext);
 
     // this is only task commit; there MUST be no part- files in the dest dir
     waitForConsistency();
@@ -758,7 +777,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     describe("3. Committing job");
     assertMultipartUploadsPending(outDir);
-    committer.commitJob(jContext);
+    commitJob(committer, jContext);
 
     // validate output
     describe("4. Validating content");
@@ -809,7 +828,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     // now fail job
     expectSimulatedFailureOnJobCommit(jContext, committer);
 
-    committer.commitJob(jContext);
+    commitJob(committer, jContext);
 
     // but the data got there, due to the order of operations.
     validateContent(outDir, shouldExpectSuccessMarker());
@@ -1011,6 +1030,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
     assertJobAbortCleanedUp(jobData);
+    verifyCommitterHasNoThreads(committer);
   }
 
   /**
@@ -1064,6 +1084,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     // try again; expect abort to be idempotent.
     committer.abortJob(jContext, JobStatus.State.FAILED);
     assertNoMultipartUploadsPending(outDir);
+    verifyCommitterHasNoThreads(committer);
   }
 
   public void assertPart0000DoesNotExist(Path dir) throws Exception {
@@ -1223,8 +1244,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     validateTaskAttemptPathAfterWrite(dest);
     assertTrue("Committer does not have data to commit " + committer,
         committer.needsTaskCommit(tContext));
-    committer.commitTask(tContext);
-    committer.commitJob(jContext);
+    commitTask(committer, tContext);
+    commitJob(committer, jContext);
     // validate output
     verifySuccessMarker(outDir);
   }
@@ -1257,6 +1278,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     AbstractS3ACommitter committer2 = (AbstractS3ACommitter)
         outputFormat.getOutputCommitter(newAttempt);
     committer2.abortTask(tContext);
+    verifyCommitterHasNoThreads(committer2);
     assertNoMultipartUploadsPending(getOutDir());
   }
 
@@ -1306,19 +1328,19 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
       // at this point, job1 and job2 both have uncommitted tasks
 
       // commit tasks in order task 2, task 1.
-      committer2.commitTask(tContext2);
-      committer1.commitTask(tContext1);
+      commitTask(committer2, tContext2);
+      commitTask(committer1, tContext1);
 
       assertMultipartUploadsPending(job1Dest);
       assertMultipartUploadsPending(job2Dest);
 
       // commit jobs in order job 1, job 2
-      committer1.commitJob(jContext1);
+      commitJob(committer1, jContext1);
       assertNoMultipartUploadsPending(job1Dest);
       getPart0000(job1Dest);
       assertMultipartUploadsPending(job2Dest);
 
-      committer2.commitJob(jContext2);
+      commitJob(committer2, jContext2);
       getPart0000(job2Dest);
       assertNoMultipartUploadsPending(job2Dest);
     } finally {
@@ -1379,4 +1401,36 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
       TaskAttemptContext context) throws IOException {
   }
 
+  /**
+   * Commit a task then validate the state of the committer afterwards.
+   * @param committer committer
+   * @param tContext task context
+   * @throws IOException IO failure
+   */
+  protected void commitTask(final AbstractS3ACommitter committer,
+      final TaskAttemptContext tContext) throws IOException {
+    committer.commitTask(tContext);
+    verifyCommitterHasNoThreads(committer);
+  }
+
+  /**
+   * Commit a job then validate the state of the committer afterwards.
+   * @param committer committer
+   * @param jContext job context
+   * @throws IOException IO failure
+   */
+  protected void commitJob(final AbstractS3ACommitter committer,
+      final JobContext jContext) throws IOException {
+    committer.commitJob(jContext);
+    verifyCommitterHasNoThreads(committer);
+  }
+
+  /**
+   * Verify that the committer does not have a thread pool.
+   * @param committer committer to validate.
+   */
+  protected void verifyCommitterHasNoThreads(AbstractS3ACommitter committer) {
+    assertFalse("Committer has an active thread pool",
+        committer.hasThreadPool());
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
index 5e6fb82..4ee39f1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
@@ -73,7 +73,7 @@ public class TestTasks extends HadoopTestBase {
    * more checks on single thread than parallel ops.
    * @return a list of parameter tuples.
    */
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "threads={0}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
         {0},
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
index e3e4449..1045a29 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
@@ -632,10 +632,10 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
         final FileStatus st = fs.getFileStatus(magicDir);
         StringBuilder result = new StringBuilder("Found magic dir which should"
             + " have been deleted at ").append(st).append('\n');
-        result.append("[");
+        result.append(" [");
         applyLocatedFiles(fs.listFiles(magicDir, true),
-            (status) -> result.append(status.getPath()).append('\n'));
-        result.append("[");
+            (status) -> result.append(" ").append(status.getPath()).append('\n'));
+        result.append("]");
         return result.toString();
       });
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
index fd585d0..f368bf2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
@@ -251,6 +251,12 @@ public class StagingTestBase {
     verify(mockS3).getFileStatus(path);
   }
 
+  /**
+   * Verify that mkdirs was invoked once.
+   * @param mockS3 mock
+   * @param path path to check
+   * @throws IOException from the mkdirs signature.
+   */
   public static void verifyMkdirsInvoked(FileSystem mockS3, Path path)
       throws IOException {
     verify(mockS3).mkdirs(path);
@@ -320,12 +326,7 @@ public class StagingTestBase {
 
     @Before
     public void setupJob() throws Exception {
-      this.jobConf = new JobConf();
-      jobConf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
-          UUID.randomUUID().toString());
-      jobConf.setBoolean(
-          CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
-          false);
+      this.jobConf = createJobConf();
 
       this.job = new JobContextImpl(jobConf, JOB_ID);
       this.results = new StagingTestBase.ClientResults();
@@ -338,6 +339,16 @@ public class StagingTestBase {
       wrapperFS.setAmazonS3Client(mockClient);
     }
 
+    protected JobConf createJobConf() {
+      JobConf conf = new JobConf();
+      conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
+          UUID.randomUUID().toString());
+      conf.setBoolean(
+          CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+          false);
+      return conf;
+    }
+
     public S3AFileSystem getMockS3A() {
       return mockFS;
     }
@@ -461,6 +472,11 @@ public class StagingTestBase {
       return deletes;
     }
 
+    public List<String> getDeletePaths() {
+      return deletes.stream().map(DeleteObjectRequest::getKey).collect(
+          Collectors.toList());
+    }
+
     public void resetDeletes() {
       deletes.clear();
     }
@@ -478,6 +494,14 @@ public class StagingTestBase {
       requests.clear();
     }
 
+    public void addUpload(String id, String key) {
+      activeUploads.put(id, key);
+    }
+
+    public void addUploads(Map<String, String> uploadMap) {
+      activeUploads.putAll(uploadMap);
+    }
+
     @Override
     public String toString() {
       final StringBuilder sb = new StringBuilder(
@@ -648,8 +672,9 @@ public class StagingTestBase {
             }
             CompleteMultipartUploadRequest req = getArgumentAt(invocation,
                 0, CompleteMultipartUploadRequest.class);
+            String uploadId = req.getUploadId();
+            removeUpload(results, uploadId);
             results.commits.add(req);
-            results.activeUploads.remove(req.getUploadId());
 
             return newResult(req);
           }
@@ -669,14 +694,7 @@ public class StagingTestBase {
         AbortMultipartUploadRequest req = getArgumentAt(invocation,
             0, AbortMultipartUploadRequest.class);
         String id = req.getUploadId();
-        String p = results.activeUploads.remove(id);
-        if (p == null) {
-          // upload doesn't exist
-          AmazonS3Exception ex = new AmazonS3Exception(
-              "not found " + id);
-          ex.setStatusCode(404);
-          throw ex;
-        }
+        removeUpload(results, id);
         results.aborts.add(req);
         return null;
       }
@@ -729,6 +747,24 @@ public class StagingTestBase {
     return mockClient;
   }
 
+  /**
+   * Remove an upload from the upload map.
+   * @param results result set
+   * @param uploadId The upload ID to remove
+   * @throws AmazonS3Exception with error code 404 if the id is unknown.
+   */
+  protected static void removeUpload(final ClientResults results,
+      final String uploadId) {
+    String removed = results.activeUploads.remove(uploadId);
+    if (removed == null) {
+      // upload doesn't exist
+      AmazonS3Exception ex = new AmazonS3Exception(
+          "not found " + uploadId);
+      ex.setStatusCode(404);
+      throw ex;
+    }
+  }
+
   private static CompleteMultipartUploadResult newResult(
       CompleteMultipartUploadRequest req) {
     return new CompleteMultipartUploadResult();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java
new file mode 100644
index 0000000..6d93e5f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.amazonaws.services.s3.model.PartETag;
+import com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.PENDINGSET_SUFFIX;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.BUCKET;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPath;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPathUri;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.pathIsDirectory;
+
+/**
+ * Scale test of the directory committer: if there are many, many files
+ * does job commit overload.
+ * This is a mock test as to avoid the overhead of going near S3;
+ * it does use a lot of local filesystem files though so as to
+ * simulate real large scale deployment better.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestDirectoryCommitterScale
+    extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDirectoryCommitterScale.class);
+
+  public static final int TASKS = 500;
+
+  public static final int FILES_PER_TASK = 10;
+
+  public static final int TOTAL_COMMIT_COUNT = FILES_PER_TASK * TASKS;
+
+  public static final int BLOCKS_PER_TASK = 1000;
+
+  private static File stagingDir;
+
+  private static LocalFileSystem localFS;
+
+  private static Path stagingPath;
+
+  private static Map<String, String> activeUploads =
+      Maps.newHashMap();
+
+  @Override
+  DirectoryCommitterForTesting newJobCommitter() throws Exception {
+    return new DirectoryCommitterForTesting(outputPath,
+        createTaskAttemptForJob());
+  }
+
+  @BeforeClass
+  public static void setupStaging() throws Exception {
+    stagingDir = File.createTempFile("staging", "");
+    stagingDir.delete();
+    stagingDir.mkdir();
+    stagingPath = new Path(stagingDir.toURI());
+    localFS = FileSystem.getLocal(new Configuration());
+  }
+
+
+  @AfterClass
+  public static void teardownStaging() throws IOException {
+    try {
+      if (stagingDir != null) {
+        FileUtils.deleteDirectory(stagingDir);
+      }
+    } catch (IOException ignored) {
+
+    }
+  }
+
+  @Override
+  protected JobConf createJobConf() {
+    JobConf conf = super.createJobConf();
+    conf.setInt(
+        CommitConstants.FS_S3A_COMMITTER_THREADS,
+        100);
+    return conf;
+  }
+
+  protected Configuration getJobConf() {
+    return getJob().getConfiguration();
+  }
+
+  @Test
+  public void test_010_createTaskFiles() throws Exception {
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, "Creating %d test files in %s",
+                 TOTAL_COMMIT_COUNT, stagingDir)) {
+      createTasks();
+    }
+  }
+
+  /**
+   * Create the mock uploads of the tasks and save
+   * to .pendingset files.
+   * @throws IOException failure.
+   */
+  private void createTasks() throws IOException {
+    // create a stub multipart commit containing multiple files.
+
+    // step1: a list of tags.
+    // this is the md5sum of hadoop 3.2.1.tar
+    String tag = "9062dcf18ffaee254821303bbd11c72b";
+    List<PartETag> etags = IntStream.rangeClosed(1, BLOCKS_PER_TASK + 1)
+        .mapToObj(i -> new PartETag(i, tag))
+        .collect(Collectors.toList());
+    SinglePendingCommit base = new SinglePendingCommit();
+    base.setBucket(BUCKET);
+    base.setJobId("0000");
+    base.setLength(914688000);
+    base.bindCommitData(etags);
+    // these get overwritten
+    base.setDestinationKey("/base");
+    base.setUploadId("uploadId");
+    base.setUri(outputPathUri.toString());
+
+    SinglePendingCommit[] singles = new SinglePendingCommit[FILES_PER_TASK];
+    byte[] bytes = base.toBytes();
+    for (int i = 0; i < FILES_PER_TASK; i++) {
+      singles[i] = SinglePendingCommit.serializer().fromBytes(bytes);
+    }
+    // now create the files, using this as the template
+
+    int uploadCount = 0;
+    for (int task = 0; task < TASKS; task++) {
+      PendingSet pending = new PendingSet();
+      String taskId = String.format("task-%04d", task);
+
+      for (int i = 0; i < FILES_PER_TASK; i++) {
+        String uploadId = String.format("%05d-task-%04d-file-%02d",
+            uploadCount, task, i);
+        // longer paths to take up more space.
+        Path p = new Path(outputPath,
+            "datasets/examples/testdirectoryscale/"
+                + "year=2019/month=09/day=26/hour=20/second=53"
+                + uploadId);
+        URI dest = p.toUri();
+        SinglePendingCommit commit = singles[i];
+        String key = dest.getPath();
+        commit.setDestinationKey(key);
+        commit.setUri(dest.toString());
+        commit.touch(Instant.now().toEpochMilli());
+        commit.setTaskId(taskId);
+        commit.setUploadId(uploadId);
+        pending.add(commit);
+        activeUploads.put(uploadId, key);
+      }
+      Path path = new Path(stagingPath,
+          String.format("task-%04d." + PENDINGSET_SUFFIX, task));
+      pending.save(localFS, path, true);
+    }
+  }
+
+
+  @Test
+  public void test_020_loadFilesToAttempt() throws Exception {
+    DirectoryStagingCommitter committer = newJobCommitter();
+
+    Configuration jobConf = getJobConf();
+    jobConf.set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+    FileSystem mockS3 = getMockS3A();
+    pathIsDirectory(mockS3, outputPath);
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, "listing pending uploads")) {
+      AbstractS3ACommitter.ActiveCommit activeCommit
+          = committer.listPendingUploadsToCommit(getJob());
+      Assertions.assertThat(activeCommit.getSourceFiles())
+          .describedAs("Source files of %s", activeCommit)
+          .hasSize(TASKS);
+    }
+  }
+
+  @Test
+  public void test_030_commitFiles() throws Exception {
+    DirectoryCommitterForTesting committer = newJobCommitter();
+    StagingTestBase.ClientResults results = getMockResults();
+    results.addUploads(activeUploads);
+    Configuration jobConf = getJobConf();
+    jobConf.set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+    S3AFileSystem mockS3 = getMockS3A();
+    pathIsDirectory(mockS3, outputPath);
+
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, "Committing Job")) {
+      committer.commitJob(getJob());
+    }
+
+    Assertions.assertThat(results.getCommits())
+        .describedAs("commit count")
+        .hasSize(TOTAL_COMMIT_COUNT);
+    AbstractS3ACommitter.ActiveCommit activeCommit = committer.activeCommit;
+    Assertions.assertThat(activeCommit.getCommittedObjects())
+        .describedAs("committed objects in active commit")
+        .hasSize(Math.min(TOTAL_COMMIT_COUNT,
+            CommitConstants.SUCCESS_MARKER_FILE_LIMIT));
+    Assertions.assertThat(activeCommit.getCommittedFileCount())
+        .describedAs("committed objects in active commit")
+        .isEqualTo(TOTAL_COMMIT_COUNT);
+
+  }
+
+  @Test
+  public void test_040_abortFiles() throws Exception {
+    DirectoryStagingCommitter committer = newJobCommitter();
+    getMockResults().addUploads(activeUploads);
+    Configuration jobConf = getJobConf();
+    jobConf.set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+    FileSystem mockS3 = getMockS3A();
+    pathIsDirectory(mockS3, outputPath);
+
+    committer.abortJob(getJob(), JobStatus.State.FAILED);
+  }
+
+
+  /**
+   * Committer overridden for better testing.
+   */
+  private static final class DirectoryCommitterForTesting extends
+      DirectoryStagingCommitter {
+    private ActiveCommit activeCommit;
+
+    private DirectoryCommitterForTesting(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void initOutput(Path out) throws IOException {
+      super.initOutput(out);
+      setOutputPath(out);
+    }
+
+    /**
+     * Returns the mock FS without checking FS type.
+     * @param out output path
+     * @param config job/task config
+     * @return a filesystem.
+     * @throws IOException failure to get the FS
+     */
+    @Override
+    protected FileSystem getDestinationFS(Path out, Configuration config)
+        throws IOException {
+      return out.getFileSystem(config);
+    }
+
+    @Override
+    public Path getJobAttemptPath(JobContext context) {
+      return stagingPath;
+    }
+
+    @Override
+    protected void commitJobInternal(final JobContext context,
+        final ActiveCommit pending)
+        throws IOException {
+      activeCommit = pending;
+      super.commitJobInternal(context, pending);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
index 8939296..15ea754 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -35,6 +35,7 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
 import org.hamcrest.core.StringStartsWith;
 import org.junit.After;
 import org.junit.Before;
@@ -535,33 +536,31 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
           return jobCommitter.toString();
         });
 
-    assertEquals("Should have succeeded to commit some uploads",
-        5, results.getCommits().size());
-
-    assertEquals("Should have deleted the files that succeeded",
-        5, results.getDeletes().size());
 
     Set<String> commits = results.getCommits()
         .stream()
-        .map((commit) -> commit.getBucketName() + commit.getKey())
+        .map(commit ->
+            "s3a://" + commit.getBucketName() + "/" + commit.getKey())
         .collect(Collectors.toSet());
 
     Set<String> deletes = results.getDeletes()
         .stream()
-        .map((delete) -> delete.getBucketName() + delete.getKey())
+        .map(delete ->
+            "s3a://" + delete.getBucketName() + "/" + delete.getKey())
         .collect(Collectors.toSet());
 
-    assertEquals("Committed and deleted objects should match",
-        commits, deletes);
-
-    assertEquals("Mismatch in aborted upload count",
-        7, results.getAborts().size());
+    Assertions.assertThat(commits)
+        .describedAs("Committed objects compared to deleted paths %s", results)
+        .containsExactlyInAnyOrderElementsOf(deletes);
 
+    Assertions.assertThat(results.getAborts())
+        .describedAs("aborted count in %s", results)
+        .hasSize(7);
     Set<String> uploadIds = getCommittedIds(results.getCommits());
     uploadIds.addAll(getAbortedIds(results.getAborts()));
-
-    assertEquals("Should have committed/deleted or aborted all uploads",
-        uploads, uploadIds);
+    Assertions.assertThat(uploadIds)
+        .describedAs("Combined commit/delete and aborted upload IDs")
+        .containsExactlyInAnyOrderElementsOf(uploads);
 
     assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
index 994ecef..98075b8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
 import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -84,17 +85,18 @@ public class TestStagingDirectoryOutputCommitter
         () -> committer.setupJob(getJob()));
 
     // but there are no checks in job commit (HADOOP-15469)
-    committer.commitJob(getJob());
+    // this is done by calling the preCommit method directly,
+    committer.preCommitJob(getJob(), AbstractS3ACommitter.ActiveCommit.empty());
 
-    reset((FileSystem) getMockS3A());
+    reset(getMockS3A());
     pathDoesNotExist(getMockS3A(), outputPath);
 
     committer.setupJob(getJob());
     verifyExistenceChecked(getMockS3A(), outputPath);
     verifyMkdirsInvoked(getMockS3A(), outputPath);
-    verifyNoMoreInteractions((FileSystem) getMockS3A());
+    verifyNoMoreInteractions(getMockS3A());
 
-    reset((FileSystem) getMockS3A());
+    reset(getMockS3A());
     pathDoesNotExist(getMockS3A(), outputPath);
     committer.commitJob(getJob());
     verifyCompletion(getMockS3A());
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
index e7410e3..872097f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -18,20 +18,21 @@
 
 package org.apache.hadoop.fs.s3a.commit.staging;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 
-import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -59,37 +60,59 @@ public class TestStagingPartitionedJobCommit
   /**
    * Subclass of the Partitioned Staging committer used in the test cases.
    */
-  private static final class PartitionedStagingCommitterForTesting
+  private final class PartitionedStagingCommitterForTesting
       extends PartitionedCommitterForTesting {
 
-    private boolean aborted = false;
+    private boolean aborted;
 
     private PartitionedStagingCommitterForTesting(TaskAttemptContext context)
         throws IOException {
       super(StagingTestBase.outputPath, context);
     }
 
+    /**
+     * Generate pending uploads to commit.
+     * This is quite complex as the mock pending uploads need to be saved
+     * to a filesystem for the next stage of the commit process.
+     * To simulate multiple commit, more than one .pendingset file is created,
+     * @param context job context
+     * @return an active commit containing a list of paths to valid pending set
+     * file.
+     * @throws IOException IO failure
+     */
     @Override
-    protected List<SinglePendingCommit> listPendingUploadsToCommit(
+    protected ActiveCommit listPendingUploadsToCommit(
         JobContext context) throws IOException {
-      List<SinglePendingCommit> pending = Lists.newArrayList();
 
+      LocalFileSystem localFS = FileSystem.getLocal(getConf());
+      ActiveCommit activeCommit = new ActiveCommit(localFS,
+          new ArrayList<>(0));
+      // need to create some pending entries.
       for (String dateint : Arrays.asList("20161115", "20161116")) {
+        PendingSet pendingSet = new PendingSet();
         for (String hour : Arrays.asList("13", "14")) {
+          String uploadId = UUID.randomUUID().toString();
           String key = OUTPUT_PREFIX + "/dateint=" + dateint + "/hour=" + hour +
-              "/" + UUID.randomUUID().toString() + ".parquet";
+              "/" + uploadId + ".parquet";
           SinglePendingCommit commit = new SinglePendingCommit();
           commit.setBucket(BUCKET);
           commit.setDestinationKey(key);
           commit.setUri("s3a://" + BUCKET + "/" + key);
-          commit.setUploadId(UUID.randomUUID().toString());
+          commit.setUploadId(uploadId);
           ArrayList<String> etags = new ArrayList<>();
           etags.add("tag1");
           commit.setEtags(etags);
-          pending.add(commit);
+          pendingSet.add(commit);
+          // register the upload so commit operations are not rejected
+          getMockResults().addUpload(uploadId, key);
         }
+        File file = File.createTempFile("staging", ".pendingset");
+        file.deleteOnExit();
+        Path path = new Path(file.toURI());
+        pendingSet.save(localFS, path, true);
+        activeCommit.add(path);
       }
-      return pending;
+      return activeCommit;
     }
 
     @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
index 8116b79..4b56826 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -114,18 +115,7 @@ public class TestStagingPartitionedTaskCommit
     reset(mockS3);
 
     committer.commitTask(getTAC());
-    Set<String> files = Sets.newHashSet();
-    for (InitiateMultipartUploadRequest request :
-        getMockResults().getRequests().values()) {
-      assertEquals(BUCKET, request.getBucketName());
-      files.add(request.getKey());
-    }
-    assertEquals("Should have the right number of uploads",
-        relativeFiles.size(), files.size());
-
-    Set<String> expected = buildExpectedList(committer);
-
-    assertEquals("Should have correct paths", expected, files);
+    verifyFilesCreated(committer);
   }
 
   @Test
@@ -146,18 +136,29 @@ public class TestStagingPartitionedTaskCommit
     pathExists(mockS3, new Path(outputPath, relativeFiles.get(2)).getParent());
 
     committer.commitTask(getTAC());
+    verifyFilesCreated(committer);
+  }
+
+  /**
+   * Verify that the files created matches that expected.
+   * @param committer committer
+   */
+  protected void verifyFilesCreated(
+      final PartitionedStagingCommitter committer) {
     Set<String> files = Sets.newHashSet();
     for (InitiateMultipartUploadRequest request :
         getMockResults().getRequests().values()) {
       assertEquals(BUCKET, request.getBucketName());
       files.add(request.getKey());
     }
-    assertEquals("Should have the right number of uploads",
-        relativeFiles.size(), files.size());
+    Assertions.assertThat(files)
+        .describedAs("Should have the right number of uploads")
+        .hasSize(relativeFiles.size());
 
     Set<String> expected = buildExpectedList(committer);
-
-    assertEquals("Should have correct paths", expected, files);
+    Assertions.assertThat(files)
+        .describedAs("Should have correct paths")
+        .containsExactlyInAnyOrderElementsOf(expected);
   }
 
   @Test
@@ -180,18 +181,7 @@ public class TestStagingPartitionedTaskCommit
     pathExists(mockS3, new Path(outputPath, relativeFiles.get(3)).getParent());
 
     committer.commitTask(getTAC());
-    Set<String> files = Sets.newHashSet();
-    for (InitiateMultipartUploadRequest request :
-        getMockResults().getRequests().values()) {
-      assertEquals(BUCKET, request.getBucketName());
-      files.add(request.getKey());
-    }
-    assertEquals("Should have the right number of uploads",
-        relativeFiles.size(), files.size());
-
-    Set<String> expected = buildExpectedList(committer);
-
-    assertEquals("Should have correct paths", expected, files);
+    verifyFilesCreated(committer);
   }
 
   public Set<String> buildExpectedList(StagingCommitter committer) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org