You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/11/26 17:28:31 UTC

[hadoop] branch branch-3.3 updated (c48c774 -> 1e59bf7)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from c48c774  HADOOP-17397. ABFS: SAS Test updates for version and permission update (#2492)
     new 1eeb9d9  HADOOP-17318. Support concurrent S3A commit jobs with same app attempt ID. (#2399)
     new 1e59bf7  HADOOP-17385. ITestS3ADeleteCost.testDirMarkersFileCreation failure (#2473).

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/resources/core-default.xml            |  13 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  18 +-
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |   9 +-
 .../hadoop/fs/s3a/commit/AbstractS3ACommitter.java | 376 ++++++++++++++++++---
 .../hadoop/fs/s3a/commit/CommitConstants.java      |  68 +++-
 .../hadoop/fs/s3a/commit/CommitOperations.java     |  17 +-
 .../hadoop/fs/s3a/commit/CommitUtilsWithMR.java    |  45 +--
 .../fs/s3a/commit/InternalCommitterConstants.java  |  31 +-
 .../hadoop/fs/s3a/commit/files/PendingSet.java     |  29 +-
 .../fs/s3a/commit/files/PersistentCommitData.java  |   2 +-
 .../fs/s3a/commit/files/SinglePendingCommit.java   |   2 +-
 .../hadoop/fs/s3a/commit/files/SuccessData.java    |  27 +-
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java |  26 +-
 .../staging/PartitionedStagingCommitter.java       |   5 +-
 .../apache/hadoop/fs/s3a/commit/staging/Paths.java |  16 +-
 .../fs/s3a/commit/staging/StagingCommitter.java    |  94 +-----
 .../commit/staging/StagingCommitterConstants.java  |  15 -
 .../site/markdown/tools/hadoop-aws/committers.md   | 314 ++++++++++++++---
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  61 +++-
 .../apache/hadoop/fs/contract/s3a/S3AContract.java |   4 +
 .../apache/hadoop/fs/s3a/AbstractS3ATestBase.java  |   3 +
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |  17 +-
 .../fs/s3a/commit/AbstractITCommitProtocol.java    | 301 ++++++++++++++++-
 .../fs/s3a/commit/LoggingTextOutputFormat.java     |   2 +-
 .../commit/integration/ITestS3ACommitterMRJob.java |   7 +-
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |  43 ++-
 .../fs/s3a/commit/staging/StagingTestBase.java     |   4 +-
 .../s3a/commit/staging/TestStagingCommitter.java   | 147 ++++++--
 .../staging/TestStagingPartitionedJobCommit.java   |   2 +-
 .../integration/ITestStagingCommitProtocol.java    |  14 +-
 .../fs/s3a/commit/terasort/ITestTerasortOnS3A.java |   2 +-
 .../fs/s3a/performance/AbstractS3ACostTest.java    |   9 +
 .../fs/s3a/performance/ITestS3ADeleteCost.java     |   1 +
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |   2 +-
 34 files changed, 1421 insertions(+), 305 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: HADOOP-17318. Support concurrent S3A commit jobs with same app attempt ID. (#2399)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1eeb9d9d6784358a84020bd1e82da37ce3410364
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Wed Nov 18 13:34:36 2020 +0000

    HADOOP-17318. Support concurrent S3A commit jobs with same app attempt ID. (#2399)
    
    See also [SPARK-33402]: Jobs launched in same second have duplicate MapReduce JobIDs
    
    Contributed by Steve Loughran.
    
    Change-Id: Iae65333cddc84692997aae5d902ad8765b45772a
---
 .../src/main/resources/core-default.xml            |  13 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   9 +-
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |   9 +-
 .../hadoop/fs/s3a/commit/AbstractS3ACommitter.java | 376 ++++++++++++++++++---
 .../hadoop/fs/s3a/commit/CommitConstants.java      |  68 +++-
 .../hadoop/fs/s3a/commit/CommitOperations.java     |  17 +-
 .../hadoop/fs/s3a/commit/CommitUtilsWithMR.java    |  45 +--
 .../fs/s3a/commit/InternalCommitterConstants.java  |  31 +-
 .../hadoop/fs/s3a/commit/files/PendingSet.java     |  29 +-
 .../fs/s3a/commit/files/PersistentCommitData.java  |   2 +-
 .../fs/s3a/commit/files/SinglePendingCommit.java   |   2 +-
 .../hadoop/fs/s3a/commit/files/SuccessData.java    |  27 +-
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java |  26 +-
 .../staging/PartitionedStagingCommitter.java       |   5 +-
 .../apache/hadoop/fs/s3a/commit/staging/Paths.java |  16 +-
 .../fs/s3a/commit/staging/StagingCommitter.java    |  94 +-----
 .../commit/staging/StagingCommitterConstants.java  |  15 -
 .../site/markdown/tools/hadoop-aws/committers.md   | 314 ++++++++++++++---
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  61 +++-
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |  17 +-
 .../fs/s3a/commit/AbstractITCommitProtocol.java    | 301 ++++++++++++++++-
 .../fs/s3a/commit/LoggingTextOutputFormat.java     |   2 +-
 .../commit/integration/ITestS3ACommitterMRJob.java |   7 +-
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |  43 ++-
 .../fs/s3a/commit/staging/StagingTestBase.java     |   4 +-
 .../s3a/commit/staging/TestStagingCommitter.java   | 147 ++++++--
 .../staging/TestStagingPartitionedJobCommit.java   |   2 +-
 .../integration/ITestStagingCommitProtocol.java    |  14 +-
 .../fs/s3a/commit/terasort/ITestTerasortOnS3A.java |   2 +-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |   2 +-
 30 files changed, 1395 insertions(+), 305 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index cf156af..2c3f14a 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1925,20 +1925,13 @@
 </property>
 
 <property>
-  <name>fs.s3a.committer.staging.abort.pending.uploads</name>
+  <name>fs.s3a.committer.abort.pending.uploads</name>
   <value>true</value>
   <description>
-    Should the staging committers abort all pending uploads to the destination
+    Should the committers abort all pending uploads to the destination
     directory?
 
-    Changing this if more than one partitioned committer is
-    writing to the same destination tree simultaneously; otherwise
-    the first job to complete will cancel all outstanding uploads from the
-    others. However, it may lead to leaked outstanding uploads from failed
-    tasks. If disabled, configure the bucket lifecycle to remove uploads
-    after a time period, and/or set up a workflow to explicitly delete
-    entries. Otherwise there is a risk that uncommitted uploads may run up
-    bills.
+    Set to false if more than one job is writing to the same directory tree.
   </description>
 </property>
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index f8e6ab2..cc37df7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -180,6 +180,8 @@ import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_SSE_KMS
 import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
@@ -314,9 +316,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
-    // this is retained as a placeholder for when new deprecated keys
-    // need to be added.
     Configuration.DeprecationDelta[] deltas = {
+        new Configuration.DeprecationDelta(
+            FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
+            FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS)
     };
 
     if (deltas.length > 0) {
@@ -4593,7 +4596,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   @Retries.OnceRaw
   void abortMultipartUpload(String destKey, String uploadId) {
-    LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
+    LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
     getAmazonS3Client().abortMultipartUpload(
         new AbortMultipartUploadRequest(getBucket(),
             destKey,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 26d0942..1a0218a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -131,6 +131,8 @@ public class WriteOperationHelper implements WriteOperations {
    */
   void operationRetried(String text, Exception ex, int retries,
       boolean idempotent) {
+    LOG.info("{}: Retried {}: {}", text, retries, ex.toString());
+    LOG.debug("Stack", ex);
     owner.operationRetried(text, ex, retries, idempotent);
   }
 
@@ -323,7 +325,9 @@ public class WriteOperationHelper implements WriteOperations {
   public void abortMultipartUpload(String destKey, String uploadId,
       Retried retrying)
       throws IOException {
-    invoker.retry("Aborting multipart upload", destKey, true,
+    invoker.retry("Aborting multipart upload ID " + uploadId,
+        destKey,
+        true,
         retrying,
         () -> owner.abortMultipartUpload(
             destKey,
@@ -585,7 +589,8 @@ public class WriteOperationHelper implements WriteOperations {
   @Retries.RetryTranslated
   public UploadPartResult uploadPart(UploadPartRequest request)
       throws IOException {
-    return retry("upload part",
+    return retry("upload part #" + request.getPartNumber()
+        + " upload ID "+ request.getUploadId(),
         request.getKey(),
         true,
         () -> owner.uploadPart(request));
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index 32d00a4..8310b99 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -20,13 +20,14 @@ package org.apache.hadoop.fs.s3a.commit;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.MultipartUpload;
 import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +36,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,8 +48,10 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DurationInfo;
@@ -58,6 +63,10 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
 
 /**
  * Abstract base class for S3A committers; allows for any commonality
@@ -86,12 +95,41 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
  * committer was large enough for more all the parallel POST requests.
  */
 public abstract class AbstractS3ACommitter extends PathOutputCommitter {
+
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractS3ACommitter.class);
 
   public static final String THREAD_PREFIX = "s3a-committer-pool-";
 
   /**
+   * Error string when task setup fails.
+   */
+  @VisibleForTesting
+  public static final String E_SELF_GENERATED_JOB_UUID
+      = "has a self-generated job UUID";
+
+  /**
+   * Unique ID for a Job.
+   * In MapReduce Jobs the YARN JobID suffices.
+   * On Spark this should only be the YARN JobID if
+   * it is known to be creating strongly unique IDs
+   * (i.e. SPARK-33402 is on the branch).
+   */
+  private final String uuid;
+
+  /**
+   * Source of the {@link #uuid} value.
+   */
+  private final JobUUIDSource uuidSource;
+
+  /**
+   * Has this instance been used for job setup?
+   * If so then it is safe for a locally generated
+   * UUID to be used for task setup.
+   */
+  private boolean jobSetup;
+
+  /**
    * Thread pool for task execution.
    */
   private ExecutorService threadPool;
@@ -147,14 +185,19 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
     this.jobContext = context;
     this.role = "Task committer " + context.getTaskAttemptID();
     setConf(context.getConfiguration());
+    Pair<String, JobUUIDSource> id = buildJobUUID(
+        conf, context.getJobID());
+    this.uuid = id.getLeft();
+    this.uuidSource = id.getRight();
+    LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
     initOutput(outputPath);
     LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
         role, jobName(context), jobIdString(context), outputPath);
     S3AFileSystem fs = getDestS3AFS();
-    createJobMarker = context.getConfiguration().getBoolean(
+    this.createJobMarker = context.getConfiguration().getBoolean(
         CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
         DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
-    commitOperations = new CommitOperations(fs);
+    this.commitOperations = new CommitOperations(fs);
   }
 
   /**
@@ -202,7 +245,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * @return the working path.
    */
   @Override
-  public Path getWorkPath() {
+  public final Path getWorkPath() {
     return workPath;
   }
 
@@ -210,16 +253,16 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * Set the work path for this committer.
    * @param workPath the work path to use.
    */
-  protected void setWorkPath(Path workPath) {
+  protected final void setWorkPath(Path workPath) {
     LOG.debug("Setting work path to {}", workPath);
     this.workPath = workPath;
   }
 
-  public Configuration getConf() {
+  public final Configuration getConf() {
     return conf;
   }
 
-  protected void setConf(Configuration conf) {
+  protected final void setConf(Configuration conf) {
     this.conf = conf;
   }
 
@@ -308,6 +351,24 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    */
   public abstract String getName();
 
+  /**
+   * The Job UUID, as passed in or generated.
+   * @return the UUID for the job.
+   */
+  @VisibleForTesting
+  public final String getUUID() {
+    return uuid;
+  }
+
+  /**
+   * Source of the UUID.
+   * @return how the job UUID was retrieved/generated.
+   */
+  @VisibleForTesting
+  public final JobUUIDSource getUUIDSource() {
+    return uuidSource;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
@@ -316,6 +377,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
     sb.append(", name=").append(getName());
     sb.append(", outputPath=").append(getOutputPath());
     sb.append(", workPath=").append(workPath);
+    sb.append(", uuid='").append(getUUID()).append('\'');
+    sb.append(", uuid source=").append(getUUIDSource());
     sb.append('}');
     return sb.toString();
   }
@@ -394,6 +457,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
       // create a success data structure and then save it
       SuccessData successData = new SuccessData();
       successData.setCommitter(getName());
+      successData.setJobId(uuid);
+      successData.setJobIdSource(uuidSource.getText());
       successData.setDescription(getRole());
       successData.setHostname(NetUtils.getLocalHostname());
       Date now = new Date();
@@ -411,26 +476,60 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * be deleted; creating it now ensures there is something at the end
    * while the job is in progress -and if nothing is created, that
    * it is still there.
+   * <p>
+   *   The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}
+   *   is set to the job UUID; if generated locally
+   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched.
+   *   The field {@link #jobSetup} is set to true to note that
+   *   this specific committer instance was used to set up a job.
+   * </p>
    * @param context context
    * @throws IOException IO failure
    */
 
   @Override
   public void setupJob(JobContext context) throws IOException {
-    try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Job %s setting up", getUUID())) {
+      // record that the job has been set up
+      jobSetup = true;
+      // patch job conf with the job UUID.
+      Configuration c = context.getConfiguration();
+      c.set(FS_S3A_COMMITTER_UUID, getUUID());
+      c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText());
+      Path dest = getOutputPath();
       if (createJobMarker){
-        commitOperations.deleteSuccessMarker(getOutputPath());
+        commitOperations.deleteSuccessMarker(dest);
       }
-      getDestFS().mkdirs(getOutputPath());
+      getDestFS().mkdirs(dest);
+      // do a scan for active (pending) uploads under the destination
+      warnOnActiveUploads(dest);
     }
   }
 
+  /**
+   * Task setup. Fails if the UUID was generated locally, and
+   * the same committer wasn't used for job setup.
+   * {@inheritDoc}
+   * @throws PathCommitException if the task UUID options are unsatisfied.
+   */
   @Override
   public void setupTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
     try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s",
-        context.getTaskAttemptID())) {
+        attemptID)) {
+      // reject attempts to set up the task where the output won't be
+      // picked up
+      if (!jobSetup
+          && getUUIDSource() == JobUUIDSource.GeneratedLocally) {
+        // on anything other than a test run, the context must not have been
+        // generated locally.
+        throw new PathCommitException(getOutputPath().toString(),
+            "Task attempt " + attemptID
+                + " " + E_SELF_GENERATED_JOB_UUID);
+      }
       Path taskAttemptPath = getTaskAttemptPath(context);
-      FileSystem fs = getTaskAttemptFilesystem(context);
+      FileSystem fs = taskAttemptPath.getFileSystem(getConf());
       fs.mkdirs(taskAttemptPath);
     }
   }
@@ -474,12 +573,12 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
           .stopOnFailure()
           .suppressExceptions(false)
           .executeWith(buildSubmitter(context))
-          .abortWith(path ->
-              loadAndAbort(commitContext, pending, path, true, false))
-          .revertWith(path ->
-              loadAndRevert(commitContext, pending, path))
-          .run(path ->
-              loadAndCommit(commitContext, pending, path));
+          .abortWith(status ->
+              loadAndAbort(commitContext, pending, status, true, false))
+          .revertWith(status ->
+              loadAndRevert(commitContext, pending, status))
+          .run(status ->
+              loadAndCommit(commitContext, pending, status));
     }
   }
 
@@ -504,7 +603,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
           .stopOnFailure()
           .suppressExceptions(false)
           .executeWith(buildSubmitter(context))
-          .run(path -> PendingSet.load(sourceFS, path));
+          .run(status -> PendingSet.load(sourceFS, status));
     }
   }
 
@@ -512,17 +611,26 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * Load a pendingset file and commit all of its contents.
    * @param commitContext context to commit through
    * @param activeCommit commit state
-   * @param path path to load
+   * @param status file to load
    * @throws IOException failure
    */
   private void loadAndCommit(
       final CommitOperations.CommitContext commitContext,
       final ActiveCommit activeCommit,
-      final Path path) throws IOException {
+      final FileStatus status) throws IOException {
 
+    final Path path = status.getPath();
     try (DurationInfo ignored =
-             new DurationInfo(LOG, false, "Committing %s", path)) {
-      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+             new DurationInfo(LOG,
+                 "Loading and committing files in pendingset %s", path)) {
+      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
+          status);
+      String jobId = pendingSet.getJobId();
+      if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
+        throw new PathCommitException(path,
+            String.format("Mismatch in Job ID (%s) and commit job ID (%s)",
+                getUUID(), jobId));
+      }
       Tasks.foreach(pendingSet.getCommits())
           .stopOnFailure()
           .suppressExceptions(false)
@@ -543,17 +651,19 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * Load a pendingset file and revert all of its contents.
    * @param commitContext context to commit through
    * @param activeCommit commit state
-   * @param path path to load
+   * @param status status of file to load
    * @throws IOException failure
    */
   private void loadAndRevert(
       final CommitOperations.CommitContext commitContext,
       final ActiveCommit activeCommit,
-      final Path path) throws IOException {
+      final FileStatus status) throws IOException {
 
+    final Path path = status.getPath();
     try (DurationInfo ignored =
              new DurationInfo(LOG, false, "Committing %s", path)) {
-      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+      PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
+          status);
       Tasks.foreach(pendingSet.getCommits())
           .suppressExceptions(true)
           .run(commitContext::revertCommit);
@@ -564,21 +674,22 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    * Load a pendingset file and abort all of its contents.
    * @param commitContext context to commit through
    * @param activeCommit commit state
-   * @param path path to load
+   * @param status status of file to load
    * @param deleteRemoteFiles should remote files be deleted?
    * @throws IOException failure
    */
   private void loadAndAbort(
       final CommitOperations.CommitContext commitContext,
       final ActiveCommit activeCommit,
-      final Path path,
+      final FileStatus status,
       final boolean suppressExceptions,
       final boolean deleteRemoteFiles) throws IOException {
 
+    final Path path = status.getPath();
     try (DurationInfo ignored =
              new DurationInfo(LOG, false, "Aborting %s", path)) {
       PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
-          path);
+          status);
       FileSystem fs = getDestFS();
       Tasks.foreach(pendingSet.getCommits())
           .executeWith(singleThreadSubmitter())
@@ -659,6 +770,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
    */
   protected void abortPendingUploadsInCleanup(
       boolean suppressExceptions) throws IOException {
+    // return early if aborting is disabled.
+    if (!shouldAbortUploadsInCleanup()) {
+      LOG.debug("Not cleanup up pending uploads to {} as {} is false ",
+          getOutputPath(),
+          FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
+      return;
+    }
     Path dest = getOutputPath();
     try (DurationInfo ignored =
              new DurationInfo(LOG, "Aborting all pending commits under %s",
@@ -674,14 +792,27 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
         maybeIgnore(suppressExceptions, "aborting pending uploads", e);
         return;
       }
-      Tasks.foreach(pending)
-          .executeWith(buildSubmitter(getJobContext()))
-          .suppressExceptions(suppressExceptions)
-          .run(u -> commitContext.abortMultipartCommit(
-              u.getKey(), u.getUploadId()));
+      if (!pending.isEmpty()) {
+        LOG.warn("{} pending uploads were found -aborting", pending.size());
+        LOG.warn("If other tasks/jobs are writing to {},"
+            + "this action may cause them to fail", dest);
+        Tasks.foreach(pending)
+            .executeWith(buildSubmitter(getJobContext()))
+            .suppressExceptions(suppressExceptions)
+            .run(u -> commitContext.abortMultipartCommit(
+                u.getKey(), u.getUploadId()));
+      } else {
+        LOG.info("No pending uploads were found");
+      }
     }
   }
 
+  private boolean shouldAbortUploadsInCleanup() {
+    return getConf()
+        .getBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS,
+            DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
+  }
+
   /**
    * Subclass-specific pre-Job-commit actions.
    * The staging committers all load the pending files to verify that
@@ -1045,6 +1176,166 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
+   * Scan for active uploads and list them along with a warning message.
+   * Errors are ignored.
+   * @param path output path of job.
+   */
+  protected void warnOnActiveUploads(final Path path) {
+    List<MultipartUpload> pending;
+    try {
+      pending = getCommitOperations()
+          .listPendingUploadsUnderPath(path);
+    } catch (IOException e) {
+      LOG.debug("Failed to list uploads under {}",
+          path, e);
+      return;
+    }
+    if (!pending.isEmpty()) {
+      // log a warning
+      LOG.warn("{} active upload(s) in progress under {}",
+          pending.size(),
+          path);
+      LOG.warn("Either jobs are running concurrently"
+          + " or failed jobs are not being cleaned up");
+      // and the paths + timestamps
+      DateFormat df = DateFormat.getDateTimeInstance();
+      pending.forEach(u ->
+          LOG.info("[{}] {}",
+              df.format(u.getInitiated()),
+              u.getKey()));
+      if (shouldAbortUploadsInCleanup()) {
+        LOG.warn("This committer will abort these uploads in job cleanup");
+      }
+    }
+  }
+
+  /**
+   * Build the job UUID.
+   *
+   * <p>
+   *  In MapReduce jobs, the application ID is issued by YARN, and
+   *  unique across all jobs.
+   * </p>
+   * <p>
+   * Spark will use a fake app ID based on the current time.
+   * This can lead to collisions on busy clusters unless
+   * the specific spark release has SPARK-33402 applied.
+   * This appends a random long value to the timestamp, so
+   * is unique enough that the risk of collision is almost
+   * nonexistent.
+   * </p>
+   * <p>
+   *   The order of selection of a uuid is
+   * </p>
+   * <ol>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
+   *   <li>If enabled through
+   *   {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}:
+   *   Self-generated uuid.</li>
+   *   <li>If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+   *   is not set: Application ID</li>
+   * </ol>
+   * The UUID binding takes place during construction;
+   * the staging committers use it to set up their wrapped
+   * committer to a path in the cluster FS which is unique to the
+   * job.
+   * <p>
+   *  In MapReduce jobs, the application ID is issued by YARN, and
+   *  unique across all jobs.
+   * </p>
+   * In {@link #setupJob(JobContext)} the job context's configuration
+   * will be patched with the job UUID, so that the UUID will
+   * be valid in all sequences where the job has been set up for the
+   * configuration passed in.
+   * <p>
+   *   If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+   *   is set, then an external UUID MUST be passed in.
+   *   This can be used to verify that the spark engine is reliably setting
+   *   unique IDs for staging.
+   * </p>
+   * @param conf job/task configuration
+   * @param jobId job ID from YARN or spark.
+   * @return Job UUID and source of it.
+   * @throws PathCommitException no UUID was found and it was required
+   */
+  public static Pair<String, JobUUIDSource>
+      buildJobUUID(Configuration conf, JobID jobId)
+      throws PathCommitException {
+
+    String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
+
+    if (!jobUUID.isEmpty()) {
+      return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
+    }
+    // there is no job UUID.
+    // look for one from spark
+    jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+    if (!jobUUID.isEmpty()) {
+      return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
+    }
+
+    // there is no UUID configuration in the job/task config
+
+    // Check the job hasn't declared a requirement for the UUID.
+    // This allows for fail-fast validation of Spark behavior.
+    if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID,
+        DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) {
+      throw new PathCommitException("", E_NO_SPARK_UUID);
+    }
+
+    // see if the job can generate a random UUID
+    if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID,
+        DEFAULT_S3A_COMMITTER_GENERATE_UUID)) {
+      // generate a random UUID. This is OK for a job, for a task
+      // it means that the data may not get picked up.
+      String newId = UUID.randomUUID().toString();
+      LOG.warn("No job ID in configuration; generating a random ID: {}",
+          newId);
+      return Pair.of(newId, JobUUIDSource.GeneratedLocally);
+    }
+    // if no other option was supplied, return the job ID.
+    // This is exactly what MR jobs expect, but is not what
+    // Spark jobs can do as there is a risk of jobID collision.
+    return Pair.of(jobId.toString(), JobUUIDSource.JobID);
+  }
+
+  /**
+   * Enumeration of Job UUID source.
+   */
+  public enum JobUUIDSource {
+    SparkWriteUUID(SPARK_WRITE_UUID),
+    CommitterUUIDProperty(FS_S3A_COMMITTER_UUID),
+    JobID("JobID"),
+    GeneratedLocally("Generated Locally");
+
+    private final String text;
+
+    JobUUIDSource(final String text) {
+      this.text = text;
+    }
+
+    /**
+     * Source for messages.
+     * @return text
+     */
+    public String getText() {
+      return text;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "JobUUIDSource{");
+      sb.append("text='").append(text).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
    * State of the active commit operation.
    *
    * It contains a list of all pendingset files to load as the source
@@ -1071,7 +1362,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
         = new ActiveCommit(null, new ArrayList<>());
 
     /** All pendingset files to iterate through. */
-    private final List<Path> sourceFiles;
+    private final List<FileStatus> sourceFiles;
 
     /**
      * Filesystem for the source files.
@@ -1101,8 +1392,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
      */
     public ActiveCommit(
         final FileSystem sourceFS,
-        final List<Path> sourceFiles) {
-      this.sourceFiles = sourceFiles;
+        final List<? extends FileStatus> sourceFiles) {
+      this.sourceFiles = (List<FileStatus>) sourceFiles;
       this.sourceFS = sourceFS;
     }
 
@@ -1115,10 +1406,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
     public static ActiveCommit fromStatusList(
         final FileSystem pendingFS,
         final List<? extends FileStatus> statuses) {
-      return new ActiveCommit(pendingFS,
-          statuses.stream()
-              .map(FileStatus::getPath)
-              .collect(Collectors.toList()));
+      return new ActiveCommit(pendingFS, statuses);
     }
 
     /**
@@ -1129,7 +1417,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
       return EMPTY;
     }
 
-    public List<Path> getSourceFiles() {
+    public List<FileStatus> getSourceFiles() {
       return sourceFiles;
     }
 
@@ -1174,8 +1462,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
       return sourceFiles.isEmpty();
     }
 
-    public void add(Path path) {
-      sourceFiles.add(path);
+    public void add(FileStatus status) {
+      sourceFiles.add(status);
     }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index e7c0492..3224a5a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -240,20 +240,39 @@ public final class CommitConstants {
 
 
   /**
-   * Should the staging committers abort all pending uploads to the destination
-   * directory? Default: true.
-   *
-   * Changing this is if more than one partitioned committer is
+   * Should committers abort all pending uploads to the destination
+   * directory?
+   * <p>
+   * Deprecated: switch to {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
+   */
+  @Deprecated
+  public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
+      "fs.s3a.committer.staging.abort.pending.uploads";
+
+  /**
+   * Should committers abort all pending uploads to the destination
+   * directory?
+   * <p>
+   * Value: {@value}.
+   * <p>
+   * Change this if more than one committer is
    * writing to the same destination tree simultaneously; otherwise
    * the first job to complete will cancel all outstanding uploads from the
-   * others. However, it may lead to leaked outstanding uploads from failed
-   * tasks. If disabled, configure the bucket lifecycle to remove uploads
+   * others. If disabled, configure the bucket lifecycle to remove uploads
    * after a time period, and/or set up a workflow to explicitly delete
    * entries. Otherwise there is a risk that uncommitted uploads may run up
    * bills.
    */
-  public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
-      "fs.s3a.committer.staging.abort.pending.uploads";
+  public static final String FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
+      "fs.s3a.committer.abort.pending.uploads";
+
+  /**
+   * Default configuration value for
+   * {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
+   * Value: {@value}.
+   */
+  public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
+      true;
 
   /**
    * The limit to the number of committed objects tracked during
@@ -264,4 +283,37 @@ public final class CommitConstants {
   /** Extra Data key for task attempt in pendingset files. */
   public static final String TASK_ATTEMPT_ID = "task.attempt.id";
 
+  /**
+   * Require the spark UUID to be passed down: {@value}.
+   * This is to verify that SPARK-33230 has been applied to spark, and that
+   * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is set.
+   * <p>
+   *   MUST ONLY BE SET WITH SPARK JOBS.
+   * </p>
+   */
+  public static final String FS_S3A_COMMITTER_REQUIRE_UUID =
+      "fs.s3a.committer.require.uuid";
+
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_REQUIRE_UUID}: {@value}.
+   */
+  public static final boolean DEFAULT_S3A_COMMITTER_REQUIRE_UUID =
+      false;
+
+  /**
+   * Generate a UUID in job setup rather than fall back to
+   * YARN Application attempt ID.
+   * <p>
+   *   MUST ONLY BE SET WITH SPARK JOBS.
+   * </p>
+   */
+  public static final String FS_S3A_COMMITTER_GENERATE_UUID =
+      "fs.s3a.committer.generate.uuid";
+
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_GENERATE_UUID}: {@value}.
+   */
+  public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
+      false;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 155e86a..a558b09 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -159,7 +159,11 @@ public class CommitOperations {
     LOG.debug("Committing single commit {}", commit);
     MaybeIOE outcome;
     String destKey = "unknown destination";
-    try {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Committing file %s size %s",
+        commit.getDestinationKey(),
+        commit.getLength())) {
+
       commit.validate();
       destKey = commit.getDestinationKey();
       long l = innerCommit(commit, operationState);
@@ -273,7 +277,7 @@ public class CommitOperations {
                     ? (" defined in " + commit.getFilename())
                     : "";
     String uploadId = commit.getUploadId();
-    LOG.info("Aborting commit to object {}{}", destKey, origin);
+    LOG.info("Aborting commit ID {} to object {}{}", uploadId, destKey, origin);
     abortMultipartCommit(destKey, uploadId);
   }
 
@@ -287,7 +291,8 @@ public class CommitOperations {
    */
   private void abortMultipartCommit(String destKey, String uploadId)
       throws IOException {
-    try {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Aborting commit ID %s to path %s", uploadId, destKey)) {
       writeOperations.abortMultipartCommit(destKey, uploadId);
     } finally {
       statistics.commitAborted();
@@ -462,7 +467,11 @@ public class CommitOperations {
     String uploadId = null;
 
     boolean threw = true;
-    try {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Upload staged file from %s to %s",
+        localFile.getAbsolutePath(),
+        destPath)) {
+
       statistics.commitCreated();
       uploadId = writeOperations.initiateMultiPartUpload(destKey);
       long length = localFile.length();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java
index c6c0da8..9e5ee86 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java
@@ -58,7 +58,7 @@ public final class CommitUtilsWithMR {
   /**
    * Get the Application Attempt ID for this job.
    * @param context the context to look in
-   * @return the Application Attempt ID for a given job.
+   * @return the Application Attempt ID for a given job, or 0
    */
   public static int getAppAttemptId(JobContext context) {
     return context.getConfiguration().getInt(
@@ -67,33 +67,32 @@ public final class CommitUtilsWithMR {
 
   /**
    * Compute the "magic" path for a job attempt.
-   * @param appAttemptId the ID of the application attempt for this job.
+   * @param jobUUID unique Job ID.
    * @param dest the final output directory
    * @return the path to store job attempt data.
    */
-  public static Path getMagicJobAttemptPath(int appAttemptId, Path dest) {
+  public static Path getMagicJobAttemptPath(String jobUUID, Path dest) {
     return new Path(getMagicJobAttemptsPath(dest),
-        formatAppAttemptDir(appAttemptId));
+        formatAppAttemptDir(jobUUID));
   }
 
   /**
    * Format the application attempt directory.
-   * @param attemptId attempt ID
+   * @param jobUUID unique Job ID.
    * @return the directory name for the application attempt
    */
-  public static String formatAppAttemptDir(int attemptId) {
-    return String.format("app-attempt-%04d", attemptId);
+  public static String formatAppAttemptDir(String jobUUID) {
+    return String.format("job-%s", jobUUID);
   }
 
   /**
    * Compute the path where the output of magic task attempts are stored.
-   * @param context the context of the job with magic tasks.
+   * @param jobUUID unique Job ID.
    * @param dest destination of work
    * @return the path where the output of magic task attempts are stored.
    */
-  public static Path getMagicTaskAttemptsPath(JobContext context, Path dest) {
-    return new Path(getMagicJobAttemptPath(
-        getAppAttemptId(context), dest), "tasks");
+  public static Path getMagicTaskAttemptsPath(String jobUUID, Path dest) {
+    return new Path(getMagicJobAttemptPath(jobUUID, dest), "tasks");
   }
 
   /**
@@ -102,48 +101,56 @@ public final class CommitUtilsWithMR {
    * This path is marked as a base path for relocations, so subdirectory
    * information is preserved.
    * @param context the context of the task attempt.
+   * @param jobUUID unique Job ID.
    * @param dest The output path to commit work into
    * @return the path where a task attempt should be stored.
    */
   public static Path getMagicTaskAttemptPath(TaskAttemptContext context,
+      String jobUUID,
       Path dest) {
-    return new Path(getBaseMagicTaskAttemptPath(context, dest), BASE);
+    return new Path(getBaseMagicTaskAttemptPath(context, jobUUID, dest),
+        BASE);
   }
 
   /**
    * Get the base Magic attempt path, without any annotations to mark relative
    * references.
    * @param context task context.
+   * @param jobUUID unique Job ID.
    * @param dest The output path to commit work into
    * @return the path under which all attempts go
    */
   public static Path getBaseMagicTaskAttemptPath(TaskAttemptContext context,
+      String jobUUID,
       Path dest) {
-    return new Path(getMagicTaskAttemptsPath(context, dest),
+    return new Path(getMagicTaskAttemptsPath(jobUUID, dest),
           String.valueOf(context.getTaskAttemptID()));
   }
 
   /**
    * Compute a path for temporary data associated with a job.
    * This data is <i>not magic</i>
-   * @param appAttemptId the ID of the application attempt for this job.
+   * @param jobUUID unique Job ID.
    * @param out output directory of job
    * @return the path to store temporary job attempt data.
    */
-  public static Path getTempJobAttemptPath(int appAttemptId, Path out) {
+  public static Path getTempJobAttemptPath(String jobUUID,
+      Path out) {
     return new Path(new Path(out, TEMP_DATA),
-        formatAppAttemptDir(appAttemptId));
+        formatAppAttemptDir(jobUUID));
   }
 
   /**
-   * Compute the path where the output of a given job attempt will be placed.
+   * Compute the path where the output of a given task attempt will be placed.
    * @param context task context
+   * @param jobUUID unique Job ID.
    * @param out output directory of job
    * @return the path to store temporary job attempt data.
    */
   public static Path getTempTaskAttemptPath(TaskAttemptContext context,
-      Path out) {
-    return new Path(getTempJobAttemptPath(getAppAttemptId(context), out),
+      final String jobUUID, Path out) {
+    return new Path(
+        getTempJobAttemptPath(jobUUID, out),
         String.valueOf(context.getTaskAttemptID()));
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
index 2821fce..461c9a5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
@@ -46,8 +46,14 @@ public final class InternalCommitterConstants {
   /**
    * A unique identifier to use for this work: {@value}.
    */
-  public static final String FS_S3A_COMMITTER_STAGING_UUID =
-      "fs.s3a.committer.staging.uuid";
+  public static final String FS_S3A_COMMITTER_UUID =
+      "fs.s3a.committer.uuid";
+
+  /**
+   * Where did the UUID come from? {@value}.
+   */
+  public static final String FS_S3A_COMMITTER_UUID_SOURCE =
+      "fs.s3a.committer.uuid.source";
 
   /**
    * Directory committer factory: {@value}.
@@ -97,4 +103,25 @@ public final class InternalCommitterConstants {
   /** Error message for a path without a magic element in the list: {@value}. */
   public static final String E_NO_MAGIC_PATH_ELEMENT
       = "No " + MAGIC + " element in path";
+
+  /**
+   * The UUID for jobs: {@value}.
+   * This was historically created in Spark 1.x's SQL queries, but "went away".
+   */
+  public static final String SPARK_WRITE_UUID =
+      "spark.sql.sources.writeJobUUID";
+
+  /**
+   * Java temp dir: {@value}.
+   */
+  public static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+
+  /**
+   * Incoming Job/task configuration didn't contain any option
+   * {@link #SPARK_WRITE_UUID}.
+   */
+  public static final String E_NO_SPARK_UUID =
+      "Job/task context does not contain a unique ID in "
+          + SPARK_WRITE_UUID;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
index 4793b78..8ad0342 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
@@ -56,7 +57,7 @@ public class PendingSet extends PersistentCommitData {
    * If this is changed the value of {@link #serialVersionUID} will change,
    * to avoid deserialization problems.
    */
-  public static final int VERSION = 1;
+  public static final int VERSION = 2;
 
   /**
    * Serialization ID: {@value}.
@@ -67,6 +68,9 @@ public class PendingSet extends PersistentCommitData {
   /** Version marker. */
   private int version = VERSION;
 
+  /** Job ID, if known. */
+  private String jobId = "";
+
   /**
    * Commit list.
    */
@@ -111,6 +115,19 @@ public class PendingSet extends PersistentCommitData {
   }
 
   /**
+   * Load an instance from a file, then validate it.
+   * @param fs filesystem
+   * @param status status of file to load
+   * @return the loaded instance
+   * @throws IOException IO failure
+   * @throws ValidationFailure if the data is invalid
+   */
+  public static PendingSet load(FileSystem fs, FileStatus status)
+      throws IOException {
+    return load(fs, status.getPath());
+  }
+
+  /**
    * Add a commit.
    * @param commit the single commit
    */
@@ -198,4 +215,14 @@ public class PendingSet extends PersistentCommitData {
   public void putExtraData(String key, String value) {
     extraData.put(key, value);
   }
+
+  /** @return Job ID, if known. */
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java
index cc27d07..dba44b9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java
@@ -40,7 +40,7 @@ public abstract class PersistentCommitData implements Serializable {
    * If this is changed the value of {@code serialVersionUID} will change,
    * to avoid deserialization problems.
    */
-  public static final int VERSION = 1;
+  public static final int VERSION = 2;
 
   /**
    * Validate the data: those fields which must be non empty, must be set.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
index c848f80..f97f243 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -207,7 +207,7 @@ public class SinglePendingCommit extends PersistentCommitData
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
-        "DelayedCompleteData{");
+        "SinglePendingCommit{");
     sb.append("version=").append(version);
     sb.append(", uri='").append(uri).append('\'');
     sb.append(", destination='").append(destinationKey).append('\'');
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
index e0273fa..b7509d6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
@@ -68,7 +68,7 @@ public class SuccessData extends PersistentCommitData {
   /**
    * Serialization ID: {@value}.
    */
-  private static final long serialVersionUID = 507133045258460084L;
+  private static final long serialVersionUID = 507133045258460083L + VERSION;
 
   /**
    * Name to include in persisted data, so as to differentiate from
@@ -103,6 +103,14 @@ public class SuccessData extends PersistentCommitData {
    */
   private String description;
 
+  /** Job ID, if known. */
+  private String jobId = "";
+
+  /**
+   * Source of the job ID.
+   */
+  private String jobIdSource = "";
+
   /**
    * Metrics.
    */
@@ -325,4 +333,21 @@ public class SuccessData extends PersistentCommitData {
   public void addDiagnostic(String key, String value) {
     diagnostics.put(key, value);
   }
+
+  /** @return Job ID, if known. */
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getJobIdSource() {
+    return jobIdSource;
+  }
+
+  public void setJobIdSource(final String jobIdSource) {
+    this.jobIdSource = jobIdSource;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 30417ea..b330cee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -97,6 +97,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
   public void setupJob(JobContext context) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "Setup Job %s", jobIdString(context))) {
+      super.setupJob(context);
       Path jobAttemptPath = getJobAttemptPath(context);
       getDestinationFS(jobAttemptPath,
           context.getConfiguration()).mkdirs(jobAttemptPath);
@@ -131,16 +132,6 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
     }
   }
 
-  @Override
-  public void setupTask(TaskAttemptContext context) throws IOException {
-    try (DurationInfo d = new DurationInfo(LOG,
-        "Setup Task %s", context.getTaskAttemptID())) {
-      Path taskAttemptPath = getTaskAttemptPath(context);
-      FileSystem fs = taskAttemptPath.getFileSystem(getConf());
-      fs.mkdirs(taskAttemptPath);
-    }
-  }
-
   /**
    * Did this task write any files in the work directory?
    * Probes for a task existing by looking to see if the attempt dir exists.
@@ -208,13 +199,14 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
       throw failures.get(0).getValue();
     }
     // patch in IDs
-    String jobId = String.valueOf(context.getJobID());
+    String jobId = getUUID();
     String taskId = String.valueOf(context.getTaskAttemptID());
     for (SinglePendingCommit commit : pendingSet.getCommits()) {
       commit.setJobId(jobId);
       commit.setTaskId(taskId);
     }
     pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
+    pendingSet.setJobId(jobId);
     Path jobAttemptPath = getJobAttemptPath(context);
     TaskAttemptID taskAttemptID = context.getTaskAttemptID();
     Path taskOutcomePath = new Path(jobAttemptPath,
@@ -259,11 +251,12 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
 
   /**
    * Compute the path where the output of a given job attempt will be placed.
+   * For the magic committer, the path includes the job UUID.
    * @param appAttemptId the ID of the application attempt for this job.
    * @return the path to store job attempt data.
    */
   protected Path getJobAttemptPath(int appAttemptId) {
-    return getMagicJobAttemptPath(appAttemptId, getOutputPath());
+    return getMagicJobAttemptPath(getUUID(), getOutputPath());
   }
 
   /**
@@ -274,12 +267,12 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
    * @return the path where a task attempt should be stored.
    */
   public Path getTaskAttemptPath(TaskAttemptContext context) {
-    return getMagicTaskAttemptPath(context, getOutputPath());
+    return getMagicTaskAttemptPath(context, getUUID(), getOutputPath());
   }
 
   @Override
   protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
-    return getBaseMagicTaskAttemptPath(context, getOutputPath());
+    return getBaseMagicTaskAttemptPath(context, getUUID(), getOutputPath());
   }
 
   /**
@@ -289,13 +282,16 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
    * @return a path for temporary data.
    */
   public Path getTempTaskAttemptPath(TaskAttemptContext context) {
-    return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath());
+    return CommitUtilsWithMR.getTempTaskAttemptPath(context,
+        getUUID(),
+        getOutputPath());
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "MagicCommitter{");
+    sb.append(super.toString());
     sb.append('}');
     return sb.toString();
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index 7be5406..214c7ab 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -198,8 +198,9 @@ public class PartitionedStagingCommitter extends StagingCommitter {
           .stopOnFailure()
           .suppressExceptions(false)
           .executeWith(submitter)
-          .run(path -> {
-            PendingSet pendingSet = PendingSet.load(sourceFS, path);
+          .run(status -> {
+            PendingSet pendingSet = PendingSet.load(sourceFS,
+                status);
             Path lastParent = null;
             for (SinglePendingCommit commit : pendingSet.getCommits()) {
               Path parent = commit.destinationPath().getParent();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
index a941572..ceb03a3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JAVA_IO_TMPDIR;
 import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
 
 /**
@@ -141,14 +142,18 @@ public final class Paths {
   }
 
   /**
-   * A cache of temporary folders. There's a risk here that the cache
-   * gets too big
+   * A cache of temporary folders, using a generated ID which must be unique for
+   * each active task attempt.
    */
-  private static Cache<TaskAttemptID, Path> tempFolders = CacheBuilder
+  private static Cache<String, Path> tempFolders = CacheBuilder
       .newBuilder().build();
 
   /**
    * Get the task attempt temporary directory in the local filesystem.
+   * This must be unique to all tasks on all jobs running on all processes
+   * on this host.
+   * It's constructed as uuid+task-attempt-ID, relying on UUID to be unique
+   * for each job.
    * @param conf configuration
    * @param uuid some UUID, such as a job UUID
    * @param attemptID attempt ID
@@ -162,10 +167,11 @@ public final class Paths {
     try {
       final LocalDirAllocator allocator =
           new LocalDirAllocator(Constants.BUFFER_DIR);
-      return tempFolders.get(attemptID,
+      String name = uuid + "-" + attemptID;
+      return tempFolders.get(name,
           () -> {
             return FileSystem.getLocal(conf).makeQualified(
-                allocator.getLocalPathForWrite(uuid, conf));
+                allocator.getLocalPathForWrite(name, conf));
           });
     } catch (ExecutionException | UncheckedExecutionException e) {
       Throwable cause = e.getCause();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 9cc932b..53f811f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -26,7 +26,6 @@ import java.util.Locale;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +45,6 @@ import org.apache.hadoop.fs.s3a.commit.Tasks;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.DurationInfo;
@@ -55,7 +53,6 @@ import static com.google.common.base.Preconditions.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
-import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@@ -95,7 +92,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
   public static final String NAME = "staging";
   private final Path constructorOutputPath;
   private final long uploadPartSize;
-  private final String uuid;
   private final boolean uniqueFilenames;
   private final FileOutputCommitter wrappedCommitter;
 
@@ -118,15 +114,14 @@ public class StagingCommitter extends AbstractS3ACommitter {
     Configuration conf = getConf();
     this.uploadPartSize = conf.getLongBytes(
         MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    this.uuid = getUploadUUID(conf, context.getJobID());
     this.uniqueFilenames = conf.getBoolean(
         FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
         DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
-    setWorkPath(buildWorkPath(context, uuid));
+    setWorkPath(buildWorkPath(context, getUUID()));
     this.wrappedCommitter = createWrappedCommitter(context, conf);
     setOutputPath(constructorOutputPath);
     Path finalOutputPath = getOutputPath();
-    Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null");
+    checkNotNull(finalOutputPath, "Output path cannot be null");
     S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
         context.getConfiguration(), false);
     s3KeyPrefix = fs.pathToKey(finalOutputPath);
@@ -156,7 +151,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
 
     // explicitly choose commit algorithm
     initFileOutputCommitterOptions(context);
-    commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid);
+    commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf,
+        getUUID());
     return new FileOutputCommitter(commitsDirectory, context);
   }
 
@@ -175,7 +171,10 @@ public class StagingCommitter extends AbstractS3ACommitter {
   public String toString() {
     final StringBuilder sb = new StringBuilder("StagingCommitter{");
     sb.append(super.toString());
+    sb.append(", commitsDirectory=").append(commitsDirectory);
+    sb.append(", uniqueFilenames=").append(uniqueFilenames);
     sb.append(", conflictResolution=").append(conflictResolution);
+    sb.append(", uploadPartSize=").append(uploadPartSize);
     if (wrappedCommitter != null) {
       sb.append(", wrappedCommitter=").append(wrappedCommitter);
     }
@@ -184,40 +183,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
   }
 
   /**
-   * Get the UUID of an upload; may be the job ID.
-   * Spark will use a fake app ID based on the current minute and job ID 0.
-   * To avoid collisions, the key policy is:
-   * <ol>
-   *   <li>Value of {@link InternalCommitterConstants#FS_S3A_COMMITTER_STAGING_UUID}.</li>
-   *   <li>Value of {@code "spark.sql.sources.writeJobUUID"}.</li>
-   *   <li>Value of {@code "spark.app.id"}.</li>
-   *   <li>JobId passed in.</li>
-   * </ol>
-   * The staging UUID is set in in {@link #setupJob(JobContext)} and so will
-   * be valid in all sequences where the job has been set up for the
-   * configuration passed in.
-   * @param conf job/task configuration
-   * @param jobId Job ID
-   * @return an ID for use in paths.
-   */
-  public static String getUploadUUID(Configuration conf, String jobId) {
-    return conf.getTrimmed(
-        InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
-        conf.getTrimmed(SPARK_WRITE_UUID,
-            conf.getTrimmed(SPARK_APP_ID, jobId)));
-  }
-
-  /**
-   * Get the UUID of a Job.
-   * @param conf job/task configuration
-   * @param jobId Job ID
-   * @return an ID for use in paths.
-   */
-  public static String getUploadUUID(Configuration conf, JobID jobId) {
-    return getUploadUUID(conf, jobId.toString());
-  }
-
-  /**
    * Get the work path for a task.
    * @param context job/task complex
    * @param uuid UUID
@@ -309,7 +274,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
    * @return the location of pending job attempts.
    */
   private static Path getPendingJobAttemptsPath(Path out) {
-    Preconditions.checkNotNull(out, "Null 'out' path");
+    checkNotNull(out, "Null 'out' path");
     return new Path(out, TEMPORARY);
   }
 
@@ -330,12 +295,12 @@ public class StagingCommitter extends AbstractS3ACommitter {
    * @param context task context
    */
   private static void validateContext(TaskAttemptContext context) {
-    Preconditions.checkNotNull(context, "null context");
-    Preconditions.checkNotNull(context.getTaskAttemptID(),
+    checkNotNull(context, "null context");
+    checkNotNull(context.getTaskAttemptID(),
         "null task attempt ID");
-    Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(),
+    checkNotNull(context.getTaskAttemptID().getTaskID(),
         "null task ID");
-    Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(),
+    checkNotNull(context.getTaskAttemptID().getJobID(),
         "null job ID");
   }
 
@@ -377,7 +342,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
 
     // get files on the local FS in the attempt path
     Path attemptPath = getTaskAttemptPath(context);
-    Preconditions.checkNotNull(attemptPath,
+    checkNotNull(attemptPath,
         "No attemptPath path in {}", this);
 
     LOG.debug("Scanning {} for files to commit", attemptPath);
@@ -401,7 +366,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
    */
   protected String getFinalKey(String relative, JobContext context) {
     if (uniqueFilenames) {
-      return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid);
+      return getS3KeyPrefix(context) + "/"
+          + Paths.addUUID(relative, getUUID());
     } else {
       return getS3KeyPrefix(context) + "/" + relative;
     }
@@ -452,11 +418,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
    */
   @Override
   public void setupJob(JobContext context) throws IOException {
-    LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context));
-    context.getConfiguration().set(
-        InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid);
-    wrappedCommitter.setupJob(context);
     super.setupJob(context);
+    wrappedCommitter.setupJob(context);
   }
 
   /**
@@ -540,19 +503,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
   }
 
   @Override
-  protected void abortPendingUploadsInCleanup(boolean suppressExceptions)
-      throws IOException {
-    if (getConf()
-        .getBoolean(FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS, true)) {
-      super.abortPendingUploadsInCleanup(suppressExceptions);
-    } else {
-      LOG.info("Not cleanup up pending uploads to {} as {} is false ",
-          getOutputPath(),
-          FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS);
-    }
-  }
-
-  @Override
   protected void abortJobInternal(JobContext context,
       boolean suppressExceptions) throws IOException {
     String r = getRole();
@@ -608,8 +558,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
     Path taskAttemptPath = getTaskAttemptPath(context);
     try (DurationInfo d = new DurationInfo(LOG,
         "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) {
-      // create the local FS
-      taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath);
+      super.setupTask(context);
       wrappedCommitter.setupTask(context);
     }
   }
@@ -833,15 +782,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
   }
 
   /**
-   * A UUID for this upload, as calculated with.
-   * {@link #getUploadUUID(Configuration, String)}
-   * @return the UUID for files
-   */
-  protected String getUUID() {
-    return uuid;
-  }
-
-  /**
    * Returns the {@link ConflictResolution} mode for this commit.
    *
    * @param context the JobContext for this commit
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
index c41715b..ee2b9ec 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
@@ -46,19 +46,4 @@ public final class StagingCommitterConstants {
    */
   public static final String STAGING_UPLOADS = "staging-uploads";
 
-  // Spark configuration keys
-
-  /**
-   * The UUID for jobs: {@value}.
-   */
-  public static final String SPARK_WRITE_UUID =
-      "spark.sql.sources.writeJobUUID";
-
-  /**
-   * The App ID for jobs.
-   */
-
-  public static final String SPARK_APP_ID = "spark.app.id";
-
-  public static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index dee77b6..0a65786 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -530,26 +530,33 @@ Amazon S3, that means S3Guard must *always* be enabled.
 
 Conflict management is left to the execution engine itself.
 
-## Committer Configuration Options
+## Common Committer Options
 
 
-| Option | Magic | Directory | Partitioned | Meaning | Default |
-|--------|-------|-----------|-------------|---------|---------|
-| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file  at the end of each job | `true` |
-| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 |
-| `fs.s3a.committer.staging.conflict-mode` |  | X | X | Conflict resolution: `fail`, `append` or `replace`| `append` |
-| `fs.s3a.committer.staging.unique-filenames` |  | X | X | Generate unique filenames | `true` |
-| `fs.s3a.committer.magic.enabled` | X |  | | Enable "magic committer" support in the filesystem | `false` |
+| Option | Meaning | Default |
+|--------|---------|---------|
+| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Write a `_SUCCESS` file on the successful completion of the job. | `true` |
+| `fs.s3a.buffer.dir` | Local filesystem directory for data being written and/or staged. | `${hadoop.tmp.dir}/s3a` |
+| `fs.s3a.committer.magic.enabled` | Enable "magic committer" support in the filesystem. | `false` |
+| `fs.s3a.committer.abort.pending.uploads` | List and abort all pending uploads under the destination path when the job is committed or aborted. | `true` |
+| `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files. | 8 |
+| `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` |
+| `fs.s3a.committer.require.uuid` | Require the Job UUID to be passed down from Spark. | `false` |
 
 
+## Staging committer (Directory and Partitioned) options
 
 
-| Option | Magic | Directory | Partitioned | Meaning | Default |
-|--------|-------|-----------|-------------|---------|---------|
-| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being written and/or staged. | |
-| `fs.s3a.committer.staging.tmp.path` |  | X | X | Path in the cluster filesystem for temporary data | `tmp/staging` |
+| Option | Meaning | Default |
+|--------|---------|---------|
+| `fs.s3a.committer.staging.conflict-mode` | Conflict resolution: `fail`, `append` or `replace`| `append` |
+| `fs.s3a.committer.staging.tmp.path` | Path in the cluster filesystem for temporary data. | `tmp/staging` |
+| `fs.s3a.committer.staging.unique-filenames` | Generate unique filenames. | `true` |
+| `fs.s3a.committer.staging.abort.pending.uploads` | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. |  `(false)` |
 
 
+### Common Committer Options
+
 ```xml
 <property>
   <name>fs.s3a.committer.name</name>
@@ -580,6 +587,60 @@ Conflict management is left to the execution engine itself.
 </property>
 
 <property>
+  <name>fs.s3a.committer.abort.pending.uploads</name>
+  <value>true</value>
+  <description>
+    Should the committers abort all pending uploads to the destination
+    directory?
+
+    Set to false if more than one job is writing to the same directory tree.
+    Was:  "fs.s3a.committer.staging.abort.pending.uploads" when only used
+    by the staging committers.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+  <description>
+    The committer factory to use when writing data to S3A filesystems.
+    If mapreduce.outputcommitter.factory.class is set, it will
+    override this property.
+
+    (This property is set in mapred-default.xml)
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.require.uuid</name>
+  <value>false</value>
+  <description>
+    Should the committer fail to initialize if a unique ID isn't set in
+    "spark.sql.sources.writeJobUUID" or fs.s3a.committer.staging.uuid?
+    This helps guarantee that unique IDs for jobs are being
+    passed down in spark applications.
+  
+    Setting this option outside of spark will stop the S3A committer
+    in job setup. In MapReduce workloads the job attempt ID is unique
+    and so no unique ID need be passed down.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.generate.uuid</name>
+  <value>false</value>
+  <description>
+    Generate a Job UUID if none is passed down from Spark.
+    This uuid is only generated if the fs.s3a.committer.require.uuid flag
+    is false. 
+  </description>
+</property>
+```
+
+### Staging Committer Options
+
+```xml
+<property>
   <name>fs.s3a.committer.staging.tmp.path</name>
   <value>tmp/staging</value>
   <description>
@@ -613,38 +674,45 @@ Conflict management is left to the execution engine itself.
   </description>
 </property>
 
-<property>
-  <name>s.s3a.committer.staging.abort.pending.uploads</name>
-  <value>true</value>
-  <description>
-    Should the staging committers abort all pending uploads to the destination
-    directory?
 
-    Changing this if more than one partitioned committer is
-    writing to the same destination tree simultaneously; otherwise
-    the first job to complete will cancel all outstanding uploads from the
-    others. However, it may lead to leaked outstanding uploads from failed
-    tasks. If disabled, configure the bucket lifecycle to remove uploads
-    after a time period, and/or set up a workflow to explicitly delete
-    entries. Otherwise there is a risk that uncommitted uploads may run up
-    bills.
-  </description>
-</property>
+```
 
-<property>
-  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
-  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
-  <description>
-    The committer factory to use when writing data to S3A filesystems.
-    If mapreduce.outputcommitter.factory.class is set, it will
-    override this property.
+## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
 
-    (This property is set in mapred-default.xml)
-  </description>
+It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
+
+Before attempting this, the committers must be set to not delete all incomplete uploads on job commit,
+by setting `fs.s3a.committer.abort.pending.uploads` to `false`.
+
+```xml
+<property>
+  <name>fs.s3a.committer.abort.pending.uploads</name>
+  <value>false</value>
 </property>
+```
+
+If more than one job is writing to the same destination path then every task MUST
+be creating files with paths/filenames unique to the specific job.
+It is not enough for them to be unique by task, e.g. `part-00000.snappy.parquet`,
+because each job will have tasks with the same names and so generate conflicting files.
 
+For the staging committers, set `fs.s3a.committer.staging.unique-filenames` to `true` to ensure unique names are
+generated during the upload. Otherwise, use whatever configuration options are available in the specific `FileOutputFormat`.
+
+Note: by default, the option `mapreduce.output.basename` sets the base name for files;
+changing that from the default `part` value to something unique for each job may achieve this.
+
+For example, for any job executed through Hadoop MapReduce, the Job ID can be used in the filename.
+
+```xml
+<property>
+  <name>mapreduce.output.basename</name>
+  <value>part-${mapreduce.job.id}</value>
+</property>
 ```
 
+Even with these settings, the outcome of concurrent jobs to the same destination is
+inherently nondeterministic; use with caution.
 
 ## Troubleshooting
 
@@ -700,7 +768,7 @@ Delegation token support is disabled
  Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds
 ```
 
-## Error message: "File being created has a magic path, but the filesystem has magic file support disabled:
+### Error message: "File being created has a magic path, but the filesystem has magic file support disabled"
 
 A file is being written to a path which is used for "magic" files,
 files which are actually written to a different destination than their stated path
@@ -781,7 +849,7 @@ If you have subclassed `FileOutputCommitter` and want to move to the
 factory model, please get in touch.
 
 
-## Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail"
+### Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail"
 
 This surfaces when either of two conditions are met.
 
@@ -795,7 +863,7 @@ during task commit, which will cause the entire job to fail.
 If you are trying to write data and want write conflicts to be rejected, this is the correct
 behavior: there was data at the destination so the job was aborted.
 
-## Staging committer task fails with IOException: No space left on device
+### Staging committer task fails with IOException: No space left on device
 
 There's not enough space on the local hard disk (real or virtual)
 to store all the uncommitted data of the active tasks on that host.
@@ -821,3 +889,169 @@ generating less data each.
 1. Use the magic committer. This only needs enough disk storage to buffer
 blocks of the currently being written file during their upload process,
 so can use a lot less disk space.
+
+### Jobs run with directory/partitioned committers complete but the output is empty.
+
+Make sure that `fs.s3a.committer.staging.tmp.path` is set to a path on the shared cluster
+filesystem (usually HDFS). It MUST NOT be set to a local directory, as then the job committer,
+running on a different host *will not see the lists of pending uploads to commit*.
+
+### Magic output committer task fails "The specified upload does not exist" "Error Code: NoSuchUpload"
+
+The magic committer is being used and a task writing data to the S3 store fails
+with an error message about the upload not existing.
+
+```
+java.io.FileNotFoundException: upload part #1 upload
+    YWHTRqBaxlsutujKYS3eZHfdp6INCNXbk0JVtydX_qzL5fZcoznxRbbBZRfswOjomddy3ghRyguOqywJTfGG1Eq6wOW2gitP4fqWrBYMroasAygkmXNYF7XmUyFHYzja
+    on test/ITestMagicCommitProtocol-testParallelJobsToSameDestPaths/part-m-00000:
+    com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not
+    exist. The upload ID may be invalid, or the upload may have been aborted or
+    completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload;
+    Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID:
+    cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=),
+    S3 Extended Request ID:
+    cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=:NoSuchUpload
+
+    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315)
+    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407)
+    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311)
+    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:286)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:154)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:590)
+    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.lambda$uploadBlockAsync$0(S3ABlockOutputStream.java:652)
+
+Caused by: com.amazonaws.services.s3.model.AmazonS3Exception:
+    The specified upload does not exist.
+    The upload ID may be invalid, or the upload may have been aborted or completed.
+    (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID:
+    cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=),
+    S3 Extended Request ID: cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
+    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920)
+    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866)
+    at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3715)
+    at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3700)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:2343)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:594)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110)
+    ... 15 more
+```
+
+The block write failed because the previously created upload was aborted before the data could be written.
+
+Causes
+
+1. Another job has written to the same directory tree with an S3A committer
+   -and when that job was committed, all incomplete uploads were aborted.
+1. The `hadoop s3guard uploads --abort` command has been called on/above the directory.
+1. Some other program is cancelling uploads to that bucket/path under it.
+1. The job is lasting over 24h and a bucket lifecycle policy is aborting the uploads.
+
+The `_SUCCESS` file from the previous job may provide diagnostics.
+
+If the cause is Concurrent Jobs, see [Concurrent Jobs writing to the same destination](#concurrent-jobs).
+
+### Job commit fails "java.io.FileNotFoundException: Completing multipart upload" "The specified upload does not exist"
+
+The job commit fails with an error about the specified upload not existing.
+
+```
+java.io.FileNotFoundException: Completing multipart upload on
+    test/DELAY_LISTING_ME/ITestDirectoryCommitProtocol-testParallelJobsToSameDestPaths/part-m-00001:
+    com.amazonaws.services.s3.model.AmazonS3Exception:
+    The specified upload does not exist.
+    The upload ID may be invalid, or the upload may have been aborted or completed.
+    (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload;
+    Request ID: 8E6173241D2970CB; S3 Extended Request ID:
+    Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=),
+    S3 Extended Request ID:
+    Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=:NoSuchUpload
+
+    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315)
+    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407)
+    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:261)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.commitUpload(WriteOperationHelper.java:549)
+    at org.apache.hadoop.fs.s3a.commit.CommitOperations.innerCommit(CommitOperations.java:199)
+    at org.apache.hadoop.fs.s3a.commit.CommitOperations.commit(CommitOperations.java:168)
+    at org.apache.hadoop.fs.s3a.commit.CommitOperations.commitOrFail(CommitOperations.java:144)
+    at org.apache.hadoop.fs.s3a.commit.CommitOperations.access$100(CommitOperations.java:74)
+    at org.apache.hadoop.fs.s3a.commit.CommitOperations$CommitContext.commitOrFail(CommitOperations.java:612)
+    at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$loadAndCommit$5(AbstractS3ACommitter.java:535)
+    at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.runSingleThreaded(Tasks.java:164)
+    at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.run(Tasks.java:149)
+    at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.loadAndCommit(AbstractS3ACommitter.java:534)
+    at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$commitPendingUploads$2(AbstractS3ACommitter.java:482)
+    at org.apache.hadoop.fs.s3a.commit.Tasks$Builder$1.run(Tasks.java:253)
+
+Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist.
+    The upload ID may be invalid, or the upload may have been aborted or completed.
+    (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 8E6173241D2970CB;
+    S3 Extended Request ID: Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=),
+
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
+    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920)
+    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866)
+    at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3464)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:267)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110)
+```
+
+The cause is likely to be the same as in the previous section: concurrent jobs are writing to the same output directory,
+or another program has cancelled all pending uploads.
+
+See [Concurrent Jobs writing to the same destination](#concurrent-jobs).
+
+### Job commit fails `java.io.FileNotFoundException` "File hdfs://.../staging-uploads/_temporary/0 does not exist"
+
+The Staging committer will fail in job commit if the intermediate directory on the cluster FS is missing during job commit.
+
+This is possible if another job used the same staging upload directory and,
+ after committing its work, it deleted the directory.
+
+A unique Job ID is required for each spark job run by a specific user.
+Spark generates job IDs for its committers using the current timestamp,
+and if two jobs/stages are started in the same second, they will have the same job ID.
+
+See [SPARK-33230](https://issues.apache.org/jira/browse/SPARK-33230).
+
+This is fixed in all spark releases which have the patch applied.
+
+You can set the property `fs.s3a.committer.require.uuid` to fail
+the staging committers fast if a unique Job ID isn't found in
+`spark.sql.sources.writeJobUUID`.
+
+### Job setup fails `Job/task context does not contain a unique ID in spark.sql.sources.writeJobUUID`
+
+This will surface in job setup if the option `fs.s3a.committer.require.uuid` is `true`, and
+one of the following conditions is met:
+
+1. The committer is being used in a Hadoop MapReduce job, whose job attempt ID is unique
+   -there is no need to add this requirement.
+   Fix: unset `fs.s3a.committer.require.uuid`.
+1. The committer is being used in spark, and the version of spark being used does not
+   set the `spark.sql.sources.writeJobUUID` property.
+   Either upgrade to a new spark release, or set `fs.s3a.committer.generate.uuid` to true.
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index c611ad1..3c5dfce 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -248,6 +248,57 @@ As an example, the endpoint for S3 Frankfurt is `s3.eu-central-1.amazonaws.com`:
 </property>
 ```
 
+### `Class does not implement AWSCredentialsProvider`
+
+A credential provider listed in `fs.s3a.aws.credentials.provider` does not implement
+the interface `com.amazonaws.auth.AWSCredentialsProvider`.
+
+```
+  Cause: java.lang.RuntimeException: java.io.IOException: Class class com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement AWSCredentialsProvider
+  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:686)
+  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:621)
+  at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:219)
+  at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:126)
+  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
+  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
+  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
+  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
+  at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:306)
+  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:433)
+  ...
+  Cause: java.io.IOException: Class class com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement AWSCredentialsProvider
+  at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:722)
+  at org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:687)
+  at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:620)
+  at org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:673)
+  at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:414)
+  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3462)
+  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:171)
+  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3522)
+  at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3496)
+  at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:591)
+```
+
+There are two main causes:
+
+1. A class listed there is not an implementation of the interface.
+   Fix: review the settings and correct as appropriate.
+1. A class listed there does implement the interface, but it has been loaded in a different
+   classloader, so the JVM does not consider it to be an implementation.
+   Fix: learn the entire JVM classloader model and see if you can then debug it.
+   Tip: having both the AWS Shaded SDK and individual AWS SDK modules on your classpath
+   may be a cause of this.
+
+If you see this and you are trying to use the S3A connector with Spark, then the cause can
+be that the isolated classloader used to load Hive classes is interfering with the S3A
+connector's dynamic loading of `com.amazonaws` classes. To fix this, declare that
+the classes in the aws SDK are loaded from the same classloader which instantiated
+the S3A FileSystem instance:
+
+```
+spark.sql.hive.metastore.sharedPrefixes com.amazonaws.
+```
+
 ## <a name="access_denied"></a> "The security token included in the request is invalid"
 
 You are trying to use session/temporary credentials and the session token
@@ -1262,11 +1313,11 @@ Number of parts in multipart upload exceeded
 ```
 org.apache.hadoop.fs.PathIOException: `test/testMultiPartUploadFailure': Number of parts in multipart upload exceeded. Current part count = X, Part count limit = Y
 
-	at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432)
-	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627)
-	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532)
-	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316)
-	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301)
+    at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432)
+    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627)
+    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532)
+    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316)
+    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301)
 ```
 
 This is a known issue where upload fails if number of parts
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 1cf3fb4..9947ece 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -359,11 +359,13 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
    * and that it can be loaded.
    * The contents will be logged and returned.
    * @param dir directory to scan
+   * @param jobId job ID, only verified if non-empty
    * @return the loaded success data
    * @throws IOException IO Failure
    */
-  protected SuccessData verifySuccessMarker(Path dir) throws IOException {
-    return validateSuccessFile(dir, "", getFileSystem(), "query", 0);
+  protected SuccessData verifySuccessMarker(Path dir, String jobId)
+      throws IOException {
+    return validateSuccessFile(dir, "", getFileSystem(), "query", 0, jobId);
   }
 
   /**
@@ -442,6 +444,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
    * @param fs filesystem
    * @param origin origin (e.g. "teragen" for messages)
    * @param minimumFileCount minimum number of files to have been created
+   * @param jobId job ID, only verified if non-empty
    * @return the success data
    * @throws IOException IO failure
    */
@@ -449,7 +452,8 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
       final String committerName,
       final S3AFileSystem fs,
       final String origin,
-      final int minimumFileCount) throws IOException {
+      final int minimumFileCount,
+      final String jobId) throws IOException {
     SuccessData successData = loadSuccessFile(fs, outputPath, origin);
     String commitDetails = successData.toString();
     LOG.info("Committer name " + committerName + "\n{}",
@@ -463,8 +467,13 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
           committerName, successData.getCommitter());
     }
     Assertions.assertThat(successData.getFilenames())
-        .describedAs("Files committed")
+        .describedAs("Files committed in " + commitDetails)
         .hasSizeGreaterThanOrEqualTo(minimumFileCount);
+    if (StringUtils.isNotEmpty(jobId)) {
+      Assertions.assertThat(successData.getJobId())
+          .describedAs("JobID in " + commitDetails)
+          .isEqualTo(jobId);
+    }
     return successData;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 03759a5..89d505f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -69,7 +71,12 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
 
 /**
@@ -377,6 +384,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     private final TaskAttemptContext tContext;
     private final AbstractS3ACommitter committer;
     private final Configuration conf;
+    private Path writtenTextPath; // null if not written to
 
     public JobData(Job job,
         JobContext jContext,
@@ -467,7 +475,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     if (writeText) {
       // write output
-      writeTextOutput(tContext);
+      jobData.writtenTextPath = writeTextOutput(tContext);
     }
     return jobData;
   }
@@ -659,12 +667,14 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
    * file existence and contents, as well as optionally, the success marker.
    * @param dir directory to scan.
    * @param expectSuccessMarker check the success marker?
+   * @param expectedJobId job ID, verified if non-empty and success data loaded
    * @throws Exception failure.
    */
-  private void validateContent(Path dir, boolean expectSuccessMarker)
-      throws Exception {
+  private void validateContent(Path dir,
+      boolean expectSuccessMarker,
+      String expectedJobId) throws Exception {
     if (expectSuccessMarker) {
-      verifySuccessMarker(dir);
+      SuccessData successData = verifySuccessMarker(dir, expectedJobId);
     }
     Path expectedFile = getPart0000(dir);
     log().debug("Validating content in {}", expectedFile);
@@ -793,7 +803,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     // validate output
     describe("4. Validating content");
-    validateContent(outDir, shouldExpectSuccessMarker());
+    validateContent(outDir, shouldExpectSuccessMarker(),
+        committer.getUUID());
     assertNoMultipartUploadsPending(outDir);
   }
 
@@ -810,7 +821,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     commit(committer, jContext, tContext);
 
     // validate output
-    validateContent(outDir, shouldExpectSuccessMarker());
+    validateContent(outDir, shouldExpectSuccessMarker(),
+        committer.getUUID());
 
     assertNoMultipartUploadsPending(outDir);
 
@@ -875,7 +887,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     // validate output
     S3AFileSystem fs = getFileSystem();
-    SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1);
+    SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1,
+        "");
     Assertions.assertThat(successData.getFilenames())
         .describedAs("Files committed")
         .hasSize(1);
@@ -911,7 +924,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     commitJob(committer, jContext);
 
     // but the data got there, due to the order of operations.
-    validateContent(outDir, shouldExpectSuccessMarker());
+    validateContent(outDir, shouldExpectSuccessMarker(),
+        committer.getUUID());
     expectJobCommitToFail(jContext, committer);
   }
 
@@ -1007,7 +1021,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     describe("\nvalidating");
 
     // validate output
-    verifySuccessMarker(outDir);
+    verifySuccessMarker(outDir, committer.getUUID());
 
     describe("validate output of %s", outDir);
     validateMapFileOutputContent(fs, outDir);
@@ -1269,7 +1283,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
     // validate output
     // There's no success marker in the subdirectory
-    validateContent(outSubDir, false);
+    validateContent(outSubDir, false, "");
   }
 
   /**
@@ -1327,7 +1341,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     commitTask(committer, tContext);
     commitJob(committer, jContext);
     // validate output
-    verifySuccessMarker(outDir);
+    verifySuccessMarker(outDir, committer.getUUID());
   }
 
   /**
@@ -1387,7 +1401,9 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     assertNotEquals(job1Dest, job2Dest);
 
     // create the second job
-    Job job2 = newJob(job2Dest, new JobConf(getConfiguration()), attempt20);
+    Job job2 = newJob(job2Dest,
+        unsetUUIDOptions(new JobConf(getConfiguration())),
+        attempt20);
     Configuration conf2 = job2.getConfiguration();
     conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
     try {
@@ -1400,7 +1416,13 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
       setup(jobData2);
       abortInTeardown(jobData2);
       // make sure the directories are different
-      assertEquals(job2Dest, committer2.getOutputPath());
+      assertNotEquals("Committer output paths",
+          committer1.getOutputPath(),
+          committer2.getOutputPath());
+
+      assertNotEquals("job UUIDs",
+          committer1.getUUID(),
+          committer2.getUUID());
 
       // job2 setup, write some data there
       writeTextOutput(tContext2);
@@ -1430,6 +1452,259 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
   }
 
+
+  /**
+   * Run two jobs with the same destination but different job attempt paths.
+   * <p></p>
+   * This only works if the jobs are set to NOT delete all outstanding
+   * uploads under the destination path.
+   * <p></p>
+   * See HADOOP-17318.
+   */
+  @Test
+  public void testParallelJobsToSameDestination() throws Throwable {
+
+    describe("Run two jobs to the same destination, assert they both complete");
+    Configuration conf = getConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
+
+    // this job has a job ID generated and set as the spark UUID;
+    // the config is also set to require it.
+    // This mimics the Spark setup process.
+
+    String stage1Id = UUID.randomUUID().toString();
+    conf.set(SPARK_WRITE_UUID, stage1Id);
+    conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+
+    // create the job and write data in its task attempt
+    JobData jobData = startJob(true);
+    Job job1 = jobData.job;
+    AbstractS3ACommitter committer1 = jobData.committer;
+    JobContext jContext1 = jobData.jContext;
+    TaskAttemptContext tContext1 = jobData.tContext;
+    Path job1TaskOutputFile = jobData.writtenTextPath;
+
+    // the write path
+    Assertions.assertThat(committer1.getWorkPath().toString())
+        .describedAs("Work path path of %s", committer1)
+        .contains(stage1Id);
+    // now build up a second job
+    String jobId2 = randomJobId();
+
+    // second job will use the same task attempt ID
+    String attempt2 = taskAttempt0.toString();
+    TaskAttemptID taskAttempt2 = taskAttempt0;
+
+    // create the second job
+    Configuration c2 = unsetUUIDOptions(new JobConf(conf));
+    c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    Job job2 = newJob(outDir,
+        c2,
+        attempt2);
+    Configuration jobConf2 = job2.getConfiguration();
+    jobConf2.set("mapreduce.output.basename", "task2");
+    String stage2Id = UUID.randomUUID().toString();
+    jobConf2.set(SPARK_WRITE_UUID,
+        stage2Id);
+
+    JobContext jContext2 = new JobContextImpl(jobConf2,
+        taskAttempt2.getJobID());
+    TaskAttemptContext tContext2 =
+        new TaskAttemptContextImpl(jobConf2, taskAttempt2);
+    AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
+    Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
+        .describedAs("Job attempt path of %s", committer2)
+        .isNotEqualTo(committer1.getJobAttemptPath(jContext1));
+    Assertions.assertThat(committer2.getTaskAttemptPath(tContext2))
+        .describedAs("Task attempt path of %s", committer2)
+        .isNotEqualTo(committer1.getTaskAttemptPath(tContext1));
+    Assertions.assertThat(committer2.getWorkPath().toString())
+        .describedAs("Work path path of %s", committer2)
+        .isNotEqualTo(committer1.getWorkPath().toString())
+        .contains(stage2Id);
+    Assertions.assertThat(committer2.getUUIDSource())
+        .describedAs("UUID source of %s", committer2)
+        .isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID);
+    JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
+    setup(jobData2);
+    abortInTeardown(jobData2);
+
+    // the sequence is designed to ensure that job2 has active multipart
+    // uploads during/after job1's work
+
+    // if the committer is a magic committer, MPUs start in the write,
+    // otherwise in task commit.
+    boolean multipartInitiatedInWrite =
+        committer2 instanceof MagicS3GuardCommitter;
+
+    // job2. Here we start writing a file and have that write in progress
+    // when job 1 commits.
+
+    LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+        recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter(
+            tContext2);
+
+    LOG.info("Commit Task 1");
+    commitTask(committer1, tContext1);
+
+    if (multipartInitiatedInWrite) {
+      // magic committer: commit job1 while a job2 TA has an open
+      // writer (and hence: open MP Upload)
+      LOG.info("With Multipart Initiated In Write: Commit Job 1");
+      commitJob(committer1, jContext1);
+    }
+
+    // job2/task writes its output to the destination and
+    // closes the file
+    writeOutput(recordWriter2, tContext2);
+
+    // get the output file
+    Path job2TaskOutputFile = recordWriter2.getDest();
+
+
+    // commit the second task
+    LOG.info("Commit Task 2");
+    commitTask(committer2, tContext2);
+
+    if (!multipartInitiatedInWrite) {
+      // if not a magic committer, commit job 1 now, because at
+      // this point the staging committer tasks from job2 will be pending
+      LOG.info("With Multipart NOT Initiated In Write: Commit Job 1");
+      assertJobAttemptPathExists(committer1, jContext1);
+      commitJob(committer1, jContext1);
+    }
+
+    // run the warning scan code, which will find output.
+    // this can be manually reviewed in the logs to verify
+    // readability
+    committer2.warnOnActiveUploads(outDir);
+    // and second job
+    LOG.info("Commit Job 2");
+    assertJobAttemptPathExists(committer2, jContext2);
+    commitJob(committer2, jContext2);
+
+    // validate the output
+    Path job1Output = new Path(outDir, job1TaskOutputFile.getName());
+    Path job2Output = new Path(outDir, job2TaskOutputFile.getName());
+    assertNotEquals("Job output file filenames must be different",
+        job1Output, job2Output);
+
+    // job1 output must be there
+    assertPathExists("job 1 output", job1Output);
+    // job 2 file is there
+    assertPathExists("job 2 output", job2Output);
+
+    // and nothing is pending
+    assertNoMultipartUploadsPending(outDir);
+
+  }
+
+  /**
+   * Verify self-generated UUID logic.
+   * A committer used for job setup can also use it for task setup,
+   * but a committer which generated its own job UUID and was only
+   * used for task setup is rejected.
+   * Task abort will still work.
+   */
+  @Test
+  public void testSelfGeneratedUUID() throws Throwable {
+    describe("Verify self-generated UUID handling in job and task setup");
+    Configuration conf = getConfiguration();
+
+    unsetUUIDOptions(conf);
+    // job is set to generate UUIDs
+    conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
+
+    // create the job. don't write anything
+    JobData jobData = startJob(false);
+    AbstractS3ACommitter committer = jobData.committer;
+    String uuid = committer.getUUID();
+    Assertions.assertThat(committer.getUUIDSource())
+        .describedAs("UUID source of %s", committer)
+        .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);
+
+    // examine the job configuration and verify that it has been updated
+    Configuration jobConf = jobData.conf;
+    Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID, null))
+        .describedAs("Config option " + FS_S3A_COMMITTER_UUID)
+        .isEqualTo(uuid);
+    Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, null))
+        .describedAs("Config option " + FS_S3A_COMMITTER_UUID_SOURCE)
+        .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally
+            .getText());
+
+    // because the task was set up in the job, it can have task
+    // setup called, even though it had a random ID.
+    committer.setupTask(jobData.tContext);
+
+    // but a new committer will not be set up
+    TaskAttemptContext tContext2 =
+        new TaskAttemptContextImpl(conf, taskAttempt1);
+    AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
+    Assertions.assertThat(committer2.getUUIDSource())
+        .describedAs("UUID source of %s", committer2)
+        .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);
+    assertNotEquals("job UUIDs",
+        committer.getUUID(),
+        committer2.getUUID());
+    // Task setup MUST fail.
+    intercept(PathCommitException.class,
+        E_SELF_GENERATED_JOB_UUID, () -> {
+        committer2.setupTask(tContext2);
+        return committer2;
+      });
+    // task abort with the self-generated option is fine.
+    committer2.abortTask(tContext2);
+  }
+
+  /**
+   * Verify that the option to require a UUID is enforced:
+   * when a committer is instantiated without a propagated UUID,
+   * it fails early, even if UUID generation is enabled.
+   */
+  @Test
+  public void testRequirePropagatedUUID() throws Throwable {
+    Configuration conf = getConfiguration();
+
+    unsetUUIDOptions(conf);
+    conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
+
+    // create the job, expect a failure, even if UUID generation
+    // is enabled.
+    intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
+        startJob(false));
+  }
+
+  /**
+   * Strip committer/spark UUID options.
+   * @param conf config
+   * @return the patched config
+   */
+  protected Configuration unsetUUIDOptions(final Configuration conf) {
+    conf.unset(SPARK_WRITE_UUID);
+    conf.unset(FS_S3A_COMMITTER_UUID);
+    conf.unset(FS_S3A_COMMITTER_GENERATE_UUID);
+    conf.unset(FS_S3A_COMMITTER_REQUIRE_UUID);
+    return conf;
+  }
+
+  /**
+   * Assert that a committer's job attempt path exists.
+   * For the staging committers, this is in the cluster FS.
+   * @param committer committer
+   * @param jobContext job context
+   * @throws IOException failure
+   */
+  protected void assertJobAttemptPathExists(
+      final AbstractS3ACommitter committer,
+      final JobContext jobContext) throws IOException {
+    Path attemptPath = committer.getJobAttemptPath(jobContext);
+    ContractTestUtils.assertIsDirectory(
+        attemptPath.getFileSystem(committer.getConf()),
+        attemptPath);
+  }
+
   @Test
   public void testS3ACommitterFactoryBinding() throws Throwable {
     describe("Verify that the committer factory returns this "
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
index 1ac8038..5d1e919 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
@@ -66,7 +66,7 @@ public class LoggingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
     }
     Path file = getDefaultWorkFile(job, extension);
     FileSystem fs = file.getFileSystem(conf);
-    FSDataOutputStream fileOut = fs.create(file, false);
+    FSDataOutputStream fileOut = fs.create(file, true);
     LOG.debug("Creating LineRecordWriter with destination {}", file);
     if (isCompressed) {
       return new LoggingLineRecordWriter<>(
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
index caf54d1..bf67f07 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
@@ -77,7 +77,7 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
 import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
 import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -254,7 +254,7 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
     jobConf.set("mock-results-file", committerPath);
 
     // setting up staging options is harmless for other committers
-    jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
+    jobConf.set(FS_S3A_COMMITTER_UUID, commitUUID);
 
     mrJob.setInputFormatClass(TextInputFormat.class);
     FileInputFormat.addInputPath(mrJob,
@@ -310,7 +310,8 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
         committerName(),
         fs,
         "MR job " + jobID,
-        1);
+        1,
+        "");
     String commitData = successData.toString();
 
     FileStatus[] results = fs.listStatus(outputPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index 057adf5..f6d6307 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.fs.s3a.commit.magic;
 import java.io.IOException;
 import java.net.URI;
 
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +37,7 @@ import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -91,14 +95,14 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
   }
 
   @Override
-  protected AbstractS3ACommitter createCommitter(
+  protected MagicS3GuardCommitter createCommitter(
       Path outputPath,
       TaskAttemptContext context)
       throws IOException {
     return new MagicS3GuardCommitter(outputPath, context);
   }
 
-  public AbstractS3ACommitter createFailingCommitter(
+  public MagicS3GuardCommitter createFailingCommitter(
       TaskAttemptContext tContext) throws IOException {
     return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
   }
@@ -137,6 +141,41 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
   }
 
   /**
+   * Verify that the __magic paths for the application/tasks use the
+   * committer UUID to ensure uniqueness in the case of more than
+   * one job writing to the same destination path.
+   */
+  @Test
+  public void testCommittersPathsHaveUUID() throws Throwable {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(
+        getConfiguration(),
+        getTaskAttempt0());
+    MagicS3GuardCommitter committer = createCommitter(getOutDir(), tContext);
+
+    String ta0 = getTaskAttempt0().toString();
+    // magic path for the task attempt
+    Path taskAttemptPath = committer.getTaskAttemptPath(tContext);
+    Assertions.assertThat(taskAttemptPath.toString())
+        .describedAs("task path of %s", committer)
+        .contains(committer.getUUID())
+        .contains(MAGIC)
+        .doesNotContain(TEMP_DATA)
+        .endsWith(BASE)
+        .contains(ta0);
+
+    // temp path for files which the TA will create with an absolute path
+    // and which need renaming into place.
+    Path tempTaskAttemptPath = committer.getTempTaskAttemptPath(tContext);
+    Assertions.assertThat(tempTaskAttemptPath.toString())
+        .describedAs("Temp task path of %s", committer)
+        .contains(committer.getUUID())
+        .contains(TEMP_DATA)
+        .doesNotContain(MAGIC)
+        .doesNotContain(BASE)
+        .contains(ta0);
+  }
+
+  /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
index f368bf2..031089e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
@@ -341,7 +341,7 @@ public class StagingTestBase {
 
     protected JobConf createJobConf() {
       JobConf conf = new JobConf();
-      conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
+      conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_UUID,
           UUID.randomUUID().toString());
       conf.setBoolean(
           CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
@@ -401,7 +401,7 @@ public class StagingTestBase {
 
       // get the task's configuration copy so modifications take effect
       String tmp = System.getProperty(
-          StagingCommitterConstants.JAVA_IO_TMPDIR);
+          InternalCommitterConstants.JAVA_IO_TMPDIR);
       tempDir = new File(tmp);
       tac.getConfiguration().set(Constants.BUFFER_DIR, tmp + "/buffer");
       tac.getConfiguration().set(
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
index 15ea754..f552fa9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AWSClientIOException;
 import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapred.JobConf;
@@ -84,8 +86,13 @@ import static org.apache.hadoop.test.LambdaTestUtils.*;
 public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
 
   private static final JobID JOB_ID = new JobID("job", 1);
+
+  public static final TaskID TASK_ID = new TaskID(JOB_ID, TaskType.REDUCE, 2);
+
   private static final TaskAttemptID AID = new TaskAttemptID(
-      new TaskID(JOB_ID, TaskType.REDUCE, 2), 3);
+      TASK_ID, 1);
+  private static final TaskAttemptID AID2 = new TaskAttemptID(
+      TASK_ID, 2);
   private static final Logger LOG =
       LoggerFactory.getLogger(TestStagingCommitter.class);
 
@@ -141,8 +148,8 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
     jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads);
     jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
         uniqueFilenames);
-    jobConf.set(FS_S3A_COMMITTER_STAGING_UUID,
-        UUID.randomUUID().toString());
+    jobConf.set(FS_S3A_COMMITTER_UUID,
+        uuid());
     jobConf.set(RETRY_INTERVAL, "100ms");
     jobConf.setInt(RETRY_LIMIT, 1);
 
@@ -190,36 +197,137 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
     }
   }
 
+  private Configuration newConfig() {
+    return new Configuration(false);
+  }
+
   @Test
   public void testUUIDPropagation() throws Exception {
-    Configuration config = new Configuration();
-    String jobUUID = addUUID(config);
-    assertEquals("Upload UUID", jobUUID,
-        StagingCommitter.getUploadUUID(config, JOB_ID));
+    Configuration config = newConfig();
+    String uuid = uuid();
+    config.set(SPARK_WRITE_UUID, uuid);
+    config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
+        .buildJobUUID(config, JOB_ID);
+    assertEquals("Job UUID", uuid, t3.getLeft());
+    assertEquals("Job UUID source: " + t3,
+        AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID,
+        t3.getRight());
+  }
+
+  /**
+   * If the Spark UUID is required, then binding will fail
+   * if a UUID did not get passed in.
+   */
+  @Test
+  public void testUUIDValidation() throws Exception {
+    Configuration config = newConfig();
+    config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
+        AbstractS3ACommitter.buildJobUUID(config, JOB_ID));
+  }
+
+  /**
+   * Validate ordering of UUID retrieval.
+   */
+  @Test
+  public void testUUIDLoadOrdering() throws Exception {
+    Configuration config = newConfig();
+    config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
+    String uuid = uuid();
+    // MUST be picked up
+    config.set(FS_S3A_COMMITTER_UUID, uuid);
+    config.set(SPARK_WRITE_UUID, "something");
+    Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
+        .buildJobUUID(config, JOB_ID);
+    assertEquals("Job UUID", uuid, t3.getLeft());
+    assertEquals("Job UUID source: " + t3,
+        AbstractS3ACommitter.JobUUIDSource.CommitterUUIDProperty,
+        t3.getRight());
+  }
+
+  /**
+   * Verify that unless the config enables self-generation, JobIDs
+   * are used.
+   */
+  @Test
+  public void testJobIDIsUUID() throws Exception {
+    Configuration config = newConfig();
+    Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
+        .buildJobUUID(config, JOB_ID);
+    assertEquals("Job UUID source: " + t3,
+        AbstractS3ACommitter.JobUUIDSource.JobID,
+        t3.getRight());
+    // parse it as a JobID
+    JobID.forName(t3.getLeft());
   }
 
+  /**
+   * Verify self-generated UUIDs are supported when enabled,
+   * and come before JobID.
+   */
+  @Test
+  public void testSelfGeneratedUUID() throws Exception {
+    Configuration config = newConfig();
+    config.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
+    Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
+        .buildJobUUID(config, JOB_ID);
+    assertEquals("Job UUID source: " + t3,
+        AbstractS3ACommitter.JobUUIDSource.GeneratedLocally,
+        t3.getRight());
+    // parse it
+    UUID.fromString(t3.getLeft());
+  }
+
+  /**
+   * Create a UUID and add it as the committer UUID.
+   * @param config config to patch
+   * @return the UUID
+   */
   private String addUUID(Configuration config) {
-    String jobUUID = UUID.randomUUID().toString();
-    config.set(FS_S3A_COMMITTER_STAGING_UUID, jobUUID);
+    String jobUUID = uuid();
+    config.set(FS_S3A_COMMITTER_UUID, jobUUID);
     return jobUUID;
   }
 
+  /**
+   * Create a new UUID.
+   * @return a uuid as a string.
+   */
+  private String uuid() {
+    return UUID.randomUUID().toString();
+  }
+
   @Test
   public void testAttemptPathConstructionNoSchema() throws Exception {
-    Configuration config = new Configuration();
+    Configuration config = newConfig();
     final String jobUUID = addUUID(config);
     config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1");
     String commonPath = "file:/tmp/mr-local-";
+    Assertions.assertThat(getLocalTaskAttemptTempDir(config,
+        jobUUID, tac.getTaskAttemptID()).toString())
+        .describedAs("Missing scheme should produce local file paths")
+        .startsWith(commonPath)
+        .contains(jobUUID);
+  }
 
-    assertThat("Missing scheme should produce local file paths",
-        getLocalTaskAttemptTempDir(config,
-            jobUUID, tac.getTaskAttemptID()).toString(),
-        StringStartsWith.startsWith(commonPath));
+  @Test
+  public void testAttemptPathsDifferentByTaskAttempt() throws Exception {
+    Configuration config = newConfig();
+    final String jobUUID = addUUID(config);
+    config.set(BUFFER_DIR, "file:/tmp/mr-local-0");
+    String attempt1Path = getLocalTaskAttemptTempDir(config,
+        jobUUID, AID).toString();
+    String attempt2Path = getLocalTaskAttemptTempDir(config,
+        jobUUID, AID2).toString();
+    Assertions.assertThat(attempt2Path)
+        .describedAs("local task attempt dir of TA1 must not match that of TA2")
+        .isNotEqualTo(attempt1Path);
   }
 
   @Test
   public void testAttemptPathConstructionWithSchema() throws Exception {
-    Configuration config = new Configuration();
+    Configuration config = newConfig();
     final String jobUUID = addUUID(config);
     String commonPath = "file:/tmp/mr-local-";
 
@@ -234,7 +342,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
 
   @Test
   public void testAttemptPathConstructionWrongSchema() throws Exception {
-    Configuration config = new Configuration();
+    Configuration config = newConfig();
     final String jobUUID = addUUID(config);
     config.set(BUFFER_DIR,
         "hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1");
@@ -270,7 +378,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
     assertEquals("Should name the commits file with the task ID: " + results,
         "task_job_0001_r_000002", stats[0].getPath().getName());
 
-    PendingSet pending = PendingSet.load(dfs, stats[0].getPath());
+    PendingSet pending = PendingSet.load(dfs, stats[0]);
     assertEquals("Should have one pending commit", 1, pending.size());
     SinglePendingCommit commit = pending.getCommits().get(0);
     assertEquals("Should write to the correct bucket:" + results,
@@ -310,8 +418,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
     assertEquals("Should name the commits file with the task ID",
         "task_job_0001_r_000002", stats[0].getPath().getName());
 
-    PendingSet pending = PendingSet.load(dfs,
-        stats[0].getPath());
+    PendingSet pending = PendingSet.load(dfs, stats[0]);
     assertEquals("Should have one pending commit", 1, pending.size());
   }
 
@@ -334,7 +441,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
         "task_job_0001_r_000002", stats[0].getPath().getName());
 
     List<SinglePendingCommit> pending =
-        PendingSet.load(dfs, stats[0].getPath()).getCommits();
+        PendingSet.load(dfs, stats[0]).getCommits();
     assertEquals("Should have correct number of pending commits",
         files.size(), pending.size());
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
index 872097f..86b677c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -110,7 +110,7 @@ public class TestStagingPartitionedJobCommit
         file.deleteOnExit();
         Path path = new Path(file.toURI());
         pendingSet.save(localFS, path, true);
-        activeCommit.add(path);
+        activeCommit.add(localFS.getFileStatus(path));
       }
       return activeCommit;
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index 180e743..a4dface 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.fs.s3a.commit.staging.integration;
 
 import java.io.IOException;
+import java.util.UUID;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +36,7 @@ import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
 import org.apache.hadoop.fs.s3a.commit.staging.Paths;
 import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -68,8 +71,15 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
 
     // identify working dir for staging and delete
     Configuration conf = getConfiguration();
-    String uuid = StagingCommitter.getUploadUUID(conf,
-        getTaskAttempt0().getJobID());
+    String uuid = UUID.randomUUID().toString();
+    conf.set(InternalCommitterConstants.SPARK_WRITE_UUID,
+        uuid);
+    Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
+        .buildJobUUID(conf, JobID.forName("job_" + getJobId()));
+    assertEquals("Job UUID", uuid, t3.getLeft());
+    assertEquals("Job UUID source: " + t3,
+        AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID,
+        t3.getRight());
     Path tempDir = Paths.getLocalTaskAttemptTempDir(conf, uuid,
         getTaskAttempt0());
     rmdir(tempDir, conf);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
index dc6c6d1..3a28fef 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
@@ -242,7 +242,7 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
         + "(" + StringUtils.join(", ", args) + ")"
         + " failed", 0, result);
     validateSuccessFile(dest, committerName(), getFileSystem(), stage,
-        minimumFileCount);
+        minimumFileCount, "");
     completedStage(stage, d);
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 315d1fe..8d29c29 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -103,7 +103,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         KEY_HUGE_PARTITION_SIZE,
         DEFAULT_HUGE_PARTITION_SIZE);
     assertTrue("Partition size too small: " + partitionSize,
-        partitionSize > MULTIPART_MIN_SIZE);
+        partitionSize >= MULTIPART_MIN_SIZE);
     conf.setLong(SOCKET_SEND_BUFFER, _1MB);
     conf.setLong(SOCKET_RECV_BUFFER, _1MB);
     conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HADOOP-17385. ITestS3ADeleteCost.testDirMarkersFileCreation failure (#2473).

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1e59bf7394da98d57f61c26d34c19f1394eadd23
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Thu Nov 26 12:50:33 2020 +0000

    HADOOP-17385. ITestS3ADeleteCost.testDirMarkersFileCreation failure (#2473).
    
    Contributed by Steve Loughran
    
    The addition of deprecated S3A configuration options in HADOOP-17318
    triggered a reload of default (xml resource) configurations, which breaks
    tests which fail if there's a per-bucket setting inconsistent with test
    setup.
    
    Creating an S3AFS instance before creating the Configuration() instance
    for test runs gets that reload out of the way before test setup takes
    place.
    
    Along with the fix, extra changes in the failing test suite to fail
    fast when marker policy isn't as expected, and to log FS state better.
    
    Rather than create and discard an instance, add a new static method
    to S3AFS and invoke it in test setup. This forces the load of the
    deprecated configuration keys through the class's static initializers.
    
    Change-Id: Id52b1c46912c6fedd2ae270e2b1eb2222a360329
---
 .../src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 9 +++++++++
 .../test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java | 4 ++++
 .../test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java  | 3 +++
 .../apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java    | 9 +++++++++
 .../org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java | 1 +
 5 files changed, 26 insertions(+)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index cc37df7..9d4479e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -4955,6 +4955,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
+   * This is purely for testing, as it force initializes all static
+   * initializers. See HADOOP-17385 for details.
+   */
+  @InterfaceAudience.Private
+  public static void initializeClass() {
+    LOG.debug("Initialize S3A class");
+  }
+
+  /**
    * The implementation of context accessors.
    */
   private class ContextAccessorsImpl implements ContextAccessors {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
index 1311020..0d3dd4c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract.s3a;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
 /**
@@ -53,6 +54,9 @@ public class S3AContract extends AbstractBondedFSContract {
    */
   public S3AContract(Configuration conf, boolean addContractResource) {
     super(conf);
+    // Force deprecated key load through the
+    // static initializers. See: HADOOP-17385
+    S3AFileSystem.initializeClass();
     //insert the base features
     if (addContractResource) {
       addConfResource(CONTRACT_XML);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index da679cd..73a503a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -60,6 +60,9 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
     // filesystems which add default configuration resources to do it before
     // our tests start adding/removing options. See HADOOP-16626.
     FileSystem.getLocal(new Configuration());
+    // Force deprecated key load through the
+    // static initializers. See: HADOOP-17385
+    S3AFileSystem.initializeClass();
     super.setup();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
index d7e277f..2b2fb7c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -141,6 +142,14 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
 
     isDeleting = !isKeeping;
 
+    // check that the FS has the expected state
+    DirectoryPolicy markerPolicy = fs.getDirectoryMarkerPolicy();
+    Assertions.assertThat(markerPolicy.getMarkerPolicy())
+        .describedAs("Marker policy for filesystem %s", fs)
+        .isEqualTo(isKeepingMarkers()
+            ? DirectoryPolicy.MarkerPolicy.Keep
+            : DirectoryPolicy.MarkerPolicy.Delete);
+
     // insert new metrics so as to keep the list sorted
     costValidator = OperationCostValidator.builder(getFileSystem())
         .withMetrics(
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
index 618b491..0b5afc6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
@@ -267,6 +267,7 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
 
     verifyMetrics(() -> {
       file(new Path(srcDir, "source.txt"));
+      LOG.info("Metrics: {}\n{}", getMetricSummary(), getFileSystem());
       return "after touch(fs, srcFilePath) " + getMetricSummary();
     },
         with(DIRECTORIES_CREATED, 0),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org