You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:11:08 UTC

[42/47] hadoop git commit: HADOOP-15107. Stabilize/tune S3A committers; review correctness & docs. Contributed by Steve Loughran.

HADOOP-15107. Stabilize/tune S3A committers; review correctness & docs.
Contributed by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a0babf7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a0babf7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a0babf7

Branch: refs/heads/HDFS-12943
Commit: 5a0babf76550f63dad4c17173c4da2bf335c6532
Parents: e8d138c
Author: Steve Loughran <st...@apache.org>
Authored: Thu Aug 30 14:49:53 2018 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Aug 30 14:49:53 2018 +0100

----------------------------------------------------------------------
 .../lib/output/PathOutputCommitter.java         |  12 +-
 .../java/org/apache/hadoop/fs/s3a/Invoker.java  |  15 +-
 .../fs/s3a/commit/AbstractS3ACommitter.java     |  16 +-
 .../fs/s3a/commit/S3ACommitterFactory.java      |  18 +-
 .../s3a/commit/magic/MagicS3GuardCommitter.java |   7 +
 .../staging/DirectoryStagingCommitter.java      |   8 +-
 .../staging/PartitionedStagingCommitter.java    |   9 +-
 .../hadoop/fs/s3a/commit/staging/Paths.java     |  14 +-
 .../fs/s3a/commit/staging/StagingCommitter.java |  50 ++++-
 .../tools/hadoop-aws/committer_architecture.md  |  94 ++++++---
 .../markdown/tools/hadoop-aws/committers.md     |   2 +-
 .../fs/s3a/commit/AbstractCommitITest.java      |  19 ++
 .../fs/s3a/commit/AbstractITCommitMRJob.java    |   5 +-
 .../fs/s3a/commit/AbstractITCommitProtocol.java |  63 ++++--
 .../fs/s3a/commit/ITestS3ACommitterFactory.java | 200 +++++++++++++++++++
 .../fs/s3a/commit/magic/ITMagicCommitMRJob.java |   6 +-
 .../commit/magic/ITestMagicCommitProtocol.java  |  25 ++-
 .../ITStagingCommitMRJobBadDest.java            |  62 ++++++
 .../integration/ITestStagingCommitProtocol.java |  13 ++
 19 files changed, 542 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
index 3679d9f..5e25f50 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
@@ -57,8 +57,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
   protected PathOutputCommitter(Path outputPath,
       TaskAttemptContext context) throws IOException {
     this.context = Preconditions.checkNotNull(context, "Null context");
-    LOG.debug("Creating committer with output path {} and task context"
-        + " {}", outputPath, context);
+    LOG.debug("Instantiating committer {} with output path {} and task context"
+        + " {}", this, outputPath, context);
   }
 
   /**
@@ -71,8 +71,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
   protected PathOutputCommitter(Path outputPath,
       JobContext context) throws IOException {
     this.context = Preconditions.checkNotNull(context, "Null context");
-    LOG.debug("Creating committer with output path {} and job context"
-        + " {}", outputPath, context);
+    LOG.debug("Instantiating committer {} with output path {} and job context"
+        + " {}", this, outputPath, context);
   }
 
   /**
@@ -103,6 +103,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
 
   @Override
   public String toString() {
-    return "PathOutputCommitter{context=" + context + '}';
+    return "PathOutputCommitter{context=" + context
+        + "; " + super.toString()
+        + '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
index a007ba1..45912a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -130,8 +130,9 @@ public class Invoker {
   }
 
   /**
-   * Execute an operation and ignore all raised IOExceptions; log at INFO.
-   * @param log log to log at info.
+   * Execute an operation and ignore all raised IOExceptions; log at INFO;
+   * full stack only at DEBUG.
+   * @param log log to use.
    * @param action action to include in log
    * @param path optional path to include in log
    * @param operation operation to execute
@@ -145,13 +146,17 @@ public class Invoker {
     try {
       once(action, path, operation);
     } catch (IOException e) {
-      log.info("{}: {}", toDescription(action, path), e.toString(), e);
+      String description = toDescription(action, path);
+      String error = e.toString();
+      log.info("{}: {}", description, error);
+      log.debug("{}", description, e);
     }
   }
 
   /**
-   * Execute an operation and ignore all raised IOExceptions; log at INFO.
-   * @param log log to log at info.
+   * Execute an operation and ignore all raised IOExceptions; log at INFO;
+   * full stack only at DEBUG.
+   * @param log log to use.
    * @param action action to include in log
    * @param path optional path to include in log
    * @param operation operation to execute

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index 5f1ddfa..d2501da 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -292,7 +292,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
     final StringBuilder sb = new StringBuilder(
         "AbstractS3ACommitter{");
     sb.append("role=").append(role);
-    sb.append(", name").append(getName());
+    sb.append(", name=").append(getName());
     sb.append(", outputPath=").append(getOutputPath());
     sb.append(", workPath=").append(workPath);
     sb.append('}');
@@ -532,8 +532,14 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
              new DurationInfo(LOG, "Aborting all pending commits under %s",
                  dest)) {
       CommitOperations ops = getCommitOperations();
-      List<MultipartUpload> pending = ops
-          .listPendingUploadsUnderPath(dest);
+      List<MultipartUpload> pending;
+      try {
+        pending = ops.listPendingUploadsUnderPath(dest);
+      } catch (IOException e) {
+        // raised if the listPendingUploads call failed.
+        maybeIgnore(suppressExceptions, "aborting pending uploads", e);
+        return;
+      }
       Tasks.foreach(pending)
           .executeWith(buildThreadPool(getJobContext()))
           .suppressExceptions(suppressExceptions)
@@ -656,7 +662,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
   }
 
   /**
-   * Execute an operation; maybe suppress any raised IOException.
+   * Log or rethrow a caught IOException.
    * @param suppress should raised IOEs be suppressed?
    * @param action action (for logging when the IOE is suppressed.
    * @param ex  exception
@@ -667,7 +673,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
       String action,
       IOException ex) throws IOException {
     if (suppress) {
-      LOG.info(action, ex);
+      LOG.debug(action, ex);
     } else {
       throw ex;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
index 6b170f9..36d0af1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
@@ -77,9 +77,20 @@ public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
     AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
         outputPath,
         context.getConfiguration());
-    return factory != null ?
-      factory.createTaskCommitter(fileSystem, outputPath, context)
-      : createFileOutputCommitter(outputPath, context);
+    if (factory != null) {
+      PathOutputCommitter committer = factory.createTaskCommitter(
+          fileSystem, outputPath, context);
+      LOG.info("Using committer {} to output data to {}",
+          (committer instanceof AbstractS3ACommitter
+              ? ((AbstractS3ACommitter) committer).getName()
+              : committer.toString()),
+          outputPath);
+      return committer;
+    } else {
+      LOG.warn("Using standard FileOutputCommitter to commit work."
+          + " This is slow and potentially unsafe.");
+      return createFileOutputCommitter(outputPath, context);
+    }
   }
 
   /**
@@ -104,6 +115,7 @@ public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
 
     String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
     name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
+    LOG.debug("Committer option is {}", name);
     switch (name) {
     case COMMITTER_NAME_FILE:
       factory = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index c305141..c956a98 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -285,4 +285,11 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
     return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath());
   }
 
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "MagicCommitter{");
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
index 3eda24f..23bb06b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -27,13 +27,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
 
 /**
  * This commits to a directory.
@@ -70,10 +68,8 @@ public class DirectoryStagingCommitter extends StagingCommitter {
     if (getConflictResolutionMode(context, fs.getConf())
         == ConflictResolution.FAIL
         && fs.exists(outputPath)) {
-      LOG.debug("Failing commit by task attempt {} to write"
-              + " to existing output path {}",
-          context.getJobID(), getOutputPath());
-      throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
+      throw failDestinationExists(outputPath,
+          "Setting job as " + getRole());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index bfaf443..b51bcb5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -31,14 +31,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
 
 /**
  * Partitioned committer.
@@ -100,11 +98,8 @@ public class PartitionedStagingCommitter extends StagingCommitter {
         Path partitionPath = getFinalPath(partition + "/file",
             context).getParent();
         if (fs.exists(partitionPath)) {
-          LOG.debug("Failing commit by task attempt {} to write"
-              + " to existing path {} under {}",
-              context.getTaskAttemptID(), partitionPath, getOutputPath());
-          throw new PathExistsException(partitionPath.toString(),
-              E_DEST_EXISTS);
+          throw failDestinationExists(partitionPath,
+              "Committing task " + context.getTaskAttemptID());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
index d5d256a..a941572 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
@@ -167,13 +167,15 @@ public final class Paths {
             return FileSystem.getLocal(conf).makeQualified(
                 allocator.getLocalPathForWrite(uuid, conf));
           });
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e.getCause());
-    } catch (UncheckedExecutionException e) {
-      if (e.getCause() instanceof RuntimeException) {
-        throw (RuntimeException) e.getCause();
+    } catch (ExecutionException | UncheckedExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
       }
-      throw new RuntimeException(e);
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 2182eaa..6d02e86 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -500,6 +502,10 @@ public class StagingCommitter extends AbstractS3ACommitter {
           listAndFilter(attemptFS,
               wrappedJobAttemptPath, false,
               HIDDEN_FILE_FILTER));
+    } catch (FileNotFoundException e) {
+      // this can mean the job was aborted early on, so don't confuse people
+      // with long stack traces that aren't the underlying problem.
+      maybeIgnore(suppressExceptions, "Pending upload directory not found", e);
     } catch (IOException e) {
       // unable to work with endpoint, if suppressing errors decide our actions
       maybeIgnore(suppressExceptions, "Listing pending uploads", e);
@@ -565,13 +571,13 @@ public class StagingCommitter extends AbstractS3ACommitter {
   }
 
   /**
-   * Delete the working paths of a job. Does not attempt to clean up
-   * the work of the wrapped committer.
+   * Delete the working paths of a job.
    * <ol>
    *   <li>The job attempt path</li>
-   *   <li>$dest/__temporary</li>
+   *   <li>{@code $dest/__temporary}</li>
    *   <li>the local working directory for staged files</li>
    * </ol>
+   * Does not attempt to clean up the work of the wrapped committer.
    * @param context job context
    * @throws IOException IO failure
    */
@@ -836,6 +842,44 @@ public class StagingCommitter extends AbstractS3ACommitter {
   }
 
   /**
+   * Generate a {@link PathExistsException} because the destination exists.
+   * Lists some of the child entries first, to help diagnose the problem.
+   * @param path path which exists
+   * @param description description (usually task/job ID)
+   * @return an exception to throw
+   */
+  protected PathExistsException failDestinationExists(final Path path,
+      final String description) {
+
+    LOG.error("{}: Failing commit by job {} to write"
+            + " to existing output path {}.",
+        description,
+        getJobContext().getJobID(), path);
+    // List the first 10 descendants, to give some details
+    // on what is wrong but not overload things if there are many files.
+    try {
+      int limit = 10;
+      RemoteIterator<LocatedFileStatus> lf
+          = getDestFS().listFiles(path, true);
+      LOG.info("Partial Directory listing");
+      while (limit > 0 && lf.hasNext()) {
+        limit--;
+        LocatedFileStatus status = lf.next();
+        LOG.info("{}: {}",
+            status.getPath(),
+            status.isDirectory()
+                ? " dir"
+                : ("file size " + status.getLen() + " bytes"));
+      }
+    } catch (IOException e) {
+      LOG.info("Discarding exception raised when listing {}: " + e, path);
+      LOG.debug("stack trace ", e);
+    }
+    return new PathExistsException(path.toString(),
+        description + ": " + InternalCommitterConstants.E_DEST_EXISTS);
+  }
+
+  /**
    * Get the conflict mode option string.
    * @param context context with the config
    * @param fsConf filesystem config

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
index e4ba75d..3071754 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
@@ -230,7 +230,6 @@ None: directories are created on demand.
 Rename task attempt path to task committed path.
 
 ```python
-
 def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
   return fs.exists(taskAttemptPath)
 
@@ -276,12 +275,12 @@ def commitJob(fs, jobAttemptDir, dest):
 (See below for details on `mergePaths()`)
 
 
-A failure during job abort cannot be recovered from except by re-executing
+A failure during job commit cannot be recovered from except by re-executing
 the entire query:
 
 ```python
 def isCommitJobRepeatable() :
-  return True
+  return False
 ```
 
 Accordingly, it is a failure point in the protocol. With a low number of files
@@ -307,12 +306,28 @@ def cleanupJob(fs, dest):
 ```
 
 
-### Job Recovery
+### Job Recovery Before `commitJob()`
 
-1. Data under task committed paths is retained
-1. All directories under `$dest/_temporary/$appAttemptId/_temporary/` are deleted.
+For all committers, the recovery process takes place in the application
+master.
+1. The job history file of the previous attempt is loaded and scanned
+to determine which tasks were recorded as having succeeded.
+1. For each successful task, the job committer has its `recoverTask()` method
+invoked with a `TaskAttemptContext` built from the previous attempt's details.
+1. If the method does not raise an exception, it is considered to have been
+recovered, and not to be re-executed.
+1. All other tasks are queued for execution.
 
-Uncommitted/unexecuted tasks are (re)executed.
+For the v1 committer, task recovery is straightforward.
+The directory of the committed task from the previous attempt is
+moved under the directory of the current application attempt.
+
+```python
+def recoverTask(tac):
+  oldAttemptId = appAttemptId - 1
+  fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}',
+    '$dest/_temporary/appAttemptId/${tac.taskId}')
+```
 
 This significantly improves time to recover from Job driver (here MR AM) failure.
 The only lost work is that of all tasks in progress -those which had generated
@@ -330,6 +345,11 @@ failure simply by rerunning the entire job. This is implicitly the strategy
 in Spark, which does not attempt to recover any in-progress jobs. The faster
 your queries, the simpler your recovery strategy needs to be.
 
+### Job Recovery During `commitJob()`
+
+This is not possible; a failure during job commit requires the entire job
+to be re-executed after cleaning up the destination directory.
+
 ### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm
 
 `mergePaths()` is the core algorithm to merge data; it is somewhat confusing
@@ -352,24 +372,23 @@ def mergePathsV1(fs, src, dest) :
       fs.delete(dest, recursive = True)
     fs.rename(src.getPath, dest)
   else :
-    # destination is directory, choose action on source type
-    if src.isDirectory :
-      if not toStat is None :
-        if not toStat.isDirectory :
-          # Destination exists and is not a directory
-          fs.delete(dest)
-          fs.rename(src.getPath(), dest)
-        else :
-          # Destination exists and is a directory
-          # merge all children under destination directory
-          for child in fs.listStatus(src.getPath) :
-            mergePathsV1(fs, child, dest + child.getName)
-      else :
-        # destination does not exist
+    # src is directory, choose action on dest type
+    if not toStat is None :
+      if not toStat.isDirectory :
+        # Destination exists and is not a directory
+        fs.delete(dest)
         fs.rename(src.getPath(), dest)
+      else :
+        # Destination exists and is a directory
+        # merge all children under destination directory
+        for child in fs.listStatus(src.getPath) :
+          mergePathsV1(fs, child, dest + child.getName)
+    else :
+      # destination does not exist
+      fs.rename(src.getPath(), dest)
 ```
 
-## v2 commit algorithm
+## The v2 Commit Algorithm
 
 
 The v2 algorithm directly commits task output into the destination directory.
@@ -506,12 +525,31 @@ Cost: `O(1)` for normal filesystems, `O(files)` for object stores.
 As no data is written to the destination directory, a task can be cleaned up
 by deleting the task attempt directory.
 
-### v2 Job Recovery
+### v2 Job Recovery Before `commitJob()`
+
+
+Because the data has been renamed into the destination directory, all tasks
+recorded as having being committed have no recovery needed at all:
+
+```python
+def recoverTask(tac):
+```
+
+All active and queued tasks are scheduled for execution.
+
+There is a weakness here, the same one on a failure during `commitTask()`:
+it is only safe to repeat a task which failed during that commit operation
+if the name of all generated files are constant across all task attempts.
+
+If the Job AM fails while a task attempt has been instructed to commit,
+and that commit is not recorded as having completed, the state of that
+in-progress task is unknown...really it isn't be safe to recover the
+job at this point.
+
 
-Because the data has been renamed into the destination directory, it is nominally
-recoverable. However, this assumes that the number and name of generated
-files are constant on retried tasks.
+### v2 Job Recovery During `commitJob()`
 
+This is straightforward: `commitJob()` is re-invoked.
 
 ## How MapReduce uses the committer in a task
 
@@ -896,7 +934,7 @@ and metadata.
 
         POST bucket.s3.aws.com/path?uploads
 
-    An UploadId is returned
+    An `UploadId` is returned
 
 1. Caller uploads one or more parts.
 
@@ -994,7 +1032,7 @@ Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPa
 
 The single-directory and partitioned committers handle conflict resolution by
 checking whether target paths exist in S3 before uploading any data.
-There are 3 conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
+There are three conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
 
 * `fail`: Fail a task if an output directory or partition already exists. (Default)
 * `append`: Upload data files without checking whether directories or partitions already exist.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 392cde2..09e123d 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -371,7 +371,7 @@ Put differently: start with the Directory Committer.
 
 To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a`
 must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.staging.S3ACommitterFactory`.
-This is done in `core-default.xml`
+This is done in `mapred-default.xml`
 
 ```xml
 <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 90e8894..246bf9d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -174,6 +174,25 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
   }
 
   /**
+   * Create a random Job ID using the fork ID as part of the number.
+   * @return fork ID string in a format parseable by Jobs
+   * @throws Exception failure
+   */
+  protected String randomJobId() throws Exception {
+    String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
+    int l = testUniqueForkId.length();
+    String trailingDigits = testUniqueForkId.substring(l - 4, l);
+    try {
+      int digitValue = Integer.valueOf(trailingDigits);
+      return String.format("20070712%04d_%04d",
+          (long)(Math.random() * 1000),
+          digitValue);
+    } catch (NumberFormatException e) {
+      throw new Exception("Failed to parse " + trailingDigits, e);
+    }
+  }
+
+  /**
    * Teardown waits for the consistency delay and resets failure count, so
    * FS is stable, before the superclass teardown is called. This
    * should clean things up better.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
index 13dfd83..161db85 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -266,9 +265,9 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
 
   /**
    * Override point to let implementations tune the MR Job conf.
-   * @param c configuration
+   * @param jobConf configuration
    */
-  protected void applyCustomConfigOptions(Configuration c) {
+  protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 4d7f524..5ae8f54 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -159,25 +159,6 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     cleanupDestDir();
   }
 
-  /**
-   * Create a random Job ID using the fork ID as part of the number.
-   * @return fork ID string in a format parseable by Jobs
-   * @throws Exception failure
-   */
-  private String randomJobId() throws Exception {
-    String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
-    int l = testUniqueForkId.length();
-    String trailingDigits = testUniqueForkId.substring(l - 4, l);
-    try {
-      int digitValue = Integer.valueOf(trailingDigits);
-      return String.format("20070712%04d_%04d",
-          (long)(Math.random() * 1000),
-          digitValue);
-    } catch (NumberFormatException e) {
-      throw new Exception("Failed to parse " + trailingDigits, e);
-    }
-  }
-
   @Override
   public void teardown() throws Exception {
     describe("teardown");
@@ -765,6 +746,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     JobContext jContext = jobData.jContext;
     TaskAttemptContext tContext = jobData.tContext;
     AbstractS3ACommitter committer = jobData.committer;
+    validateTaskAttemptWorkingDirectory(committer, tContext);
 
     // write output
     describe("1. Writing output");
@@ -1360,12 +1342,55 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
 
   }
 
+  @Test
+  public void testS3ACommitterFactoryBinding() throws Throwable {
+    describe("Verify that the committer factory returns this "
+        + "committer when configured to do so");
+    Job job = newJob();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        taskAttempt0);
+    String name = getCommitterName();
+    S3ACommitterFactory factory = new S3ACommitterFactory();
+    assertEquals("Wrong committer from factory",
+        createCommitter(outDir, tContext).getClass(),
+        factory.createOutputCommitter(outDir, tContext).getClass());
+  }
+
+  /**
+   * Validate the path of a file being written to during the write
+   * itself.
+   * @param p path
+   * @throws IOException IO failure
+   */
   protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
 
   }
 
+  /**
+   * Validate the path of a file being written to after the write
+   * operation has completed.
+   * @param p path
+   * @throws IOException IO failure
+   */
   protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
 
   }
 
+  /**
+   * Perform any actions needed to validate the working directory of
+   * a committer.
+   * For example: filesystem, path attributes
+   * @param committer committer instance
+   * @param context task attempt context
+   * @throws IOException IO failure
+   */
+  protected void validateTaskAttemptWorkingDirectory(
+      AbstractS3ACommitter committer,
+      TaskAttemptContext context) throws IOException {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
new file mode 100644
index 0000000..a8547d6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * Tests for some aspects of the committer factory.
+ * All tests are grouped into one single test so that only one
+ * S3A FS client is set up and used for the entire run.
+ * Saves time and money.
+ */
+public class ITestS3ACommitterFactory extends AbstractCommitITest {
+
+
+  protected static final String INVALID_NAME = "invalid-name";
+
+  /**
+   * Counter to guarantee that even in parallel test runs, no job has the same
+   * ID.
+   */
+
+  private String jobId;
+
+  // A random task attempt id for testing.
+  private String attempt0;
+
+  private TaskAttemptID taskAttempt0;
+
+  private Path outDir;
+
+  private S3ACommitterFactory factory;
+
+  private TaskAttemptContext tContext;
+
+  /**
+   * Parameterized list of bindings of committer name in config file to
+   * expected class instantiated.
+   */
+  private static final Object[][] bindings = {
+      {COMMITTER_NAME_FILE, FileOutputCommitter.class},
+      {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
+      {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
+      {InternalCommitterConstants.COMMITTER_NAME_STAGING,
+          StagingCommitter.class},
+      {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+  };
+
+  /**
+   * This is a ref to the FS conf, so changes here are visible
+   * to callers querying the FS config.
+   */
+  private Configuration filesystemConfRef;
+
+  private Configuration taskConfRef;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    jobId = randomJobId();
+    attempt0 = "attempt_" + jobId + "_m_000000_0";
+    taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+    outDir = path(getMethodName());
+    factory = new S3ACommitterFactory();
+    Configuration conf = new Configuration();
+    conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    filesystemConfRef = getFileSystem().getConf();
+    tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
+    taskConfRef = tContext.getConfiguration();
+  }
+
+  @Test
+  public void testEverything() throws Throwable {
+    testImplicitFileBinding();
+    testBindingsInTask();
+    testBindingsInFSConfig();
+    testInvalidFileBinding();
+    testInvalidTaskBinding();
+  }
+
+  /**
+   * Verify that if all config options are unset, the FileOutputCommitter
+   *
+   * is returned.
+   */
+  public void testImplicitFileBinding() throws Throwable {
+    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
+    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
+    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
+  }
+
+  /**
+   * Verify that task bindings are picked up.
+   */
+  public void testBindingsInTask() throws Throwable {
+    // set this to an invalid value to be confident it is not
+    // being checked.
+    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
+    taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
+    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
+    for (Object[] binding : bindings) {
+      taskConfRef.set(FS_S3A_COMMITTER_NAME,
+          (String) binding[0]);
+      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+    }
+  }
+
+  /**
+   * Verify that FS bindings are picked up.
+   */
+  public void testBindingsInFSConfig() throws Throwable {
+    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
+    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
+    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
+    for (Object[] binding : bindings) {
+      taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
+      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+    }
+  }
+
+  /**
+   * Create an invalid committer via the FS binding,
+   */
+  public void testInvalidFileBinding() throws Throwable {
+    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
+    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
+    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
+        () -> createCommitter());
+  }
+
+  /**
+   * Create an invalid committer via the task attempt.
+   */
+  public void testInvalidTaskBinding() throws Throwable {
+    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
+    taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
+    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
+        () -> createCommitter());
+  }
+
+  /**
+   * Assert that the factory creates the expected committer.
+   * @param expected expected committer class.
+   * @throws IOException IO failure.
+   */
+  protected void assertFactoryCreatesExpectedCommitter(
+      final Class expected)
+      throws IOException {
+    assertEquals("Wrong Committer from factory",
+        expected,
+        createCommitter().getClass());
+  }
+
+  /**
+   * Create a committer.
+   * @return the committer
+   * @throws IOException IO failure.
+   */
+  private PathOutputCommitter createCommitter() throws IOException {
+    return factory.createOutputCommitter(outDir, tContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
index 57eb8b2..b7be17a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.mapred.JobConf;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 
@@ -30,7 +30,7 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
  *
  * There's no need to disable the committer setting for the filesystem here,
  * because the committers are being instantiated in their own processes;
- * the settings in {@link #applyCustomConfigOptions(Configuration)} are
+ * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
  * passed down to these processes.
  */
 public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
@@ -54,7 +54,7 @@ public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
    * @param conf configuration
    */
   @Override
-  protected void applyCustomConfigOptions(Configuration conf) {
+  protected void applyCustomConfigOptions(JobConf conf) {
     conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index 74c1d9d..057adf5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
+import java.io.IOException;
+import java.net.URI;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -32,9 +35,8 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.hamcrest.CoreMatchers.containsString;
 
 /**
  * Test the magic committer's commit protocol.
@@ -116,6 +118,25 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
   }
 
   /**
+   * The magic committer paths are always on S3, and always have
+   * "__magic" in the path.
+   * @param committer committer instance
+   * @param context task attempt context
+   * @throws IOException IO failure
+   */
+  @Override
+  protected void validateTaskAttemptWorkingDirectory(
+      final AbstractS3ACommitter committer,
+      final TaskAttemptContext context) throws IOException {
+    URI wd = committer.getWorkPath().toUri();
+    assertEquals("Wrong schema for working dir " + wd
+        + " with committer " + committer,
+        "s3a", wd.getScheme());
+    assertThat(wd.getPath(),
+        containsString('/' + CommitConstants.MAGIC + '/'));
+  }
+
+  /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
new file mode 100644
index 0000000..be477a7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+/**
+ * This is a test to verify that the committer will fail if the destination
+ * directory exists, and that this happens in job setup.
+ */
+public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
+
+  @Override
+  protected String committerName() {
+    return StagingCommitter.NAME;
+  }
+
+  /**
+   * create the destination directory and expect a failure.
+   * @param conf configuration
+   */
+  @Override
+  protected void applyCustomConfigOptions(JobConf conf) throws IOException {
+    // This is the destination in the S3 FS
+    String outdir = conf.get(FileOutputFormat.OUTDIR);
+    S3AFileSystem fs = getFileSystem();
+    Path outputPath = new Path(outdir);
+    fs.mkdirs(outputPath);
+  }
+
+  @Override
+  public void testMRJob() throws Exception {
+    LambdaTestUtils.intercept(FileAlreadyExistsException.class,
+        "Output directory",
+        super::testMRJob);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a0babf7/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index 08c572e..180e743 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -118,6 +118,19 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
   }
 
   /**
+   * The staging committers always have the local FS for their work.
+   * @param committer committer instance
+   * @param context task attempt context
+   * @throws IOException IO failure
+   */
+  @Override
+  protected void validateTaskAttemptWorkingDirectory(final AbstractS3ACommitter committer,
+      final TaskAttemptContext context) throws IOException {
+    Path wd = context.getWorkingDirectory();
+    assertEquals("file", wd.toUri().getScheme());
+  }
+
+  /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org