Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/02/16 03:50:45 UTC

[GitHub] [hadoop] sidseth commented on a change in pull request #2971: MAPREDUCE-7341. Intermediate Manifest Committer

sidseth commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r804257022



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Minimal subset of the Google RateLimiter class.
+ * Can be used to throttle use of object stores where excess load
+ * will trigger cluster-wide throttling, backoff etc. and so collapse
+ * performance.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface RateLimiting {
+
+  /**
+   * Acquire rate limiter capacity.
+   * If there is not enough space, the permits will be acquired,
+   * but the subsequent call will block until the capacity has been
+   * refilled.
+   * @param capacity capacity to acquire.
+   * @return time in milliseconds spent waiting for capacity.
+   */
+  int acquire(int capacity);

Review comment:
       Should this return a long - if it is a time in milliseconds? Maybe the int is intentional.
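
   For illustration, a minimal sketch of an implementation built on the shaded Guava RateLimiter (hypothetical; the PR's actual implementation sits behind RateLimitingFactory, and the class/constructor names here are illustrative only):

       import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;

       /** Sketch only: returns the milliseconds spent blocked, truncated to int. */
       class GuavaRateLimiting implements RateLimiting {
         private final RateLimiter limiter;

         GuavaRateLimiting(int capacityPerSecond) {
           this.limiter = RateLimiter.create(capacityPerSecond);
         }

         @Override
         public int acquire(int capacity) {
           // Guava returns the seconds spent sleeping as a double; the
           // conversion to int milliseconds is where a long would be safer.
           return (int) (limiter.acquire(capacity) * 1000);
         }
       }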

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.RateLimitingFactory;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.*;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.buildJobUUID;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getAppAttemptId;
+
+/**
+ * The configuration for the committer as built up from the job configuration
+ * and data passed down from the committer factory.
+ * Isolated for ease of dev/test.
+ */
+public final class ManifestCommitterConfig implements IOStatisticsSource {
+
+  /**
+   * Final destination of work.
+   * This is <i>unqualified</i>.
+   */
+  private final Path destinationDir;
+
+  /**
+   * Role: used in log/text messages.
+   */
+  private final String role;

Review comment:
       What's the difference between role and name? Can either be automatically inferred? A bit strange to pass these in the constructor.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
##########
@@ -0,0 +1,751 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
+import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_FAILED_COUNT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_ABORT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterExit;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterEntry;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage.cleanupStageOptionsFromConfig;
+
+/**
+ * This is the Intermediate-Manifest committer.
+ * At every entry point it updates the thread's audit context with
+ * the current stage info; this is a placeholder for
+ * adding audit information to stores other than S3A.
+ *
+ * This is tagged as public/stable; such tagging is mandatory
+ * for the classname and for PathOutputCommitter implementation
+ * classes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ManifestCommitter extends PathOutputCommitter implements
+    IOStatisticsSource, StageEventCallbacks {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ManifestCommitter.class);
+
+  /**
+   * Role: task committer.
+   */
+  public static final String TASK_COMMITTER = "task committer";
+
+  /**
+   * Role: job committer.
+   */
+  public static final String JOB_COMMITTER = "job committer";
+
+  /**
+   * Committer Configuration as extracted from
+   * the job/task context and set in the constructor.
+   */
+  private final ManifestCommitterConfig baseConfig;
+
+  /**
+   * Destination of the job.
+   */
+  private final Path destinationDir;
+
+  /**
+   * For tasks, the attempt directory.
+   * Null for jobs.
+   */
+  private final Path taskAttemptDir;
+
+  /**
+   * IOStatistics to update.
+   */
+  private final IOStatisticsStore iostatistics;
+
+  /**
+   *  The job Manifest Success data; only valid after a job successfully
+   *  commits.
+   */
+  private ManifestSuccessData successReport;
+
+  /**
+   * The active stage; updated by a callback from within the stages.
+   */
+  private String activeStage;
+
+  /**
+   * The task manifest of the task commit.
+   * Null unless this is a task attempt and the
+   * task has successfully been committed.
+   */
+  private TaskManifest taskAttemptCommittedManifest;
+
+  /**
+   * Create a committer.
+   * @param outputPath output path
+   * @param context job/task context
+   * @throws IOException failure.
+   */
+  ManifestCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.destinationDir = resolveDestinationDirectory(outputPath,
+        context.getConfiguration());
+    this.iostatistics = createIOStatisticsStore().build();
+    this.baseConfig = enterCommitter(
+        context.getTaskAttemptID() != null,
+        context);
+
+    this.taskAttemptDir = baseConfig.getTaskAttemptDir();
+    LOG.info("Created ManifestCommitter with JobID {},"
+            + " Task Attempt {} and destination {}",
+        context.getJobID(), context.getTaskAttemptID(), outputPath);
+  }
+
+  /**
+   * Called at the start of each committer method; generates a config for it.
+   * Calls {@code #updateCommonContextOnCommitterEntry()}
+   * to update the audit context.
+   * @param isTask is this a task entry point?
+   * @param context context
+   * @return committer config
+   */
+  private ManifestCommitterConfig enterCommitter(boolean isTask,
+      JobContext context) {
+    ManifestCommitterConfig committerConfig =
+        new ManifestCommitterConfig(
+            getOutputPath(),
+            isTask ? TASK_COMMITTER : JOB_COMMITTER,
+            context,
+            iostatistics,
+            this);
+    updateCommonContextOnCommitterEntry(committerConfig);
+    return committerConfig;
+  }
+
+  /**
+   * Set up a job through a {@link SetupJobStage}.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException IO Failure.
+   */
+  public void setupJob(final JobContext jobContext) throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(false,
+        jobContext);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // set up the job.
+    new SetupJobStage(stageConfig)
+        .apply(committerConfig.getCreateJobMarker());
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Set up a task through a {@link SetupTaskStage}.
+   * Classic FileOutputCommitter is a no-op here, relying
+   * on RecordWriters to create the dir implicitly on file
+   * create().
+   * FileOutputCommitter also uses the existence of that
+   * file as a flag to indicate task commit is needed.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void setupTask(final TaskAttemptContext context)

Review comment:
       Nit: @Override annotations are missing on several methods.
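
   A sketch of the fix on one of them (annotation only; body unchanged):

       @Override
       public void setupTask(final TaskAttemptContext context)
           throws IOException {
         // existing body unchanged
       }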

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
##########
@@ -0,0 +1,751 @@
+  public void setupTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig =
+        enterCommitter(true, context);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // create task attempt dir; delete if present. Or fail?
+    new SetupTaskStage(stageConfig).apply("");
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Always return true.
+   * This way, even if there is no output, stats are collected.
+   * @param context task context.
+   * @return true
+   * @throws IOException IO Failure.
+   */
+  public boolean needsTaskCommit(final TaskAttemptContext context)
+      throws IOException {
+    LOG.info("Probe for needsTaskCommit({})",
+        context.getTaskAttemptID());
+    return true;
+  }
+
+  /**
+   * Failure during job commit cannot be recovered from.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isCommitJobRepeatable(final JobContext jobContext)
+      throws IOException {
+    LOG.info("Probe for isCommitJobRepeatable({}): returning false",
+        jobContext.getJobID());
+    return false;
+  }
+
+  /**
+   * Declare that task recovery is not supported.
+   * It would be, if someone added the code *and tests*.
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isRecoverySupported(final JobContext jobContext)
+      throws IOException {
+    LOG.info("Probe for isRecoverySupported({}): returning false",
+        jobContext.getJobID());
+    return false;
+  }
+
+  /**
+   * Declare that task recovery is not possible: this method always fails.
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException always
+   */
+  @Override
+  public void recoverTask(final TaskAttemptContext taskContext)
+      throws IOException {
+    LOG.warn("Rejecting recoverTask({}) call", taskContext.getTaskAttemptID());
+    throw new IOException("Cannot recover task "
+        + taskContext.getTaskAttemptID());
+  }
+
+  /**
+   * Commit the task.
+   * This is where the listing of the task attempt directory tree takes place.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void commitTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(true,
+        context);
+    try {
+      StageConfig stageConfig = committerConfig.createJobStageConfig()
+          .withOperations(createStoreOperations())
+          .build();
+      taskAttemptCommittedManifest = new CommitTaskStage(stageConfig)
+          .apply(null).getTaskManifest();
+      iostatistics.incrementCounter(COMMITTER_TASKS_COMPLETED_COUNT, 1);
+    } catch (IOException e) {
+      iostatistics.incrementCounter(COMMITTER_TASKS_FAILED_COUNT, 1);
+      throw e;
+    } finally {
+      logCommitterStatisticsAtDebug();
+      updateCommonContextOnCommitterExit();
+    }
+
+  }
+
+  /**
+   * Abort a task.
+   * @param context task context
+   * @throws IOException failure during the delete
+   */
+  public void abortTask(final TaskAttemptContext context)

Review comment:
       Question: Is this ever called outside of the task itself (e.g. from drivers / AMs)?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */

Review comment:
       The InterfaceStability, InterfaceAudience, etc. annotations should be added here, despite being defined at the package level.
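
   i.e., a sketch of the suggested addition (assuming the class declaration used in the PR):

       import java.io.Closeable;

       import org.apache.hadoop.classification.InterfaceAudience;
       import org.apache.hadoop.classification.InterfaceStability;

       @InterfaceAudience.Private
       @InterfaceStability.Unstable
       public abstract class StoreOperations implements Closeable {
         // existing operations unchanged
       }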

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.util.OperationDuration;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_READ_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_WRITE_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_GET_FILE_STATUS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_LIST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_OPEN_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_COMMIT_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_CREATE_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_DELETE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_MKDIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_RENAME;
+
+/**
+ * A Stage in Task/Job Commit.
+ * A stage can be executed once only, creating the return value of the
+ * {@link #apply(Object)} method, and, potentially, updating the state of the
+ * store via {@link StoreOperations}.
+ * IOStatistics will also be updated.
+ * Stages are expected to be combined to form the commit protocol.
+ * @param <IN> Type of arguments to the stage.
+ * @param <OUT> Type of result.
+ */
+public abstract class AbstractJobCommitStage<IN, OUT>
+    implements JobStage<IN, OUT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractJobCommitStage.class);
+
+  /**
+   * Error text on rename failure: {@value}.
+   */
+  public static final String FAILED_TO_RENAME = "Failed to ";
+
+  /**
+   * Error text on when clean up is to trash, but
+   * the FS has trash disabled: {@value}.
+   */
+  public static final String E_TRASH_DISABLED = "Trash is disabled";
+
+  /**
+   * Is this a task stage? If so, toString() includes task
+   * info.
+   */
+  private final boolean isTaskStage;
+
+  /**
+   * Configuration of all the stages in the ongoing committer
+   * operation.
+   */
+  private final StageConfig stageConfig;
+
+  /**
+   * Name of the stage for statistics and logging.
+   */
+  private final String stageStatisticName;
+
+  /**
+   * Callbacks to update store.
+   * This is not made visible to the stages; they must
+   * go through the wrapper classes in this class, which
+   * add statistics and logging.
+   */
+  private final StoreOperations operations;
+
+  /**
+   * Submitter for doing IO against the store.
+   */
+  private final TaskPool.Submitter ioProcessors;
+
+  /**
+   * Used to stop any re-entrancy of the rename.
+   * This is an execute-once operation.
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * Tracker of the duration of the execution of the stage.
+   * Set after {@link #executeStage(Object)} completes.
+   */
+  private DurationTracker stageExecutionTracker;
+
+  /**
+   * Name for logging.
+   */
+  private final String name;
+
+  /**
+   * Constructor.
+   * @param isTaskStage Is this a task stage?
+   * @param stageConfig stage-independent configuration.
+   * @param stageStatisticName name of the stage for statistics/logging
+   * @param requireIOProcessors are the IO processors required?
+   */
+  protected AbstractJobCommitStage(
+      final boolean isTaskStage,
+      final StageConfig stageConfig,
+      final String stageStatisticName,
+      final boolean requireIOProcessors) {
+    this.isTaskStage = isTaskStage;
+    this.stageStatisticName = stageStatisticName;
+    this.stageConfig = stageConfig;
+    requireNonNull(stageConfig.getDestinationDir(), "Destination Directory");
+    requireNonNull(stageConfig.getJobId(), "Job ID");
+    requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory");
+    this.operations = requireNonNull(stageConfig.getOperations(),
+        "Operations callbacks");
+    // and the processors of work if required.
+    this.ioProcessors = bindProcessor(
+        requireIOProcessors,
+        stageConfig.getIoProcessors());
+    String stageName;
+    if (isTaskStage) {
+      // force fast failure.
+      getRequiredTaskId();
+      getRequiredTaskAttemptId();
+      getRequiredTaskAttemptDir();
+      stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId());

Review comment:
       Should these names also reflect the actual stage, not just task vs. job?
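
   e.g., something like (hypothetical format string, using the stageStatisticName already passed to the constructor):

       stageName = String.format("[Task-Attempt %s, stage %s]",
           getRequiredTaskAttemptId(), stageStatisticName);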

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/RenameFilesStage.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_RENAME_FILES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
+import static org.apache.hadoop.thirdparty.com.google.common.collect.Iterables.concat;
+
+/**
+ * This stage renames all the files.
+ * It returns a manifest success data file summarizing the
+ * output, but does not add iostatistics to it.
+ */
+public class RenameFilesStage extends
+    AbstractJobCommitStage<List<TaskManifest>, ManifestSuccessData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      RenameFilesStage.class);
+
+  /**
+   * List of all files committed.
+   */
+  private final List<FileOrDirEntry> filesCommitted = new ArrayList<>();
+
+  /**
+   * Total file size.
+   */
+  private long totalFileSize = 0;
+
+  public RenameFilesStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_RENAME_FILES, true);
+  }
+
+  /**
+   * Get the list of files committed.
+   * The returned list itself is not synchronized.
+   * @return direct access to the list of files.
+   */
+  public synchronized List<FileOrDirEntry> getFilesCommitted() {
+    return filesCommitted;
+  }
+
+  /**
+   * Get the total file size of the committed task.
+   * @return a number greater than or equal to zero.
+   */
+  public synchronized long getTotalFileSize() {
+    return totalFileSize;
+  }
+
+  /**
+   * Rename files in job commit.
+   * @param taskManifests a list of task manifests containing files.
+   * @return the job report.
+   * @throws IOException failure
+   */
+  @Override
+  protected ManifestSuccessData executeStage(

Review comment:
       As a future enhancement... would it make sense to try rationalizing the files to be moved with an aim to move entire directories instead of individual files?
   e.g. ${outputDir}/_temporary/${jobId}/0/taskAttempt01-1/actual-dir/[100s of files]
   If the target - presumably always `${outputDir}/actual-dir` - does not exist, move ${outputDir}/_temporary/${jobId}/0/taskAttempt01-1/actual-dir/ to ${outputDir}/actual-dir instead of moving the files under it individually.
   
   Actually, thinking some more about this - if task directories have the same output dirs created, this is pointless, which is quite likely.
   This, then comes to - can taskCommit actually move files to the destination (or in-fact write the files straight to the destination)? I think the answer to this is also a no. There's a risk of the AM/driver not hearing back from a commitTask, after a task commits, and triggering another attempt of the same task.
   
   This, however, is almost equivalent to writing out the manifest file as part of the commit. We end up with a single manifest per task-attempt because the commit renames it to be based on the taskId instead of the taskAttemptId, and this manifest contains information about the actual files to be moved (can think of at least one scenario where this potentially helps as well - a fairly special set of circumstances).
   
   Don't think this facilitates tasks moving files to a common (not task specific) location, or does it (files generated in task attempt locations will likely have the same name ... so if 2 tasks somehow end up being told to commit, they could end up clobbering each other's files ... moving the manifest, however, is a single op - so that is why the approach works)
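
   A sketch of the directory-first idea for reference (hypothetical; fs is the destination FileSystem, and destDirFor/renameFilesIndividually are illustrative names, not from the PR):

       Path src = taskAttemptActualDir;      // .../taskAttempt01-1/actual-dir
       Path dest = destDirFor(src);          // ${outputDir}/actual-dir
       if (!fs.exists(dest)) {
         // one rename moves every file under the directory
         fs.rename(src, dest);
       } else {
         // destination already exists (e.g. created for another task's output):
         // fall back to per-file renames, as the stage does today
         renameFilesIndividually(src, dest);
       }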

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/RenameFilesStage.java
##########
@@ -0,0 +1,153 @@
+  @Override
+  protected ManifestSuccessData executeStage(
+      final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    final ManifestSuccessData success = createManifestOutcome(getStageConfig(),
+        OP_STAGE_JOB_COMMIT);
+    final int manifestCount = taskManifests.size();
+
+    LOG.info("{}: Executing Manifest Job Commit with {} manifests in {}",
+        getName(), manifestCount, getTaskManifestDir());
+
+    // first step is to aggregate the output of all manifests into a single
+    // list of files to commit, which Guava can do with a
+    // zero-copy concatenated iterator.
+
+    final Iterable<FileOrDirEntry> filesToCommit = concat(taskManifests.stream()
+        .map(TaskManifest::getFilesToCommit)
+        .collect(Collectors.toList()));
+
+    TaskPool.foreach(filesToCommit)
+        .executeWith(getIOProcessors())
+        .stopOnFailure()
+        .run(this::commitOneFile);
+
+    // synchronized block to keep spotbugs happy.
+    List<FileOrDirEntry> committed = getFilesCommitted();
+    LOG.info("{}: Files committed: {}. Total size {}",
+        getName(), committed.size(), getTotalFileSize());
+
+    // Add a subset of the destination files to the success file;
+    // enough for simple testing
+    success.getFilenames().addAll(

Review comment:
       Maybe an explicit add method, instead of adding to the private list inside of ManifestSuccessData.
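
   e.g., a hypothetical sketch of such a method inside ManifestSuccessData (assumes a private List<String> filenames field backs getFilenames()):

       /** Append filenames without exposing the backing list. */
       public synchronized void addFilenames(Collection<String> names) {
         filenames.addAll(names);
       }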

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
##########
@@ -450,12 +451,37 @@ public static void maybeUpdateMinimum(AtomicLong dest, long sample) {
    * @param factory factory of duration trackers
    * @param statistic statistic key
    * @param input input callable.
+   * @throws IOException IO failure.
    */
   public static void trackDurationOfInvocation(
       DurationTrackerFactory factory,
       String statistic,
       InvocationRaisingIOE input) throws IOException {
 
+    measureDurationOfInvocation(factory, statistic, input);
+  }
+
+  /**
+   * Given an IOException raising callable/lambda expression,
+   * execute it and update the relevant statistic,
+   * returning the measured duration.
+   *
+   * This is {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}
+   * with the duration returned for logging etc.; it was added as a new
+   * method to avoid linking problems with any code calling the existing
+   * method.
+   *
+   * @param factory factory of duration trackers
+   * @param statistic statistic key
+   * @param input input callable.
+   * @return the duration of the operation, as measured by the duration tracker.
+   * @throws IOException IO failure.
+   */
+  public static Duration measureDurationOfInvocation(

Review comment:
       Should this deprecate the equivalent method, which returns void?
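
   If so, a sketch would just be the existing delegation (shown in the diff above) plus the annotation:

       /**
        * @deprecated use {@link #measureDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}.
        */
       @Deprecated
       public static void trackDurationOfInvocation(
           DurationTrackerFactory factory,
           String statistic,
           InvocationRaisingIOE input) throws IOException {
         measureDurationOfInvocation(factory, statistic, input);
       }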

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.util.OperationDuration;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_READ_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_WRITE_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_GET_FILE_STATUS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_LIST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_OPEN_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_COMMIT_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_CREATE_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_DELETE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_MKDIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_RENAME;
+
+/**
+ * A Stage in Task/Job Commit.
+ * A stage can be executed once only, creating the return value of the
+ * {@link #apply(Object)} method, and, potentially, updating the state of the
+ * store via {@link StoreOperations}.
+ * IOStatistics will also be updated.
+ * Stages are expected to be combined to form the commit protocol.
+ * @param <IN> Type of arguments to the stage.
+ * @param <OUT> Type of result.
+ */
+public abstract class AbstractJobCommitStage<IN, OUT>
+    implements JobStage<IN, OUT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractJobCommitStage.class);
+
+  /**
+   * Error text prefix used when a rename fails: {@value}.
+   */
+  public static final String FAILED_TO_RENAME = "Failed to ";
+
+  /**
+   * Error text when cleanup moves data to trash but
+   * the FS has trash disabled: {@value}.
+   */
+  public static final String E_TRASH_DISABLED = "Trash is disabled";
+
+  /**
+   * Is this a task stage? If so, toString() includes task
+   * info.
+   */
+  private final boolean isTaskStage;
+
+  /**
+   * Configuration of all the stages in the ongoing committer
+   * operation.
+   */
+  private final StageConfig stageConfig;
+
+  /**
+   * Name of the stage for statistics and logging.
+   */
+  private final String stageStatisticName;
+
+  /**
+   * Callbacks to update store.
+   * This is not made visible to the stages; they must
+   * go through the wrapper classes in this class, which
+   * add statistics and logging.
+   */
+  private final StoreOperations operations;
+
+  /**
+   * Submitter for doing IO against the store.
+   */
+  private final TaskPool.Submitter ioProcessors;
+
+  /**
+   * Used to stop any re-entrancy of the rename.
+   * This is an execute-once operation.
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * Tracker of the duration of the execution of the stage.
+   * Set when {@link #apply(Object)} is invoked.
+   */
+  private DurationTracker stageExecutionTracker;
+
+  /**
+   * Name for logging.
+   */
+  private final String name;
+
+  /**
+   * Constructor.
+   * @param isTaskStage Is this a task stage?
+   * @param stageConfig stage-independent configuration.
+   * @param stageStatisticName name of the stage for statistics/logging
+   * @param requireIOProcessors are the IO processors required?
+   */
+  protected AbstractJobCommitStage(
+      final boolean isTaskStage,
+      final StageConfig stageConfig,
+      final String stageStatisticName,
+      final boolean requireIOProcessors) {
+    this.isTaskStage = isTaskStage;
+    this.stageStatisticName = stageStatisticName;
+    this.stageConfig = stageConfig;
+    requireNonNull(stageConfig.getDestinationDir(), "Destination Directory");
+    requireNonNull(stageConfig.getJobId(), "Job ID");
+    requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory");
+    this.operations = requireNonNull(stageConfig.getOperations(),
+        "Operations callbacks");
+    // and the processors of work if required.
+    this.ioProcessors = bindProcessor(
+        requireIOProcessors,
+        stageConfig.getIoProcessors());
+    String stageName;
+    if (isTaskStage) {
+      // force fast failure.
+      getRequiredTaskId();
+      getRequiredTaskAttemptId();
+      getRequiredTaskAttemptDir();
+      stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId());
+    } else  {
+      stageName = String.format("[Job-Attempt %s/%02d]",
+          stageConfig.getJobId(),
+          stageConfig.getJobAttemptNumber());
+    }
+    name = stageName;
+  }
+
+  /**
+   * Bind to the processor if it is required.
+   * @param required is the processor required?
+   * @param processor processor
+   * @return the processor binding
+   * @throws NullPointerException if required == true and processor is null.
+   */
+  private TaskPool.Submitter bindProcessor(
+      final boolean required,
+      final TaskPool.Submitter processor) {
+    return required
+        ? requireNonNull(processor, "required IO processor is null")
+        : null;
+  }
+
+  /**
+   * Stage entry point.
+   * Verifies that this is the first and only time the stage is invoked,
+   * then calls {@link #executeStage(Object)} for the subclass
+   * to perform its part of the commit protocol.
+   * The duration of the stage is collected as a statistic, and its
+   * entry/exit logged at INFO.
+   * @param arguments arguments to the function.
+   * @return the result.
+   * @throws IOException failures.
+   */
+  @Override
+  public final OUT apply(final IN arguments) throws IOException {
+    executeOnlyOnce();
+    progress();
+    String stageName = getStageName(arguments);
+    getStageConfig().enterStage(stageName);
+    String statisticName = getStageStatisticName(arguments);
+    // create the tracker to time the stage's execution.
+    LOG.info("{}: Executing Stage {}", getName(), stageName);
+    stageExecutionTracker = createTracker(getIOStatistics(), statisticName);
+    try {
+      // exec the input function and return its value
+      final OUT out = executeStage(arguments);
+      LOG.info("{}: Stage {} completed after {}",
+          getName(),
+          stageName,
+          OperationDuration.humanTime(
+              stageExecutionTracker.asDuration().toMillis()));
+      return out;
+    } catch (IOException | RuntimeException e) {
+      LOG.error("{}: Stage {} failed: after {}: {}",
+          getName(),
+          stageName,
+          OperationDuration.humanTime(
+              stageExecutionTracker.asDuration().toMillis()),
+          e.toString());
+      LOG.debug("{}: Stage failure:", getName(), e);
+      // input function failed: note it
+      stageExecutionTracker.failed();
+      // and rethrow
+      throw e;
+    } finally {
+      // update the tracker.
+      // this is called after the catch() call would have
+      // set the failed flag.
+      stageExecutionTracker.close();
+      progress();
+      getStageConfig().exitStage(stageName);
+    }
+  }
+
+  /**
+   * The work of a stage.
+   * Executed exactly once.
+   * @param arguments arguments to the function.
+   * @return the result.
+   * @throws IOException failures.
+   */
+  protected abstract OUT executeStage(IN arguments) throws IOException;
+
+  /**
+   * Check that the operation has not been invoked twice.
+   * This is an atomic check.
+   * @throws IllegalStateException on a second invocation.
+   */
+  private void executeOnlyOnce() {
+    Preconditions.checkState(
+        !executed.getAndSet(true),
+        "Stage attempted twice");
+  }
+
+  /**
+   * The stage statistic name.
+   * @param arguments args to the invocation.
+   * @return stage name.
+   */
+  protected String getStageStatisticName(IN arguments) {
+    return stageStatisticName;
+  }
+
+  /**
+   * Stage name for reporting; defaults to calling
+   * {@link #getStageStatisticName(Object)}.
+   * @param arguments args to the invocation.
+   * @return name used in updating reports.
+   */
+  protected String getStageName(IN arguments) {
+    return getStageStatisticName(arguments);
+  }
+
+  /**
+   * Get the execution tracker; non-null
+   * after stage execution.
+   * @return a tracker or null.
+   */
+  public DurationTracker getStageExecutionTracker() {
+    return stageExecutionTracker;
+  }
+
+  /**
+   * Adds the duration of the stage execution to an IOStatistics store
+   * (such as the manifest to be saved).
+   * @param iostats store
+   * @param statistic statistic name.
+   */
+  public void addExecutionDurationToStatistics(IOStatisticsStore iostats,
+      String statistic) {
+    iostats.addTimedOperation(
+        statistic,
+        getStageExecutionTracker().asDuration());
+  }
+
+  /**
+   * Acquire a given number of read permits.
+   * The caller will block if the rate
+   * limiter mandates it.
+   * No-op if (in test setups) there's no rate limiter.
+   * Acquisition duration is added to stats when rate-limited.
+   * @param permits permit count.
+   */
+  public void acquireReadPermits(int permits) {
+    noteAnyRateLimiting(IO_ACQUIRE_READ_PERMIT_BLOCKED,
+        stageConfig.acquireReadPermits(permits));
+  }
+
+  /**
+   * Acquire a given number of write permits.
+   * The caller will block if the rate
+   * limiter mandates it.
+   * No-op if (in test setups) there's no rate limiter.
+   * Acquisition duration is added to stats when rate-limited.
+   * @param permits permit count.
+   */
+  public void acquireWritePermits(int permits) {
+    noteAnyRateLimiting(IO_ACQUIRE_WRITE_PERMIT_BLOCKED,
+        stageConfig.acquireWritePermits(permits));
+  }
+
+  /**
+   * Note any rate limiting to the given timing statistic.
+   * If the wait was 0, no statistics are updated.
+   * @param statistic statistic key.
+   * @param wait wait time in milliseconds.
+   */
+  private void noteAnyRateLimiting(String statistic, int wait) {
+    if (wait > 0) {
+      // rate limiting took place
+      getIOStatistics().addTimedOperation(
+          statistic,
+          wait);
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "AbstractJobCommitStage{");
+    sb.append(isTaskStage ? "Task Stage" : "Job Stage");
+    sb.append(" name='").append(name).append('\'');
+    sb.append(" stage='").append(stageStatisticName).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * The stage configuration.
+   * @return the stage configuration used by this stage.
+   */
+  protected StageConfig getStageConfig() {
+    return stageConfig;
+  }
+
+  /**
+   * Update the thread context with the stage name and
+   * job ID.
+   * This MUST be invoked at the start of methods invoked in helper threads,
+   * to ensure that they are all annotated with job and stage.
+   * @param stage stage name.
+   */
+  protected void updateAuditContext(final String stage) {
+    enterStageWorker(stageConfig.getJobId(), stage);
+  }
+
+  /**
+   * The IOStatistics are shared across all uses of the
+   * StageConfig.
+   * @return the (possibly shared) IOStatistics.
+   */
+  @Override
+  public final IOStatisticsStore getIOStatistics() {
+    return stageConfig.getIOStatistics();
+  }
+
+  /**
+   * Call progress() on any Progressable passed in.
+   */
+  protected final void progress() {
+    if (stageConfig.getProgressable() != null) {
+      stageConfig.getProgressable().progress();
+    }
+  }
+
+  /**
+   * Get a file status value or, if the path doesn't exist, return null.
+   * @param path path
+   * @return status or null
+   * @throws IOException IO Failure.
+   */
+  protected final FileStatus getFileStatusOrNull(
+      final Path path)
+      throws IOException {
+    try {
+      return getFileStatus(path);
+    } catch (FileNotFoundException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Get a file status value, raising FileNotFoundException
+   * if the path doesn't exist.
+   * @param path path
+   * @return status
+   * @throws FileNotFoundException if there is nothing at the path
+   * @throws IOException IO Failure.
+   */
+  protected final FileStatus getFileStatus(
+      final Path path)
+      throws IOException {
+    LOG.trace("{}: getFileStatus('{}')", getName(), path);
+    requireNonNull(path,
+        () -> String.format("%s: Null path for getFileStatus() call", getName()));
+    acquireReadPermits(PERMIT_READ_GET_FILE_STATUS);
+    return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () ->
+        operations.getFileStatus(path));
+  }
+
+  /**
+   * Probe whether a path resolves to a file; false if it is absent.
+   * @param path path
+   * @return true if the path resolves to a file
+   * @throws IOException IO Failure.
+   */
+  protected final boolean isFile(
+      final Path path)
+      throws IOException {
+    LOG.trace("{}: isFile('{}')", getName(), path);
+    return trackDuration(getIOStatistics(), OP_IS_FILE, () -> {
+      acquireReadPermits(PERMIT_READ_GET_FILE_STATUS);
+      return operations.isFile(path);
+    });
+  }
+
+  /**
+   * Delete a path.
+   * @param path path
+   * @param recursive recursive delete.
+   * @return true if the path was deleted.
+   * @throws IOException IO Failure.
+   */
+  protected final boolean delete(
+      final Path path,
+      final boolean recursive)
+      throws IOException {
+    LOG.trace("{}: delete('{}, {}')", getName(), path, recursive);
+    return delete(path, recursive, OP_DELETE);
+  }
+
+  /**
+   * Delete a path.
+   * @param path path
+   * @param recursive recursive delete.
+   * @param statistic statistic to update
+   * @return true if the path was deleted.
+   * @throws IOException IO Failure.
+   */
+  protected Boolean delete(

Review comment:
       A little confused by all the filesystem operations here, and then also the StoreOperations ... i.e. why so much layering?

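   To spell out the layering in question, here is a condensed sketch of the
   pattern visible in this class (the StoreOperationsThroughFileSystem method
   body shown is an assumption, not quoted from the patch):

       // layer 1: stage wrapper -- adds rate limiting + duration statistics,
       // then delegates to the StoreOperations callbacks.
       protected final FileStatus getFileStatus(Path path) throws IOException {
         acquireReadPermits(PERMIT_READ_GET_FILE_STATUS);
         return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () ->
             operations.getFileStatus(path));
       }

       // layer 2: StoreOperationsThroughFileSystem (sketch) -- swappable
       // indirection so tests/stores can provide their own implementation.
       public FileStatus getFileStatus(Path path) throws IOException {
         return fileSystem.getFileStatus(path);  // layer 3: the FileSystem itself
       }
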
##########
File path: hadoop-tools/hadoop-azure/pom.xml
##########
@@ -219,6 +219,58 @@
       <scope>test</scope>
     </dependency>
 
+    <!--
+    the mapreduce-client-core module is compiled against for the
+    manifest committer support. It is not needed on the classpath
+    except when using this committer, so it is only tagged as
+    "provided".
+    -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>

Review comment:
       Likewise for yarn libraries.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestCommitterSupport.java
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.EtagSource;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.INITIAL_APP_ATTEMPT_ID;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ATTEMPT_DIR_FORMAT_STR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_DIR_FORMAT_STR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_TASK_ATTEMPT_SUBDIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_TASK_MANIFEST_SUBDIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SPARK_WRITE_UUID;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUMMARY_FILENAME_FORMAT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.TMP_SUFFIX;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.PRINCIPAL;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.COUNTER_STATISTICS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.DURATION_STATISTICS;
+
+/**
+ * Utility methods supporting the manifest committer.
+ */
+public final class ManifestCommitterSupport {
+
+  private ManifestCommitterSupport() {
+  }
+
+  /**
+   * Create an IOStatistics store builder with the standard statistics
+   * set up.
+   * @return a store builder preconfigured with the standard stats.
+   */
+  public static IOStatisticsStoreBuilder createIOStatisticsStore() {
+
+    final IOStatisticsStoreBuilder store
+        = iostatisticsStore();
+
+    store.withCounters(COUNTER_STATISTICS);
+    store.withMaximums(COUNTER_STATISTICS);
+    store.withMinimums(COUNTER_STATISTICS);
+    store.withMeanStatistics(COUNTER_STATISTICS);
+    store.withDurationTracking(DURATION_STATISTICS);
+    return store;
+  }
+
+  /**
+   * If the object is an IOStatisticsSource, get and add
+   * its IOStatistics.
+   * @param ios aggregator to update.
+   * @param o source object.
+   */
+  public static void maybeAddIOStatistics(IOStatisticsAggregator ios,
+      Object o) {
+    if (o instanceof IOStatisticsSource) {
+      ios.aggregate(((IOStatisticsSource) o).getIOStatistics());
+    }
+  }
+
+  /**
+   * Build a job UUID from the job conf (if it contains
+   * {@link ManifestCommitterConstants#SPARK_WRITE_UUID})
+   * or from the MR job ID.
+   * @param conf job/task configuration
+   * @param jobId job ID from YARN or spark.
+   * @return (a job ID, source)
+   */
+  public static Pair<String, String> buildJobUUID(Configuration conf,
+      JobID jobId) {
+    String jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");

Review comment:
       SPARK_WRITE_UUID <- seems a little strange for a Spark constant to leak into the committer. 

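   For reference, the precedence here reduces to the sketch below, using the
   constants and Pair type already imported in this file; the exact "source"
   strings returned are assumptions, not quoted from the patch:

       static Pair<String, String> jobUuidSketch(Configuration conf, JobID jobId) {
         String sparkUuid = conf.getTrimmed(SPARK_WRITE_UUID, "");
         if (!sparkUuid.isEmpty()) {
           // Spark (or any other engine) supplied a write UUID: prefer it.
           return Pair.of(sparkUuid, SPARK_WRITE_UUID);
         }
         // otherwise fall back to the MR job ID.
         return Pair.of(jobId.toString(), JOB_ID_SOURCE_MAPREDUCE);
       }
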
##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Minimal subset of the Guava RateLimiter API.
+ * Can be used to throttle use of object stores where excess load
+ * will trigger cluster-wide throttling, backoff etc., and so degrade
+ * performance.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface RateLimiting {

Review comment:
       General usage of this in the patch: Should something like this be used within the FileSystem implementation itself, potentially shared across multiple instances pointing to the same FileSystem, instead of by the individual commit operations?

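   To make that concrete: a minimal sketch of sharing one limiter per target
   store, keyed by filesystem URI. RateLimitingFactory.create(int) is assumed
   here to build a limiter with the given operations/second capacity:

       import java.net.URI;
       import java.util.concurrent.ConcurrentHashMap;
       import org.apache.hadoop.util.RateLimiting;
       import org.apache.hadoop.util.RateLimitingFactory;

       final class SharedStoreLimiters {
         private static final ConcurrentHashMap<URI, RateLimiting> LIMITERS =
             new ConcurrentHashMap<>();

         // one limiter per store URI, shared by every committer instance
         // targeting that store.
         static RateLimiting forStore(URI fsUri, int opsPerSecond) {
           return LIMITERS.computeIfAbsent(fsUri,
               uri -> RateLimitingFactory.create(opsPerSecond));
         }
       }
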
##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterFactory.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+/**
+ * This is the committer factory to register as the source of committers
+ * for the job/filesystem scheme.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ManifestCommitterFactory extends PathOutputCommitterFactory {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ManifestCommitterFactory.class);
+
+  /**
+   * Name of this factory.
+   */
+  public static final String NAME

Review comment:
       Can this refer to the class itself?
   ManifestCommitterFactory.class.get*
   
   If not - may be worth adding a comment on the reason. This is very easy to change as a 'trivial' change while fixing something else.

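   i.e. something like the following, assuming only the fully qualified
   class name is needed:

       public static final String NAME = ManifestCommitterFactory.class.getName();
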
##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.RateLimitingFactory;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.*;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.buildJobUUID;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getAppAttemptId;
+
+/**
+ * The configuration for the committer as built up from the job configuration
+ * and data passed down from the committer factory.
+ * Isolated for ease of dev/test.
+ */
+public final class ManifestCommitterConfig implements IOStatisticsSource {
+
+  /**
+   * Final destination of work.
+   * This is <i>unqualified</i>.
+   */
+  private final Path destinationDir;
+
+  /**
+   * Role: used in log/text messages.
+   */
+  private final String role;
+
+  /**
+   * This is the directory for all intermediate work: where the output
+   * format will write data.
+   * Will be null if built from a job context.
+   */
+  private final Path taskAttemptDir;
+
+  /** Configuration of the job. */
+  private final Configuration conf;
+
+  /** The job context. For a task, this can be cast to a TaskContext. */
+  private final JobContext jobContext;
+
+  /** Should a job marker be created? */
+  private final boolean createJobMarker;
+
+  /**
+   * Job ID or UUID, without any attempt suffix.
+   * This is expected/required to be unique, though
+   * Spark has had "issues" there until recently
+   * with lack of uniqueness of generated MR Job IDs.
+   */
+  private final String jobUniqueId;
+
+  /**
+   * Where did the job Unique ID come from?
+   */
+  private final String jobUniqueIdSource;
+
+  /**
+   * Number of this attempt; starts at zero.
+   */
+  private final int jobAttemptNumber;
+
+  /**
+   * Job ID + AttemptID.
+   */
+  private final String jobAttemptId;
+
+  /**
+   * Task ID: used as the filename of the manifest.
+   * Will be "" if built from a job context.
+   */
+  private final String taskId;
+
+  /**
+   * Task attempt ID. Determines the working
+   * directory for task attempts to write data into,
+   * and for the task committer to scan.
+   * Will be "" if built from a job context.
+   */
+  private final String taskAttemptId;
+
+  /** Any progressable for progress callbacks. */
+  private final Progressable progressable;
+
+  /**
+   * IOStatistics to update.
+   */
+  private final IOStatisticsStore iostatistics;
+
+  /** Should the output be validated after the commit? */
+  private final boolean validateOutput;
+
+  /**
+   * Attempt directory management.
+   */
+  private final ManifestCommitterSupport.AttemptDirectories dirs;
+
+  /**
+   * Callback when a stage is entered.
+   */
+  private final StageEventCallbacks stageEventCallbacks;
+
+  /**
+   * IO operation rate limit per second.
+   */
+  private final int iopsRate;
+
+  /**
+   * Name for logging.
+   */
+  private final String name;
+
+  /**
+   * Delete target paths on commit? Stricter, but
+   * higher IO cost.
+   */
+  private final boolean deleteTargetPaths;
+
+  /**
+   * Prepare parent dirs by scanning them for files?
+   */
+  private final boolean prepareParentDirectories;
+
+  /**
+   * Constructor.
+   * @param outputPath destination path of the job.
+   * @param role role for log messages.
+   * @param context job/task context
+   * @param iostatistics IO Statistics
+   * @param stageEventCallbacks stage event callbacks.
+   */
+  ManifestCommitterConfig(
+      final Path outputPath,
+      final String role,
+      final JobContext context,
+      final IOStatisticsStore iostatistics,
+      final StageEventCallbacks stageEventCallbacks) {
+    this.role = role;
+    this.jobContext = context;
+    this.conf = context.getConfiguration();
+    this.destinationDir = outputPath;
+    this.iostatistics = iostatistics;
+    this.stageEventCallbacks = stageEventCallbacks;
+
+    Pair<String, String> pair = buildJobUUID(conf, context.getJobID());
+    this.jobUniqueId = pair.getLeft();
+    this.jobUniqueIdSource = pair.getRight();
+    this.jobAttemptNumber = getAppAttemptId(context);
+    this.jobAttemptId = this.jobUniqueId + "_" + jobAttemptNumber;
+
+    // build directories
+    this.dirs = new ManifestCommitterSupport.AttemptDirectories(outputPath,
+        this.jobUniqueId, jobAttemptNumber);
+
+    // read in configuration options
+    this.createJobMarker = conf.getBoolean(
+        SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+        DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
+    this.validateOutput = conf.getBoolean(
+        OPT_VALIDATE_OUTPUT,
+        OPT_VALIDATE_OUTPUT_DEFAULT);
+    this.deleteTargetPaths = conf.getBoolean(
+        OPT_PREPARE_TARGET_FILES,
+        OPT_PREPARE_TARGET_FILES_DEFAULT);
+    this.prepareParentDirectories = conf.getBoolean(
+        OPT_PREPARE_PARENT_DIRECTORIES,
+        OPT_PREPARE_PARENT_DIRECTORIES_DEFAULT);
+
+    this.iopsRate = conf.getInt(OPT_IO_RATE, OPT_IO_RATE_DEFAULT);
+
+    // if constructed with a task attempt, build the task ID and path.
+    if (context instanceof TaskAttemptContext) {
+      // it's a task
+      final TaskAttemptContext tac = (TaskAttemptContext) context;
+      TaskAttemptID taskAttempt = Objects.requireNonNull(
+          tac.getTaskAttemptID());
+      taskAttemptId = taskAttempt.toString();
+      taskId = taskAttempt.getTaskID().toString();
+      // Task attempt dir; must be different across instances
+      taskAttemptDir = dirs.getTaskAttemptPath(taskAttemptId);
+      // the context is also the progress callback.
+      progressable = tac;
+      name = String.format(InternalConstants.NAME_FORMAT_TASK_ATTEMPT, taskAttemptId);
+
+    } else {
+      // it's a job
+      taskId = "";
+      taskAttemptId = "";
+      taskAttemptDir = null;
+      progressable = null;
+      name = String.format(InternalConstants.NAME_FORMAT_JOB_ATTEMPT, jobAttemptId);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "ManifestCommitterConfig{" +
+        "name=" + name +
+        ", destinationDir=" + destinationDir +
+        ", role='" + role + '\'' +
+        ", taskAttemptDir=" + taskAttemptDir +
+        ", createJobMarker=" + createJobMarker +
+        ", jobUniqueId='" + jobUniqueId + '\'' +
+        ", jobUniqueIdSource='" + jobUniqueIdSource + '\'' +
+        ", jobAttemptNumber=" + jobAttemptNumber +
+        ", jobAttemptId='" + jobAttemptId + '\'' +
+        ", taskId='" + taskId + '\'' +
+        ", taskAttemptId='" + taskAttemptId + '\'' +
+        '}';
+  }
+
+  /**
+   * Get the destination filesystem.
+   * @return destination FS.
+   * @throws IOException Problems binding to the destination FS.
+   */
+  FileSystem getDestinationFileSystem() throws IOException {
+    return FileSystem.get(destinationDir.toUri(), conf);
+  }
+
+  /**
+   * Create the job stage config from the committer
+   * configuration.
+   * This does not bind the store operations
+   * or processors.
+   * @return a stage config with configuration options passed in.
+   */
+  StageConfig createJobStageConfig() {

Review comment:
       Nit: name is a little confusing. The StageConfig being created here is for tasks as well. 

##########
File path: hadoop-tools/hadoop-azure/pom.xml
##########
@@ -219,6 +219,58 @@
       <scope>test</scope>
     </dependency>
 
+    <!--
+    the mapreduce-client-core module is compiled against for the
+    manifest committer support. It is not needed on the classpath
+    except when using this committer, so it is only tagged as
+    "provided".
+    -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>

Review comment:
       This dependency should not be coming in. CloudStorageConnectors need to be usable without caring about libraries like mapreduce being pulled in, or being present on the classpath.

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -42,6 +42,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 

Review comment:
       Still haven't looked at the changes here though.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Public constants for the manifest committer.
+ * This includes all configuration options and their default values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class ManifestCommitterConstants {
+
+  /**
+   * Suffix to use in manifest files in the job attempt dir.
+   * Value: {@value}.
+   */
+  public static final String MANIFEST_SUFFIX = "-manifest.json";
+
+  /**
+   * Prefix for summary files in the report dir.
+   * Value: {@value}.
+   */
+  public static final String SUMMARY_FILENAME_PREFIX = "summary-";
+
+  /**
+   * Format string used to build a summary file from a Job ID.
+   */
+  public static final String SUMMARY_FILENAME_FORMAT =
+      SUMMARY_FILENAME_PREFIX + "%s.json";
+
+  /**
+   * Suffix to use for temp files before renaming them.
+   * Value: {@value}.
+   */
+  public static final String TMP_SUFFIX = ".tmp";
+
+  /**
+   * Initial app attempt number.
+   * This is fixed in YARN; for Spark jobs the
+   * same number "0" is used.
+   */
+  public static final int INITIAL_APP_ATTEMPT_ID = 0;
+
+  /**
+   * Format string for building a job dir.
+   * Value: {@value}.
+   */
+  public static final String JOB_DIR_FORMAT_STR = "manifest_%s";
+
+  /**
+   * Format string for building a job attempt dir.
+   * This uses the job attempt number so previous versions
+   * can be found trivially.
+   * Value: {@value}.
+   */
+  public static final String JOB_ATTEMPT_DIR_FORMAT_STR = "%d";
+
+  /**
+   * Name of directory under job attempt dir for manifests.
+   */
+  public static final String JOB_TASK_MANIFEST_SUBDIR = "manifests";
+
+  /**
+   * Name of directory under job attempt dir for task attempts.
+   */
+  public static final String JOB_TASK_ATTEMPT_SUBDIR = "tasks";
+
+  /**
+   * Committer classname as recorded in the committer _SUCCESS file.
+   */
+  public static final String MANIFEST_COMMITTER_CLASSNAME =
+      "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter";
+
+  /**
+   * Marker file to create on success: {@value}.
+   */
+  public static final String SUCCESS_MARKER = "_SUCCESS";
+
+  /** Default job marker option: {@value}. */
+  public static final boolean DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER = true;
+
+  /**
+   * The limit to the number of committed objects tracked during
+   * job commits and saved to the _SUCCESS file.
+   * Value: {@value}.
+   */
+  public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
+
+  /**
+   * The UUID for jobs: {@value}.
+   * This was historically created in Spark 1.x's SQL queries,
+   * but "went away".
+   * It has been restored in recent Spark releases.
+   * If found: it is used instead of the MR job attempt ID.
+   */
+  public static final String SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID";
+
+  /**
+   * String to use as source of the job ID.
+   * This SHOULD be kept in sync with that of
+   * {@code AbstractS3ACommitter.JobUUIDSource}.
+   * Value: {@value}.
+   */
+  public static final String JOB_ID_SOURCE_MAPREDUCE = "JobID";
+
+  /**
+   * Prefix to use for config options: {@value}.
+   */
+  public static final String OPT_PREFIX = "mapreduce.manifest.committer.";
+
+  /**
+   * Rather than delete in cleanup, should the working directory
+   * be moved to the trash directory?
+   * Potentially faster on some stores.
+   * Value: {@value}.
+   */
+  public static final String OPT_CLEANUP_MOVE_TO_TRASH =

Review comment:
       Why does the committer need to introduce this?
   
   This particular committer isn't meant to be used by S3. Trash is often not enabled on cloud storage - so if someone turns this on, there may be nothing to go and clean up the relocated files. Think it'll be simpler to skip this config, and rely on whatever is configured for the filesystem.

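   For context, the cleanup being debated reduces to roughly the sketch
   below. Trash.moveToAppropriateTrash() is the stock Hadoop API; the delete
   fallback shown is an assumption about reasonable behaviour, not
   necessarily what the patch does:

       import java.io.IOException;
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.fs.Trash;

       final class CleanupSketch {
         // move the job dir to trash if enabled; otherwise plain delete.
         static void cleanup(FileSystem fs, Path jobDir, Configuration conf,
             boolean moveToTrash) throws IOException {
           if (moveToTrash && Trash.moveToAppropriateTrash(fs, jobDir, conf)) {
             return;  // relocated under the user's trash directory
           }
           // trash disabled (common on object stores): delete directly.
           fs.delete(jobDir, true);
         }
       }
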
##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.measureDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_PREPARE_PARENT_DIRECTORIES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_CREATE_DIRECTORIES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_FILE_UNDER_DESTINATION;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MKDIRS_RETURNED_FALSE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_PREPARE_DIR_ANCESTORS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS;
+import static org.apache.hadoop.util.OperationDuration.humanTime;
+
+/**
+ * Prepare the destination directory tree, by making as few IO calls as
+ * possible, and doing those IO operations in the thread pool.
+ *
+ * The classic FileOutputCommitter does a recursive treewalk and
+ * deletes any files found at paths where directories are to be created.
+ *
+ * Each task manifest's directories are combined with those of the other tasks
+ * to build a set of all directories which are needed, without duplicates.
+ *
+ * If the option to prepare parent directories was set,
+ * these are also prepared for the commit.
+ * A set of ancestor directories is built up, all of which MUST NOT
+ * be files and SHALL be in one of two states:
+ * - directory
+ * - nonexistent
+ * Across a thread pool, an isFile() probe is made for all parents;
+ * if there is a file there then it is deleted.
+ *
+ * After any ancestor dir preparation, the final dir set is created
+ * through mkdirs() calls, with some handling for race conditions.
+ * It is not an error if mkdirs() fails because the dest dir is there;
+ * if it fails because a file was found, that file will be deleted.
+ *
+ * The stage returns the list of directories created, and for testing,
+ * the map of paths to outcomes.
+ */
+public class CreateOutputDirectoriesStage extends
+    AbstractJobCommitStage<List<TaskManifest>, CreateOutputDirectoriesStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CreateOutputDirectoriesStage.class);
+
+  /**
+   * Message to print on first mkdir failure if the parent dirs were not
+   * prepared.
+   */
+  private static final String HINT_PREPARE_PARENT_DIR = "Setting "
+      + OPT_PREPARE_PARENT_DIRECTORIES + " to \"true\" performs more parent directory"
+      + " preparation and so may fix this problem";
+
+  /**
+   * Directories as a map of (path, state).
+   * Using a map rather than a set for efficient concurrency; the
+   * concurrent sets are slower at lookups.
+   */
+  private final Map<Path, DirMapState> dirMap = new ConcurrentHashMap<>();
+
+  /**
+   * A list of created paths for the results.
+   */
+  private final List<Path> createdDirectories = new ArrayList<>();
+
+  public CreateOutputDirectoriesStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CREATE_TARGET_DIRS, true);
+    // add the dest dir to the dir map as we expect the job setup to create it.
+    dirMap.put(getDestinationDir(), DirMapState.dirWasCreated);
+  }
+
+  @Override
+  protected Result executeStage(
+      final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    final List<Path> directories = createAllDirectories(taskManifests);
+    LOG.debug("{}: Created {} directories", getName(), directories.size());
+    return new Result(directories, dirMap);
+  }
+
+  /**
+   * For each task, build the list of directories it wants.
+   * @param taskManifests task manifests
+   * @return the list of paths which have been created.
+   */
+  private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    // the set is all directories which need to exist across all
+    // tasks.
+    // This set is probed during parent dir scanning, so
+    //  needs to be efficient to look up values.
+    final Set<Path> directoriesToCreate = new HashSet<>();
+
+    // iterate through the task manifests
+    // and add all output dirs into the set of dirs to
+    // create.
+    // hopefully there is a lot of overlap, so the
+    // final number of dirs to create is small.
+    for (TaskManifest task : taskManifests) {
+      final List<FileOrDirEntry> dirEntries
+          = task.getDirectoriesToCreate();
+      for (FileOrDirEntry entry: dirEntries) {
+        // add the dest path
+        directoriesToCreate.add(entry.getDestPath());
+      }
+    }
+
+    // Prepare parent directories.
+    if (getStageConfig().getPrepareParentDirectories()) {
+      prepareParentDirectories(directoriesToCreate);
+    }
+
+    // Now the real work.
+
+    final int createCount = directoriesToCreate.size();
+    LOG.info("Preparing {} directory/directories", createCount);
+    // now probe for and create the parent dirs of all renames
+    Duration d = measureDurationOfInvocation(getIOStatistics(), OP_CREATE_DIRECTORIES, () ->
+        TaskPool.foreach(directoriesToCreate)
+            .executeWith(getIOProcessors(createCount))
+            .onFailure(this::reportMkDirFailure)
+            .stopOnFailure()
+            .run(dir -> {
+              updateAuditContext(OP_STAGE_JOB_CREATE_TARGET_DIRS);
+              if (maybeCreateOneDirectory(dir)) {
+                addCreatedDirectory(dir);
+              }
+            }));
+    LOG.info("Time to prepare directories {}", humanTime(d.toMillis()));
+    return createdDirectories;
+  }
+
+  /**
+   * How many failures have been reported.
+   */
+  private final AtomicInteger failureCount = new AtomicInteger();
+
+  /**
+   * Report a single directory creation failure.
+   * @param path path which could not be created
+   * @param e exception raised.
+   */
+  private void reportMkDirFailure(Path path, Exception e) {
+    final int count = failureCount.incrementAndGet();
+    LOG.warn("{}: mkdir failure #{} Failed to create directory \"{}\": {}",
+        getName(), count, path, e.toString());
+    LOG.debug("{}: Full exception details",
+        getName(), e);
+    if (count == 1  && !getStageConfig().getPrepareParentDirectories()) {
+      // first failure and no parent dir setup, so recommend
+      // preparing the parent dir
+      LOG.warn(HINT_PREPARE_PARENT_DIR);
+    }
+  }
+
+  /**
+   * Clean up parent dirs. No attempt is made to create them, because
+   * we know mkdirs() will do that (assuming it has the permissions,
+   * of course).
+   * @param directoriesToCreate set of dirs to create
+   * @throws IOException IO problem
+   */
+  private void prepareParentDirectories(final Set<Path> directoriesToCreate)
+      throws IOException {
+
+    // include parents in the paths
+    final Set<Path> ancestors = extractAncestors(
+        getStageConfig().getDestinationDir(),
+        directoriesToCreate);
+
+    final int ancestorCount = ancestors.size();
+    LOG.info("Preparing {} ancestor directory/directories", ancestorCount);
+    // if there is a single ancestor, don't bother with parallelization.
+    Duration d = measureDurationOfInvocation(getIOStatistics(),
+        OP_PREPARE_DIR_ANCESTORS, () ->
+            TaskPool.foreach(ancestors)
+                .executeWith(getIOProcessors(ancestorCount))
+                .stopOnFailure()
+                .run(dir -> {
+                  updateAuditContext(OP_PREPARE_DIR_ANCESTORS);
+                  prepareParentDir(dir);
+                }));
+    LOG.info("Time to prepare ancestor directories {}", humanTime(d.toMillis()));
+  }
+
+  /**
+   * Prepare a parent directory.
+   * @param dir directory to probe
+   * @throws IOException failure in probe other than FNFE
+   */
+  private void prepareParentDir(Path dir) throws IOException {
+    // report progress back
+    progress();
+    LOG.debug("{}: Probing parent dir {}", getName(), dir);
+    if (isFile(dir)) {
+      // it's a file: delete it.
+      LOG.info("{}: Deleting file {}", getName(), dir);
+      delete(dir, false, OP_DELETE_FILE_UNDER_DESTINATION);
+      // note its final state
+      addToDirectoryMap(dir, DirMapState.ancestorWasFileNowDeleted);
+    } else {
+      // and add to dir map as a dir or missing entry
+      LOG.debug("{}: Dir {} is missing or a directory", getName(), dir);
+      addToDirectoryMap(dir, DirMapState.ancestorWasDirOrMissing);
+    }
+  }
+
+  /**
+   * Determine all ancestors of the dirs to create which are
+   * under the dest path, not in the set of dirs to create.
+   * Static and public for testability.
+   * @param destDir destination dir of the job
+   * @param directoriesToCreate set of dirs to create from tasks.
+   * @return the set of parent dirs
+   */
+  public static Set<Path> extractAncestors(
+      final Path destDir,
+      final Collection<Path> directoriesToCreate) {
+
+    final Set<Path> ancestors = new HashSet<>(directoriesToCreate.size());

Review comment:
       More specifically - if we ever end up trying to delete a directory in this list.

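   Since the hunk is cut off here, a sketch of the walk the javadoc above
   describes (collect every parent under destDir that is neither in the
   create-set nor already seen); only org.apache.hadoop.fs.Path and
   java.util.{Collection, HashSet, Set} are assumed:

       static Set<Path> extractAncestorsSketch(Path destDir,
           Collection<Path> directoriesToCreate) {
         Set<Path> ancestors = new HashSet<>();
         for (Path dir : directoriesToCreate) {
           Path parent = dir.getParent();
           // stop at the dest dir, at a directory already queued for
           // creation, or at an ancestor seen on an earlier walk.
           while (parent != null && !parent.equals(destDir)
               && !directoriesToCreate.contains(parent)
               && ancestors.add(parent)) {
             parent = parent.getParent();
           }
         }
         return ancestors;
       }
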
##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */
+public abstract class StoreOperations implements Closeable {
+
+  /**
+   * Bind to the filesystem.
+   * This is called by the manifest committer after the operations
+   * have been instantiated.
+   * @param fileSystem target FS
+   * @param path actual path under FS.
+   * @throws IOException if there are binding problems.
+   */
+  public void bindToFileSystem(FileSystem fileSystem, Path path) throws IOException {
+
+  }
+
+  /**
+   * Forward to {@link FileSystem#getFileStatus(Path)}.
+   * @param path path
+   * @return status
+   * @throws IOException failure.
+   */
+  public abstract FileStatus getFileStatus(Path path) throws IOException;
+
+  /**
+   * Is a path a file? Used during directory creation.
+   * The is a copy & paste of FileSystem.isFile();
+   * {@code StoreOperationsThroughFileSystem} calls into
+   * the FS direct so that stores which optimize their probes
+   * can save on IO.
+   * @param path path to probe
+   * @return true if the path exists and resolves to a file
+   * @throws IOException failure other than FileNotFoundException
+   */
+  public boolean isFile(Path path) throws IOException {
+    try {
+      return getFileStatus(path).isFile();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Forward to {@link FileSystem#delete(Path, boolean)}.
+   * If it returns without an error: there is nothing at
+   * the end of the path.
+   * @param path path
+   * @param recursive recursive delete.
+   * @return true if the path was deleted.
+   * @throws IOException failure.
+   */
+  public abstract boolean delete(Path path, boolean recursive)
+      throws IOException;
+
+  /**
+   * Forward to {@link FileSystem#mkdirs(Path)}.
+   * Usual "what does 'false' mean" ambiguity.
+   * @param path path
+   * @return true if the directory was created.
+   * @throws IOException failure.
+   */
+  public abstract boolean mkdirs(Path path) throws IOException;
+
+  /**
+   * Forward to {@link FileSystem#rename(Path, Path)}.
+   * Usual "what does 'false' mean" ambiguity.
+   * @param source source file
+   * @param dest destination path -which must not exist.
+   * @return the return value of the rename
+   * @throws IOException failure.
+   */
+  public abstract boolean renameFile(Path source, Path dest)
+      throws IOException;
+
+  /**
+   * Rename a dir; defaults to invoking
+   * {@link #renameFile(Path, Path)}.
+   * Usual "what does 'false' mean?" ambiguity.
+   * @param source source file
+   * @param dest destination path -which must not exist.
+   * @return the return value of the rename.
+   * @throws IOException failure.
+   */
+  public boolean renameDir(Path source, Path dest)
+      throws IOException {
+    return renameFile(source, dest);
+  }
+
+  /**
+   * List the directory.
+   * @param path path to list.
+   * @return an iterator over the results.
+   * @throws IOException any immediate failure.
+   */
+  public abstract RemoteIterator<FileStatus> listStatusIterator(Path path)
+      throws IOException;
+
+  /**
+   * Load a task manifest from the store.
+   * With a real FS, this is done with
+   * {@link TaskManifest#load(JsonSerialization, FileSystem, Path, FileStatus)}
+   *
+   * @param serializer serializer.
+   * @param st status with the path and other data.
+   * @return the manifest
+   * @throws IOException failure to load/parse
+   */
+  public abstract TaskManifest loadTaskManifest(
+      JsonSerialization<TaskManifest> serializer,
+      FileStatus st) throws IOException;
+
+  /**
+   * Save a task manifest by create(); there's no attempt at

Review comment:
       Not sure what 'by create()' means.

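   Guessing from the TMP_SUFFIX constant elsewhere in the patch, "by
   create()" presumably means write-then-rename: create() the manifest at a
   temporary path, then rename it into place. A hedged sketch; the
   JsonSerialization.save() overload with an overwrite flag is assumed:

       static void saveManifestSketch(FileSystem fs,
           JsonSerialization<TaskManifest> serializer,
           TaskManifest manifest, Path finalPath) throws IOException {
         Path tmp = new Path(finalPath.getParent(),
             finalPath.getName() + ".tmp");         // TMP_SUFFIX
         serializer.save(fs, tmp, manifest, true);  // create()/overwrite write
         fs.rename(tmp, finalPath);                 // swap into place
       }
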
##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Public constants for the manifest committer.
+ * This includes all configuration options and their default values.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class ManifestCommitterConstants {
+
+  /**
+   * Suffix to use for manifest files in the job attempt dir.
+   * Value: {@value}.
+   */
+  public static final String MANIFEST_SUFFIX = "-manifest.json";
+
+  /**
+   * Prefix for summary files in the report dir.
+   */
+  public static final String SUMMARY_FILENAME_PREFIX = "summary-";
+
+  /**
+   * Format string used to build a summary file from a Job ID.
+   */
+  public static final String SUMMARY_FILENAME_FORMAT =
+      SUMMARY_FILENAME_PREFIX + "%s.json";
+
+  /**
+   * Suffix to use for temp files before renaming them.
+   * Value: {@value}.
+   */
+  public static final String TMP_SUFFIX = ".tmp";
+
+  /**
+   * Initial number of all app attempts.
+   * This is fixed in YARN; for Spark jobs the
+   * same number "0" is used.
+   */
+  public static final int INITIAL_APP_ATTEMPT_ID = 0;
+
+  /**
+   * Format string for building a job dir.
+   * Value: {@value}.
+   */
+  public static final String JOB_DIR_FORMAT_STR = "manifest_%s";
+
+  /**
+   * Format string for building a job attempt dir.
+   * This uses the job attempt number so previous versions
+   * can be found trivially.
+   * Value: {@value}.
+   */
+  public static final String JOB_ATTEMPT_DIR_FORMAT_STR = "%d";
+
+  /**
+   * Name of directory under job attempt dir for manifests.
+   */
+  public static final String JOB_TASK_MANIFEST_SUBDIR = "manifests";
+
+  /**
+   * Name of directory under job attempt dir for task attempts.
+   */
+  public static final String JOB_TASK_ATTEMPT_SUBDIR = "tasks";
+
+
+  /**
+   * Committer classname as recorded in the committer _SUCCESS file.
+   */
+  public static final String MANIFEST_COMMITTER_CLASSNAME =

Review comment:
       Can this refer to the class itself?
   ManifestCommitter.class.get*
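
   A sketch of what that could look like (assuming the constants class can
   reference ManifestCommitter without creating a circular dependency):

       public static final String MANIFEST_COMMITTER_CLASSNAME =
           ManifestCommitter.class.getName();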

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/AuditingIntegration.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import org.apache.hadoop.fs.audit.CommonAuditContext;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig;
+
+import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_JOB_ID;
+import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CONTEXT_ATTR_STAGE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CONTEXT_ATTR_TASK_ATTEMPT_ID;
+
+/**
+ * Helper class to support integration with Hadoop 3.3.2+ Auditing.
+ * This MUST BE the sole place where fs.audit methods are used, so can be replaced
+ * by a stub class on any backport.
+ * All use of auditing is currently commented out so it *is the stub*

Review comment:
       Not sure what this comment means.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/AuditingIntegration.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import org.apache.hadoop.fs.audit.CommonAuditContext;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig;
+
+import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_JOB_ID;
+import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CONTEXT_ATTR_STAGE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CONTEXT_ATTR_TASK_ATTEMPT_ID;
+
+/**
+ * Helper class to support integration with Hadoop 3.3.2+ Auditing.
+ * This MUST BE the sole place where fs.audit methods are used, so can be replaced
+ * by a stub class on any backport.
+ * All use of auditing is currently commented out so it *is the stub*
+ */
+public final class AuditingIntegration {
+  private AuditingIntegration() {
+  }
+
+  /**
+   * Add jobID to current context; also
+   * task attempt ID if set.
+   */
+  public static void updateCommonContextOnCommitterEntry(
+      ManifestCommitterConfig committerConfig) {
+    CommonAuditContext context = currentAuditContext();
+    context.put(PARAM_JOB_ID,
+        committerConfig.getJobUniqueId());
+    // maybe the task attempt ID.
+    if (!committerConfig.getTaskAttemptId().isEmpty()) {

Review comment:
       null check as well?
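
   e.g. a defensive sketch:

       // hypothetical: guard against null as well as empty
       String taskAttemptId = committerConfig.getTaskAttemptId();
       if (taskAttemptId != null && !taskAttemptId.isEmpty()) {
         context.put(CONTEXT_ATTR_TASK_ATTEMPT_ID, taskAttemptId);
       }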

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
##########
@@ -0,0 +1,751 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
+import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_FAILED_COUNT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_ABORT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterExit;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterEntry;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage.cleanupStageOptionsFromConfig;
+
+/**
+ * This is the Intermediate-Manifest committer.
+ * At every entry point it updates the thread's audit context with
+ * the current stage info; this is a placeholder for
+ * adding audit information to stores other than S3A.
+ *
+ * This is tagged as public/stable. This is mandatory
+ * for the classname and PathOutputCommitter implementation
+ * classes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ManifestCommitter extends PathOutputCommitter implements
+    IOStatisticsSource, StageEventCallbacks {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ManifestCommitter.class);
+
+  /**
+   * Role: task committer.
+   */
+  public static final String TASK_COMMITTER = "task committer";
+
+  /**
+   * Role: job committer.
+   */
+  public static final String JOB_COMMITTER = "job committer";
+
+  /**
+   * Committer Configuration as extracted from
+   * the job/task context and set in the constructor.
+   */
+  private final ManifestCommitterConfig baseConfig;
+
+  /**
+   * Destination of the job.
+   */
+  private final Path destinationDir;
+
+  /**
+   * For tasks, the attempt directory.
+   * Null for jobs.
+   */
+  private final Path taskAttemptDir;
+
+  /**
+   * IOStatistics to update.
+   */
+  private final IOStatisticsStore iostatistics;
+
+  /**
+   *  The job Manifest Success data; only valid after a job successfully
+   *  commits.
+   */
+  private ManifestSuccessData successReport;
+
+  /**
+   * The active stage; is updated by a callback from within the stages.
+   */
+  private String activeStage;
+
+  /**
+   * The task manifest of the task commit.
+   * Null unless this is a task attempt and the
+   * task has successfully been committed.
+   */
+  private TaskManifest taskAttemptCommittedManifest;
+
+  /**
+   * Create a committer.
+   * @param outputPath output path
+   * @param context job/task context
+   * @throws IOException failure.
+   */
+  ManifestCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.destinationDir = resolveDestinationDirectory(outputPath,
+        context.getConfiguration());
+    this.iostatistics = createIOStatisticsStore().build();
+    this.baseConfig = enterCommitter(
+        context.getTaskAttemptID() != null,
+        context);
+
+    this.taskAttemptDir = baseConfig.getTaskAttemptDir();
+    LOG.info("Created ManifestCommitter with JobID {},"
+            + " Task Attempt {} and destination {}",
+        context.getJobID(), context.getTaskAttemptID(), outputPath);
+  }
+
+  /**
+   * Called on entry to a committer method; generates a config for it.
+   * Calls {@code #updateCommonContextOnCommitterEntry()}
+   * to update the audit context.
+   * @param isTask is this a task entry point?
+   * @param context context
+   * @return committer config
+   */
+  private ManifestCommitterConfig enterCommitter(boolean isTask,
+      JobContext context) {
+    ManifestCommitterConfig committerConfig =
+        new ManifestCommitterConfig(
+            getOutputPath(),
+            isTask ? TASK_COMMITTER : JOB_COMMITTER,
+            context,
+            iostatistics,
+            this);
+    updateCommonContextOnCommitterEntry(committerConfig);
+    return committerConfig;
+  }
+
+  /**
+   * Set up a job through a {@link SetupJobStage}.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException IO Failure.
+   */
+  public void setupJob(final JobContext jobContext) throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(false,
+        jobContext);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // set up the job.
+    new SetupJobStage(stageConfig)
+        .apply(committerConfig.getCreateJobMarker());
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Set up a task through a {@link SetupTaskStage}.
+   * Classic FileOutputCommitter is a no-op here, relying
+   * on RecordWriters to create the dir implicitly on file
+   * create().
+   * FileOutputCommitter also uses the existence of that
+   * file as a flag to indicate task commit is needed.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void setupTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig =
+        enterCommitter(true, context);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // create task attempt dir; delete if present. Or fail?
+    new SetupTaskStage(stageConfig).apply("");
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Always return true.
+   * This way, even if there is no output, stats are collected.
+   * @param context task context.
+   * @return true
+   * @throws IOException IO Failure.
+   */
+  public boolean needsTaskCommit(final TaskAttemptContext context)
+      throws IOException {
+    LOG.info("Probe for needsTaskCommit({})",
+        context.getTaskAttemptID());
+    return true;
+  }
+
+  /**
+   * Failure during Job Commit is not recoverable.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isCommitJobRepeatable(final JobContext jobContext)
+      throws IOException {
+    LOG.info("Probe for isCommitJobRepeatable({}): returning false",
+        jobContext.getJobID());
+    return false;
+  }
+
+  /**
+   * Declare that task recovery is not supported.
+   * It would be, if someone added the code *and tests*.
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isRecoverySupported(final JobContext jobContext)
+      throws IOException {
+    LOG.info("Probe for isRecoverySupported({}): returning false",
+        jobContext.getJobID());
+    return false;
+  }
+
+  /**
+   * Task recovery is not supported; this method always fails.
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException always
+   */
+  @Override
+  public void recoverTask(final TaskAttemptContext taskContext)
+      throws IOException {
+    LOG.warn("Rejecting recoverTask({}) call", taskContext.getTaskAttemptID());
+    throw new IOException("Cannot recover task "
+        + taskContext.getTaskAttemptID());
+  }
+
+  /**
+   * Commit the task.
+   * This is where the scan of the task attempt's directory tree takes place.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void commitTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(true,
+        context);
+    try {
+      StageConfig stageConfig = committerConfig.createJobStageConfig()
+          .withOperations(createStoreOperations())
+          .build();
+      taskAttemptCommittedManifest = new CommitTaskStage(stageConfig)
+          .apply(null).getTaskManifest();
+      iostatistics.incrementCounter(COMMITTER_TASKS_COMPLETED_COUNT, 1);
+    } catch (IOException e) {
+      iostatistics.incrementCounter(COMMITTER_TASKS_FAILED_COUNT, 1);
+      throw e;
+    } finally {
+      logCommitterStatisticsAtDebug();
+      updateCommonContextOnCommitterExit();
+    }
+
+  }
+
+  /**
+   * Abort a task.
+   * @param context task context
+   * @throws IOException failure during the delete
+   */
+  public void abortTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(true,
+        context);
+    try {
+      new AbortTaskStage(
+          committerConfig.createJobStageConfig()
+              .withOperations(createStoreOperations())
+              .build())
+          .apply(false);
+    } finally {
+      logCommitterStatisticsAtDebug();
+      updateCommonContextOnCommitterExit();
+    }
+  }
+
+  /**
+   * Get the manifest success data for this job; creating on demand if needed.
+   * @param committerConfig source config.
+   * @return the current {@link #successReport} value; never null.
+   */
+  private ManifestSuccessData getOrCreateSuccessData(
+      ManifestCommitterConfig committerConfig) {
+    if (successReport == null) {
+      successReport = createManifestOutcome(
+          committerConfig.createJobStageConfig(), activeStage);
+    }
+    return successReport;
+  }
+
+  /**
+   * This is the big job commit stage.
+   * Load the manifests, prepare the destination, rename
+   * the files, then clean up the job directory.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException failure.
+   */
+  @Override
+  public void commitJob(final JobContext jobContext) throws IOException {
+
+    ManifestCommitterConfig committerConfig = enterCommitter(false, jobContext);
+
+    // create the initial success data.
+    // this is overwritten by that created during the operation sequence,
+    // but if the sequence fails before that happens, it
+    // will be saved to the report directory.
+    ManifestSuccessData marker = getOrCreateSuccessData(committerConfig);
+    IOException failure = null;
+    try (CloseableTaskPoolSubmitter ioProcs =
+             committerConfig.createSubmitter();
+         StoreOperations storeOperations = createStoreOperations()) {
+      // the stage config will be shared across all stages.
+      StageConfig stageConfig = committerConfig.createJobStageConfig()
+          .withOperations(storeOperations)
+          .withIOProcessors(ioProcs)
+          .build();
+
+      // commit the job, including any cleanup and validation.
+      final Configuration conf = jobContext.getConfiguration();
+      CommitJobStage.Result result = new CommitJobStage(stageConfig).apply(
+          new CommitJobStage.Arguments(
+              committerConfig.getCreateJobMarker(),
+              committerConfig.getValidateOutput(),
+              conf.getTrimmed(OPT_DIAGNOSTICS_MANIFEST_DIR, ""),
+              cleanupStageOptionsFromConfig(
+                  OP_STAGE_JOB_CLEANUP, conf)
+          ));
+      marker = result.getJobSuccessData();
+      // update the cached success with the new report.
+      setSuccessReport(marker);
+
+    } catch (IOException e) {
+      // failure. record it for the summary
+      failure = e;
+      // rethrow
+      throw e;
+    } finally {
+      // save the report summary, even on failure
+      maybeSaveSummary(activeStage,
+          committerConfig,
+          marker,
+          failure,
+          true,
+          true);
+      // print job commit stats
+      LOG.info("{}: Job Commit statistics {}",
+          committerConfig.getName(),
+          ioStatisticsToPrettyString(iostatistics));
+      // and warn of rename problems
+      final Long recoveries = iostatistics.counters().get(OP_COMMIT_FILE_RENAME_RECOVERED);
+      if (recoveries != null && recoveries > 0) {
+        LOG.warn("{}: rename failures were recovered from. Number of recoveries: {}",
+            committerConfig.getName(), recoveries);
+      }
+      updateCommonContextOnCommitterExit();
+    }
+  }
+
+  /**
+   * Abort the job.
+   * Invokes
+   * {@link #executeCleanup(String, JobContext, ManifestCommitterConfig)}
+   * then saves the (ongoing) job report data if reporting is enabled.
+   * @param jobContext Context of the job whose output is being written.
+   * @param state final runstate of the job
+   * @throws IOException failure during cleanup; report failures are swallowed
+   */
+  @Override
+  public void abortJob(final JobContext jobContext,
+      final JobStatus.State state)
+      throws IOException {
+    LOG.info("Aborting Job {} in state {}", jobContext.getJobID(), state);
+    ManifestCommitterConfig committerConfig = enterCommitter(false,
+        jobContext);
+    ManifestSuccessData report = getOrCreateSuccessData(
+        committerConfig);
+    IOException failure = null;
+
+    try {
+      executeCleanup(OP_STAGE_JOB_ABORT, jobContext, committerConfig);
+    } catch (IOException e) {
+      // failure.
+      failure = e;
+    }
+    report.setSuccess(false);
+    // job abort does not overwrite any existing report, so a job commit
+    // failure cause will be preserved.
+    maybeSaveSummary(activeStage, committerConfig, report, failure,
+        true, false);
+    // print job stats
+    LOG.info("Job Abort statistics {}",
+        ioStatisticsToPrettyString(iostatistics));
+    updateCommonContextOnCommitterExit();
+  }
+
+  /**
+   * Execute the {@code CleanupJobStage} to remove the job attempt dir.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException failure during cleanup
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void cleanupJob(final JobContext jobContext) throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(false,
+        jobContext);
+    try {
+      executeCleanup(OP_STAGE_JOB_CLEANUP, jobContext, committerConfig);
+    } finally {
+      logCommitterStatisticsAtDebug();
+      updateCommonContextOnCommitterExit();
+    }
+  }
+
+  /**
+   * Perform the cleanup operation for job cleanup or abort.
+   * @param statisticName statistic/stage name
+   * @param jobContext job context
+   * @param committerConfig committer config
+   * @throws IOException failure
+   * @return the outcome
+   */
+  private CleanupJobStage.Result executeCleanup(
+      final String statisticName,
+      final JobContext jobContext,
+      final ManifestCommitterConfig committerConfig) throws IOException {
+    try (CloseableTaskPoolSubmitter ioProcs =
+             committerConfig.createSubmitter()) {
+
+      return new CleanupJobStage(
+          committerConfig.createJobStageConfig()
+              .withOperations(createStoreOperations())
+              .withIOProcessors(ioProcs)
+              .build())
+          .apply(cleanupStageOptionsFromConfig(
+              statisticName,
+              jobContext.getConfiguration()));
+    }
+  }
+
+  /**
+   * Output path: destination directory of the job.
+   * @return the overall job destination directory.
+   */
+  public Path getOutputPath() {
+    return getDestinationDir();
+  }
+
+  /**
+   * Work path of the current task attempt.
+   * This is null if the task does not have one.
+   * @return a path.
+   */
+  public Path getWorkPath() {
+    return getTaskAttemptDir();
+  }
+
+  /**
+   * Get the job destination dir.
+   * @return dest dir.
+   */
+  private Path getDestinationDir() {
+    return destinationDir;
+  }
+
+  /**
+   * Get the task attempt dir.
+   * May be null.
+   * @return a path or null.
+   */
+  private Path getTaskAttemptDir() {
+    return taskAttemptDir;
+  }
+
+  /**
+   * Callback on stage entry.
+   * Sets {@link #activeStage} and updates the
+   * common context.
+   * @param stage new stage
+   */
+  @Override
+  public void enterStage(String stage) {
+    activeStage = stage;
+    AuditingIntegration.enterStage(stage);
+  }
+
+  /**
+   * Remove stage from common audit context.
+   * @param stage stage exited.
+   */
+  @Override
+  public void exitStage(String stage) {
+    AuditingIntegration.exitStage();
+  }
+
+  /**
+   * Get the unique ID of this job.
+   * @return job ID (yarn, spark)
+   */
+  public String getJobUniqueId() {
+    return baseConfig.getJobUniqueId();
+  }
+
+  /**
+   * Get the config of the task attempt this instance was constructed
+   * with.
+   * @return a configuration.
+   */
+  public Configuration getConf() {
+    return baseConfig.getConf();
+  }
+
+  /**
+   * Get the manifest Success data; only valid after a successful job commit.
+   * @return the job _SUCCESS data, or null.
+   */
+  public ManifestSuccessData getSuccessReport() {
+    return successReport;
+  }
+
+  private void setSuccessReport(ManifestSuccessData successReport) {
+    this.successReport = successReport;
+  }
+
+  /**
+   * Get the manifest of the last committed task.
+   * @return a task manifest or null.
+   */
+  @VisibleForTesting
+  TaskManifest getTaskAttemptCommittedManifest() {
+    return taskAttemptCommittedManifest;
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  @VisibleForTesting
+  public Path getTaskAttemptPath(TaskAttemptContext context) {
+    return enterCommitter(false, context).getTaskAttemptDir();
+  }
+
+  /**
+   * The path to where the manifest file of a task attempt will be
+   * saved when the task is committed.
+   * This path will be the same for all attempts of the same task.
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  @VisibleForTesting
+  public Path getTaskManifestPath(TaskAttemptContext context) {
+    final Path dir = enterCommitter(false, context).getTaskManifestDir();
+
+    return manifestPathForTask(dir,
+        context.getTaskAttemptID().getTaskID().toString());
+  }
+
+  /**
+   * Compute the path where job attempt data is stored until
+   * the job is committed.
+   * @param context the context of the job.
+   * @return the job attempt path.
+   */
+  @VisibleForTesting
+  public Path getJobAttemptPath(JobContext context) {
+
+    return enterCommitter(false, context).getJobAttemptDir();
+  }
+
+  /**
+   * Get the final output path, including resolving any relative path.
+   * @param outputPath output path
+   * @param conf configuration to create any FS with
+   * @return a resolved path.
+   * @throws IOException failure.
+   */
+  private Path resolveDestinationDirectory(Path outputPath,
+      Configuration conf) throws IOException {
+    return FileSystem.get(outputPath.toUri(), conf).makeQualified(outputPath);
+  }
+
+  /**
+   * Create a FS level store operations for the destination store.
+   * This MUST NOT be used for the success report operations, as
+   * they may be to a different filesystem.
+   * This is a point which can be overridden during testing.
+   * @return a new store operations instance bonded to the destination fs.
+   * @throws IOException failure to instantiate.
+   */
+  protected StoreOperations createStoreOperations() throws IOException {
+    return ManifestCommitterSupport.createStoreOperations(
+        baseConfig.getConf(),
+        baseConfig.getDestinationFileSystem(),
+        baseConfig.getDestinationDir());
+  }
+
+  /**
+   * Log IO Statistics at debug.
+   */
+  private void logCommitterStatisticsAtDebug() {
+    logIOStatisticsAtDebug(LOG, "Committer Statistics", this);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "ManifestCommitter{");
+    sb.append(baseConfig);
+    sb.append(", iostatistics=").append(ioStatisticsToPrettyString(iostatistics));
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Save a summary to the report dir if the config option
+   * is set.
+   * The IOStatistics of the summary will be updated to the latest
+   * snapshot of the committer's statistics, so the report is up
+   * to date.
+   * The report will be updated with the current active stage,
+   * and if {@code thrown} is non-null, it will be added to the
+   * diagnostics (and the job tagged as a failure).
+   * Static for testability.
+   * @param activeStage active stage
+   * @param config configuration to use.
+   * @param report summary file.
+   * @param thrown any exception indicating failure.
+   * @param quiet should exceptions be swallowed.
+   * @param overwrite should the existing file be overwritten
+   * @return the path of a file, if successfully saved
+   * @throws IOException if a failure occurred and quiet==false
+   */
+  private static Path maybeSaveSummary(
+      String activeStage,
+      ManifestCommitterConfig config,
+      ManifestSuccessData report,
+      Throwable thrown,
+      boolean quiet,
+      boolean overwrite) throws IOException {
+    Configuration conf = config.getConf();
+    String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, "");
+    if (reportDir.isEmpty()) {
+      LOG.debug("No summary directory set in " + OPT_SUMMARY_REPORT_DIR);
+      return null;
+    }
+    LOG.debug("Summary directory set in to {}" + OPT_SUMMARY_REPORT_DIR,
+        reportDir);
+
+    // update to the latest statistics
+    report.snapshotIOStatistics(config.getIOStatistics());
+
+    Path reportDirPath = new Path(reportDir);
+    Path path = new Path(reportDirPath,
+        createJobSummaryFilename(config.getJobUniqueId()));
+
+    if (thrown != null) {
+      report.recordJobFailure(thrown);
+    }
+    report.putDiagnostic(STAGE, activeStage);
+    final FileSystem fs = path.getFileSystem(conf);
+    try (StoreOperations operations = new StoreOperationsThroughFileSystem(fs)) {

Review comment:
       Should this be using the same mechanism to create StoreOperations as the rest of the class?
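
   (The javadoc on createStoreOperations() warns that it must not be used
   for the summary report, since the report dir may be on a different
   filesystem. If the mechanisms are to be unified, one option, sketched
   here as an assumption rather than the actual API, is an overload that
   takes the target filesystem:)

       // hypothetical overload so tests can also intercept report IO
       protected StoreOperations createStoreOperations(FileSystem fs) {
         return new StoreOperationsThroughFileSystem(fs);
       }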

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.util.OperationDuration;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_READ_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.IO_ACQUIRE_WRITE_PERMIT_BLOCKED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_GET_FILE_STATUS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_LIST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_READ_OPEN_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_COMMIT_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_CREATE_FILE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_DELETE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_MKDIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.PERMIT_WRITE_RENAME;
+
+/**
+ * A Stage in Task/Job Commit.
+ * A stage can be executed once only, creating the return value of the
+ * {@link #apply(Object)} method, and, potentially, updating the state of the
+ * store via {@link StoreOperations}.
+ * IOStatistics will also be updated.
+ * Stages are expected to be combined to form the commit protocol.
+ * @param <IN> Type of arguments to the stage.
+ * @param <OUT> Type of result.
+ */
+public abstract class AbstractJobCommitStage<IN, OUT>

Review comment:
       Nit: Name is a little confusing (the Job part, since this is used by all stages)

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/LoadManifestsStage.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_ALL_MANIFESTS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics;
+
+/**
+ * Stage to load all the task manifests in the job attempt directory.
+ * Invoked in Job Commit.
+ * Manifests are loaded in parallel.
+ * The IOStatistics snapshot passed in is built up with the statistics,
+ * and the statistics are stripped from each manifest if prune == true.
+ * This keeps the memory footprint of each manifest down.
+ */
+public class LoadManifestsStage extends
+    AbstractJobCommitStage<Boolean, LoadManifestsStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      LoadManifestsStage.class);
+
+  /**
+   * Summary of manifest loading.
+   */
+  private final SummaryInfo summaryInfo = new SummaryInfo();
+
+  /**
+   * Should manifests be pruned of IOStatistics?
+   */
+  private boolean pruneManifests;
+
+  /**
+   * List of loaded manifests.
+   */
+  private final List<TaskManifest> manifests = new ArrayList<>();
+
+  public LoadManifestsStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true);
+  }
+
+  /**
+   * Load the manifests.
+   * @param prune should manifests be pruned of IOStatistics?
+   * @return the summary and a list of manifests.
+   * @throws IOException IO failure.
+   */
+  @Override
+  protected LoadManifestsStage.Result executeStage(
+      final Boolean prune) throws IOException {
+
+    final Path manifestDir = getTaskManifestDir();
+    LOG.info("{}: Executing Manifest Job Commit with manifests in {}",
+        getName(),
+        manifestDir);
+    pruneManifests = prune;
+    // build a list of all task manifests successfully committed
+    //
+    msync(manifestDir);
+    final RemoteIterator<FileStatus> manifestFiles = listManifests();
+
+    final List<TaskManifest> manifestList = loadAllManifests(manifestFiles);
+    LOG.info("{}: Summary of {} manifests loaded in {}: {}",
+        getName(),
+        manifestList.size(),
+        manifestDir,
+        summaryInfo);
+
+    // collect any stats
+    maybeAddIOStatistics(getIOStatistics(), manifestFiles);
+    return new LoadManifestsStage.Result(summaryInfo, manifestList);
+  }
+
+  /**
+   * Load all the manifests.
+   * @param manifestFiles list of manifest files.
+   * @return the loaded manifests.
+   * @throws IOException IO Failure.
+   */
+  private List<TaskManifest> loadAllManifests(
+      final RemoteIterator<FileStatus> manifestFiles) throws IOException {
+
+    trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
+        TaskPool.foreach(manifestFiles)
+            .executeWith(getIOProcessors())
+            .stopOnFailure()

Review comment:
Here and elsewhere. Not sure what this does, and how any errors will be reflected ...
   i.e. will this end up throwing an Exception? What will happen to the other threads that may be in the process of execution?
   (Have not looked at the TaskPool yet)
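
   For reference, a sketch of the expected semantics, based on the S3A
   "Tasks" class this appears to derive from (an assumption, not verified
   against TaskPool itself): with stopOnFailure(), the first failure stops
   new tasks from being scheduled, tasks already executing run to
   completion, and the first exception is rethrown from run().

       // hypothetical illustration of the failure semantics
       TaskPool.foreach(manifestFiles)
           .executeWith(getIOProcessors())
           .stopOnFailure()
           .run(status -> {
             // any IOException raised here is rethrown to the caller;
             // loadOneManifest is a hypothetical helper name
             manifests.add(loadOneManifest(status));
           });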

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
##########
@@ -0,0 +1,751 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
+import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_FAILED_COUNT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_ABORT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterExit;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.updateCommonContextOnCommitterEntry;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createIOStatisticsStore;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage.cleanupStageOptionsFromConfig;
+
+/**
+ * This is the Intermediate-Manifest committer.
+ * At every entry point it updates the thread's audit context with
+ * the current stage info; this is a placeholder for
+ * adding audit information to stores other than S3A.
+ *
+ * This is tagged as public/stable. This is mandatory
+ * for the classname and PathOutputCommitter implementation
+ * classes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ManifestCommitter extends PathOutputCommitter implements
+    IOStatisticsSource, StageEventCallbacks {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ManifestCommitter.class);
+
+  /**
+   * Role: task committer.
+   */
+  public static final String TASK_COMMITTER = "task committer";
+
+  /**
+   * Role: job committer.
+   */
+  public static final String JOB_COMMITTER = "job committer";
+
+  /**
+   * Committer Configuration as extracted from
+   * the job/task context and set in the constructor.
+   */
+  private final ManifestCommitterConfig baseConfig;
+
+  /**
+   * Destination of the job.
+   */
+  private final Path destinationDir;
+
+  /**
+   * For tasks, the attempt directory.
+   * Null for jobs.
+   */
+  private final Path taskAttemptDir;
+
+  /**
+   * IOStatistics to update.
+   */
+  private final IOStatisticsStore iostatistics;
+
+  /**
+   *  The job Manifest Success data; only valid after a job successfully
+   *  commits.
+   */
+  private ManifestSuccessData successReport;
+
+  /**
+   * The active stage; is updated by a callback from within the stages.
+   */
+  private String activeStage;
+
+  /**
+   * The task manifest of the task commit.
+   * Null unless this is a task attempt and the
+   * task has successfully been committed.
+   */
+  private TaskManifest taskAttemptCommittedManifest;
+
+  /**
+   * Create a committer.
+   * @param outputPath output path
+   * @param context job/task context
+   * @throws IOException failure.
+   */
+  ManifestCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.destinationDir = resolveDestinationDirectory(outputPath,
+        context.getConfiguration());
+    this.iostatistics = createIOStatisticsStore().build();
+    this.baseConfig = enterCommitter(
+        context.getTaskAttemptID() != null,
+        context);
+
+    this.taskAttemptDir = baseConfig.getTaskAttemptDir();
+    LOG.info("Created ManifestCommitter with JobID {},"
+            + " Task Attempt {} and destination {}",
+        context.getJobID(), context.getTaskAttemptID(), outputPath);
+  }
+
+  /**
+   * Called on entry to a committer method; generates a config for it.
+   * Calls {@code #updateCommonContextOnCommitterEntry()}
+   * to update the audit context.
+   * @param isTask is this a task entry point?
+   * @param context context
+   * @return committer config
+   */
+  private ManifestCommitterConfig enterCommitter(boolean isTask,
+      JobContext context) {
+    ManifestCommitterConfig committerConfig =
+        new ManifestCommitterConfig(
+            getOutputPath(),
+            isTask ? TASK_COMMITTER : JOB_COMMITTER,
+            context,
+            iostatistics,
+            this);
+    updateCommonContextOnCommitterEntry(committerConfig);
+    return committerConfig;
+  }
+
+  /**
+   * Set up a job through a {@link SetupJobStage}.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException IO Failure.
+   */
+  public void setupJob(final JobContext jobContext) throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(false,
+        jobContext);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // set up the job.
+    new SetupJobStage(stageConfig)
+        .apply(committerConfig.getCreateJobMarker());
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Set up a task through a {@link SetupTaskStage}.
+   * Classic FileOutputCommitter is a no-op here, relying
+   * on RecordWriters to create the dir implicitly on file
+   * create().
+   * FileOutputCommitter also uses the existence of that
+   * file as a flag to indicate task commit is needed.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void setupTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig =
+        enterCommitter(true, context);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // create task attempt dir; delete if present. Or fail?
+    new SetupTaskStage(stageConfig).apply("");
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Always return true.
+   * This way, even if there is no output, stats are collected.
+   * @param context task context.
+   * @return true
+   * @throws IOException IO Failure.
+   */
+  public boolean needsTaskCommit(final TaskAttemptContext context)
+      throws IOException {
+    LOG.info("Probe for needsTaskCommit({})",
+        context.getTaskAttemptID());
+    return true;
+  }
+
+  /**
+   * Failure during Job Commit is not recoverable.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isCommitJobRepeatable(final JobContext jobContext)
+      throws IOException {
+    LOG.info("Probe for isCommitJobRepeatable({}): returning false",
+        jobContext.getJobID());
+    return false;
+  }
+
+  /**
+   * Declare that task recovery is not supported.
+   * It would be, if someone added the code *and tests*.
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isRecoverySupported(final JobContext jobContext)
+      throws IOException {
+    LOG.info("Probe for isRecoverySupported({}): returning false",
+        jobContext.getJobID());
+    return false;
+  }
+
+  /**
+   * Task recovery is not supported; this method always fails.
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException always
+   */
+  @Override
+  public void recoverTask(final TaskAttemptContext taskContext)
+      throws IOException {
+    LOG.warn("Rejecting recoverTask({}) call", taskContext.getTaskAttemptID());
+    throw new IOException("Cannot recover task "
+        + taskContext.getTaskAttemptID());
+  }
+
+  /**
+   * Commit the task.
+   * This is where the listing of the task attempt directory tree takes place.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void commitTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(true,
+        context);
+    try {
+      StageConfig stageConfig = committerConfig.createJobStageConfig()
+          .withOperations(createStoreOperations())
+          .build();
+      taskAttemptCommittedManifest = new CommitTaskStage(stageConfig)
+          .apply(null).getTaskManifest();
+      iostatistics.incrementCounter(COMMITTER_TASKS_COMPLETED_COUNT, 1);
+    } catch (IOException e) {
+      iostatistics.incrementCounter(COMMITTER_TASKS_FAILED_COUNT, 1);
+      throw e;
+    } finally {
+      logCommitterStatisticsAtDebug();
+      updateCommonContextOnCommitterExit();
+    }
+
+  }
+
+  /**
+   * Abort a task.
+   * @param context task context
+   * @throws IOException failure during the delete
+   */
+  public void abortTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = enterCommitter(true,
+        context);
+    try {
+      new AbortTaskStage(
+          committerConfig.createJobStageConfig()
+              .withOperations(createStoreOperations())
+              .build())
+          .apply(false);

Review comment:
       Should this be true? Or is this relying on the caller to ignore the error?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK;
+
+/**
+ * Abort a task.
+ *
+ * This is done by deleting the task directory.
+ * Exceptions may or may not be suppressed, depending on the argument.
+ */
+public class AbortTaskStage extends
+    AbstractJobCommitStage<Boolean, Path> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbortTaskStage.class);
+
+  public AbortTaskStage(final StageConfig stageConfig) {
+    super(true, stageConfig, OP_STAGE_TASK_ABORT_TASK, false);
+  }
+
+  /**
+   * Delete the task attempt directory.
+   * @param suppressExceptions should exceptions be ignored?
+   * @return the directory
+   * @throws IOException failure when exceptions were not suppressed
+   */
+  @Override
+  protected Path executeStage(final Boolean suppressExceptions)

Review comment:
       Asked elsewhere as well: would this ever be invoked outside of individual tasks? I.e., could this end up requiring parallelization at a higher level?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/package-info.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Persistence formats.
+ * These are the persistence formats used for passing data from tasks
+ * to the job committer
+ * {@link org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest},
+ * and for a {@code _SUCCESS} file, which is in
+ * {@link org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData}.
+ * The {@code _SUCCESS} file is a copy of the S3A Committer
+ * {@code org.apache.hadoop.fs.s3a.commit.files.ManifestSuccessData},
+ * the intent being that at the JSON level they are compatible.
+ * This is to aid testing/validation and support calls, with a single
+ * format to load.
+ *
+ * Consult the individual formats for their declarations of access;
+ * the _SUCCESS file is one which tests may use.
+ *
+ */
+@InterfaceAudience.Public

Review comment:
       Public for consumption by tooling which may want to analyze the output?
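
   If so, a minimal sketch of how such tooling might read the marker, assuming
   ManifestSuccessData exposes a serializer() factory the way TaskManifest does
   (that method name is my assumption, not checked against this patch):

       // hypothetical: load and inspect the _SUCCESS manifest of a job
       FileSystem fs = outputDir.getFileSystem(conf);
       Path marker = new Path(outputDir, "_SUCCESS");
       ManifestSuccessData success =
           ManifestSuccessData.serializer().load(fs, marker);
       // filenames is the (possibly truncated) list of committed files
       System.out.println("Committed files: " + success.getFilenames().size());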

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_COMMIT;
+
+/**
+ * Commit a task attempt.
+ * Scan the task attempt directories through
+ * {@link TaskAttemptScanDirectoryStage}
+ * and then save to the task manifest path at
+ * {@link SaveTaskManifestStage}.
+ */
+public class CommitTaskStage extends
+    AbstractJobCommitStage<Void, CommitTaskStage.Result> {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CommitTaskStage.class);
+
+  public CommitTaskStage(final StageConfig stageConfig) {
+    super(true, stageConfig, OP_STAGE_TASK_COMMIT, false);
+  }
+
+  /**
+   * Scan the task attempt dir then save the manifest.
+   * A snapshot of the IOStats will be included in the manifest;
+   * this includes the scan time.
+   * @param arguments arguments to the function.
+   * @return the path the manifest was saved to, and the manifest.
+   * @throws IOException IO failure.
+   */
+  @Override
+  protected CommitTaskStage.Result executeStage(final Void arguments)
+      throws IOException {
+    LOG.info("{}: Committing task \"{}\"", getName(), getTaskAttemptId());
+
+    // execute the scan
+    final TaskAttemptScanDirectoryStage scanStage =

Review comment:
       Here and elsewhere, in Stages which internally reference other stages, this ends up resetting various bits of state information without any mechanism for setting them back. E.g. stageName will go back to being empty (and not CommitTaskStage) after this execution completes; similarly, AuditInfo will end up being reset (via the ManifestCommitter.enterStage / exitStage calls invoked by the apply() wrapper). A sketch of a possible save/restore fix is below.
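
   To illustrate, a save/restore wrapper along these lines would keep the
   outer stage's context intact; getStageName()/setStageName() are accessors
   assumed for the sketch, not names from this patch:

       // hypothetical: restore the enclosing stage's state after a
       // nested stage's apply() has overwritten it
       String enclosingStage = getStageName();   // e.g. "CommitTaskStage"
       try {
         scanStage.apply(null);                  // resets name + audit info
       } finally {
         setStageName(enclosingStage);           // put the outer name back
         updateAuditContext(enclosingStage);     // and the audit context
       }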

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/package-info.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Intermediate manifest committer.
+ *
+ * Optimized for object stores where listing is slow, directory renames may not
+ * be atomic, and the output is a deep tree of files intermixed with
+ * the output of (many) other task attempts.
+ *
+ * All classes in this module are private/unstable, except where stated.
+ */
+
+@InterfaceAudience.Private

Review comment:
       Does this include the configs as private?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableStoreOperations.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.OPERATION_TIMED_OUT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem.E_TRASH_FALSE;
+
+/**
+ * Wrap an existing StoreOperations implementation and fail on
+ * specific paths.
+ * This is for testing. It could be implemented via
+ * Mockito 2 spy code but is not so that:
+ * 1. It can be backported to Hadoop versions using Mockito 1.x.
+ * 2. It can be extended for use in production. This is why it is in
+ * the production module - to allow downstream tests to adopt it.

Review comment:
       Downstream could adopt this even if it were in the test module, e.g. the way MiniDFSCluster is exposed.
   
   Also, the comment makes me think StoreOperations is supposed to be a public endpoint.
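
   For reference, downstream fault injection with it would look roughly like
   this; the constructor signature and addDeletePathToFail() are my guesses
   at the API:

       // wrap the real filesystem binding and inject a delete failure
       StoreOperations real = new StoreOperationsThroughFileSystem(fs);
       UnreliableStoreOperations failing = new UnreliableStoreOperations(real);
       failing.addDeletePathToFail(taskAttemptDir);
       // hand `failing` to the stage config and assert the committer
       // surfaces (or suppresses) the failure as intended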

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/LoadManifestsStage.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_ALL_MANIFESTS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics;
+
+/**
+ * Stage to load all the task manifests in the job attempt directory.
+ * Invoked in Job Commit.
+ * Manifests are loaded in parallel.
+ * The IOStatistics snapshot passed in is built up with each manifest's
+ * statistics, which are then stripped from the manifest if prune == true.
+ * This keeps the memory footprint of each manifest down.
+ */
+public class LoadManifestsStage extends
+    AbstractJobCommitStage<Boolean, LoadManifestsStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      LoadManifestsStage.class);
+
+  /**
+   * Summary of manifest loading.
+   */
+  private final SummaryInfo summaryInfo = new SummaryInfo();
+
+  /**
+   * Should manifests be pruned of IOStatistics?
+   */
+  private boolean pruneManifests;
+
+  /**
+   * List of loaded manifests.
+   */
+  private final List<TaskManifest> manifests = new ArrayList<>();
+
+  public LoadManifestsStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true);
+  }
+
+  /**
+   * Load the manifests.
+   * @param prune should manifests be pruned of IOStatistics?
+   * @return the summary and a list of manifests.
+   * @throws IOException IO failure.
+   */
+  @Override
+  protected LoadManifestsStage.Result executeStage(
+      final Boolean prune) throws IOException {
+
+    final Path manifestDir = getTaskManifestDir();
+    LOG.info("{}: Executing Manifest Job Commit with manifests in {}",
+        getName(),
+        manifestDir);
+    pruneManifests = prune;
+    // build a list of all task manifests successfully committed
+    //
+    msync(manifestDir);

Review comment:
       I have no idea what an msync does; it is invoked in the job committer. My best guess is sketched below.
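
   For anyone else wondering: my reading - not verified against this patch -
   is that FileSystem.msync() (HDFS-14162) forces the client to sync its
   metadata view with the namenode, so a listing made through an HDFS
   observer node sees the manifests the tasks just wrote; filesystems which
   don't support the call throw UnsupportedOperationException. Presumably the
   stage-level helper is something like:

       // assumed behaviour of msync(path): best-effort metadata sync,
       // ignored where the store does not support the call
       try {
         fileSystem.msync();
       } catch (UnsupportedOperationException ignored) {
         // no observer reads in play; listings are already consistent
       }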

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.measureDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_PREPARE_PARENT_DIRECTORIES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_CREATE_DIRECTORIES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_FILE_UNDER_DESTINATION;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MKDIRS_RETURNED_FALSE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_PREPARE_DIR_ANCESTORS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS;
+import static org.apache.hadoop.util.OperationDuration.humanTime;
+
+/**
+ * Prepare the destination directory tree, by making as few IO calls as
+ * possible, and doing those IO operations in the thread pool.
+ *
+ * The classic FileOutputCommitter does a recursive treewalk and
+ * deletes any files found at paths where directories are to be created.
+ *
+ *
+ * Each task manifest's directories are combined with those of the other tasks
+ * to build a set of all directories which are needed, without duplicates.
+ *
+ * If the option to prepare parent directories was set,
+ * these are also prepared for the commit.
+ * A set of ancestor directories is built up, all of which MUST NOT
+ * be files and SHALL be in one of two states:
+ * - directory
+ * - nonexistent
+ * Across a thread pool, an isFile() probe is made for all parents;
+ * if there is a file there then it is deleted.
+ *
+ * After any ancestor dir preparation, the final dir set is created
+ * through mkdirs() calls, with some handling for race conditions.
+ * It is not an error if mkdirs() fails because the dest dir is there;
+ * if it fails because a file was found - that file will be deleted.
+ *
+ * The stage returns the list of directories created, and for testing,
+ * the map of paths to outcomes.
+ */
+public class CreateOutputDirectoriesStage extends
+    AbstractJobCommitStage<List<TaskManifest>, CreateOutputDirectoriesStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CreateOutputDirectoriesStage.class);
+
+  /**
+   * Message to print on first mkdir failure if the parent dirs were not
+   * prepared.
+   */
+  private static final String HINT_PREPARE_PARENT_DIR = "Setting "
+      + OPT_PREPARE_PARENT_DIRECTORIES + " to \"true\" performs more parent directory"
+      + " preparation and so may fix this problem";
+
+  /**
+   * Directories as a map of (path, state).
+   * Using a map rather than any set for efficient concurrency; the
+   * concurrent sets don't do lookups as fast.
+   */
+  private final Map<Path, DirMapState> dirMap = new ConcurrentHashMap<>();
+
+  /**
+   * A list of created paths for the results.
+   */
+  private final List<Path> createdDirectories = new ArrayList<>();
+
+  public CreateOutputDirectoriesStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CREATE_TARGET_DIRS, true);
+    // add the dest dir to the dir map as we expect the job setup to create it.
+    dirMap.put(getDestinationDir(), DirMapState.dirWasCreated);
+  }
+
+  @Override
+  protected Result executeStage(
+      final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    final List<Path> directories = createAllDirectories(taskManifests);
+    LOG.debug("{}: Created {} directories", getName(), directories.size());
+    return new Result(directories, dirMap);
+  }
+
+  /**
+   * For each task, build the list of directories it wants.
+   * @param taskManifests task manifests
+   * @return the list of paths which have been created.
+   */
+  private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    // the set is all directories which need to exist across all
+    // tasks.
+    // This set is probed during parent dir scanning, so
+    //  needs to be efficient to look up values.
+    final Set<Path> directoriesToCreate = new HashSet<>();
+
+    // iterate through the task manifests
+    // and add all output dirs into the set of dirs to
+    // create.
+    // hopefully there is a lot of overlap, so the
+    // final number of dirs to create is small.
+    for (TaskManifest task : taskManifests) {
+      final List<FileOrDirEntry> dirEntries
+          = task.getDirectoriesToCreate();
+      for (FileOrDirEntry entry: dirEntries) {
+        // add the dest path
+        directoriesToCreate.add(entry.getDestPath());
+      }
+    }
+
+    // Prepare parent directories.
+    if (getStageConfig().getPrepareParentDirectories()) {
+      prepareParentDirectories(directoriesToCreate);
+    }
+
+    // Now the real work.
+
+    final int createCount = directoriesToCreate.size();
+    LOG.info("Preparing {} directory/directories", createCount);
+    // now probe for and create the parent dirs of all renames
+    Duration d = measureDurationOfInvocation(getIOStatistics(), OP_CREATE_DIRECTORIES, () ->
+        TaskPool.foreach(directoriesToCreate)
+            .executeWith(getIOProcessors(createCount))
+            .onFailure(this::reportMkDirFailure)
+            .stopOnFailure()
+            .run(dir -> {
+              updateAuditContext(OP_STAGE_JOB_CREATE_TARGET_DIRS);
+              if (maybeCreateOneDirectory(dir)) {
+                addCreatedDirectory(dir);
+              }
+            }));
+    LOG.info("Time to prepare directories {}", humanTime(d.toMillis()));
+    return createdDirectories;
+  }
+
+  /**
+   * How many failures have been reported.
+   */
+  private final AtomicInteger failureCount = new AtomicInteger();
+
+  /**
+   * Report a single directory creation failure.
+   * @param path path which could not be created
+   * @param e exception raised.
+   */
+  private void reportMkDirFailure(Path path, Exception e) {
+    final int count = failureCount.incrementAndGet();
+    LOG.warn("{}: mkdir failure #{} Failed to create directory \"{}\": {}",
+        getName(), count, path, e.toString());
+    LOG.debug("{}: Full exception details",
+        getName(), e);
+    if (count == 1  && !getStageConfig().getPrepareParentDirectories()) {
+      // first failure and no parent dir setup, so recommend
+      // preparing the parent dir
+      LOG.warn(HINT_PREPARE_PARENT_DIR);
+    }
+  }
+
+  /**
+   * Clean up parent dirs. No attempt is made to create them, because
+   * we know mkdirs() will do that (assuming it has the permissions,
+   * of course)
+   * @param directoriesToCreate set of dirs to create
+   * @throws IOException IO problem
+   */
+  private void prepareParentDirectories(final Set<Path> directoriesToCreate)
+      throws IOException {
+
+    // include parents in the paths
+    final Set<Path> ancestors = extractAncestors(
+        getStageConfig().getDestinationDir(),
+        directoriesToCreate);
+
+    final int ancestorCount = ancestors.size();
+    LOG.info("Preparing {} ancestor directory/directories", ancestorCount);
+    // if there is a single ancestor, don't bother with parallelisation.
+    Duration d = measureDurationOfInvocation(getIOStatistics(),
+        OP_PREPARE_DIR_ANCESTORS, () ->
+            TaskPool.foreach(ancestors)
+                .executeWith(getIOProcessors(ancestorCount))
+                .stopOnFailure()
+                .run(dir -> {
+                  updateAuditContext(OP_PREPARE_DIR_ANCESTORS);
+                  prepareParentDir(dir);
+                }));
+    LOG.info("Time to prepare ancestor directories {}", humanTime(d.toMillis()));
+  }
+
+  /**
+   * Prepare a parent directory.
+   * @param dir directory to probe
+   * @throws IOException failure in probe other than FNFE
+   */
+  private void prepareParentDir(Path dir) throws IOException {
+    // report progress back
+    progress();
+    LOG.debug("{}: Probing parent dir {}", getName(), dir);
+    if (isFile(dir)) {

Review comment:
       In what scenario could this be a file? (Maybe a parallel job?)

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.measureDurationOfInvocation;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_PREPARE_PARENT_DIRECTORIES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_CREATE_DIRECTORIES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_FILE_UNDER_DESTINATION;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MKDIRS_RETURNED_FALSE;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_PREPARE_DIR_ANCESTORS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS;
+import static org.apache.hadoop.util.OperationDuration.humanTime;
+
+/**
+ * Prepare the destination directory tree, by making as few IO calls as
+ * possible, and doing those IO operations in the thread pool.
+ *
+ * The classic FileOutputCommitter does a recursive treewalk and
+ * deletes any files found at paths where directories are to be created.
+ *
+ *
+ * Each task manifest's directories are combined with those of the other tasks
+ * to build a set of all directories which are needed, without duplicates.
+ *
+ * If the option to prepare parent directories was set,
+ * these are also prepared for the commit.
+ * A set of ancestor directories is built up, all of which MUST NOT
+ * be files and SHALL be in one of two states:
+ * - directory
+ * - nonexistent
+ * Across a thread pool, an isFile() probe is made for all parents;
+ * if there is a file there then it is deleted.
+ *
+ * After any ancestor dir preparation, the final dir set is created
+ * through mkdirs() calls, with some handling for race conditions.
+ * It is not an error if mkdirs() fails because the dest dir is there;
+ * if it fails because a file was found - that file will be deleted.
+ *
+ * The stage returns the list of directories created, and for testing,
+ * the map of paths to outcomes.
+ */
+public class CreateOutputDirectoriesStage extends
+    AbstractJobCommitStage<List<TaskManifest>, CreateOutputDirectoriesStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CreateOutputDirectoriesStage.class);
+
+  /**
+   * Message to print on first mkdir failure if the parent dirs were not
+   * prepared.
+   */
+  private static final String HINT_PREPARE_PARENT_DIR = "Setting "
+      + OPT_PREPARE_PARENT_DIRECTORIES + " to \"true\" performs more parent directory"
+      + " preparation and so may fix this problem";
+
+  /**
+   * Directories as a map of (path, state).
+   * Using a map rather than any set for efficient concurrency; the
+   * concurrent sets don't do lookups as fast.
+   */
+  private final Map<Path, DirMapState> dirMap = new ConcurrentHashMap<>();
+
+  /**
+   * A list of created paths for the results.
+   */
+  private final List<Path> createdDirectories = new ArrayList<>();
+
+  public CreateOutputDirectoriesStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CREATE_TARGET_DIRS, true);
+    // add the dest dir to the dir map as we expect the job setup to create it.
+    dirMap.put(getDestinationDir(), DirMapState.dirWasCreated);
+  }
+
+  @Override
+  protected Result executeStage(
+      final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    final List<Path> directories = createAllDirectories(taskManifests);
+    LOG.debug("{}: Created {} directories", getName(), directories.size());
+    return new Result(directories, dirMap);
+  }
+
+  /**
+   * For each task, build the list of directories it wants.
+   * @param taskManifests task manifests
+   * @return the list of paths which have been created.
+   */
+  private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    // the set is all directories which need to exist across all
+    // tasks.
+    // This set is probed during parent dir scanning, so
+    //  needs to be efficient to look up values.
+    final Set<Path> directoriesToCreate = new HashSet<>();
+
+    // iterate through the task manifests
+    // and add all output dirs into the set of dirs to
+    // create.
+    // hopefully there is a lot of overlap, so the
+    // final number of dirs to create is small.
+    for (TaskManifest task : taskManifests) {
+      final List<FileOrDirEntry> dirEntries
+          = task.getDirectoriesToCreate();
+      for (FileOrDirEntry entry: dirEntries) {
+        // add the dest path
+        directoriesToCreate.add(entry.getDestPath());
+      }
+    }
+
+    // Prepare parent directories.
+    if (getStageConfig().getPrepareParentDirectories()) {
+      prepareParentDirectories(directoriesToCreate);
+    }
+
+    // Now the real work.
+
+    final int createCount = directoriesToCreate.size();
+    LOG.info("Preparing {} directory/directories", createCount);
+    // now probe for and create the parent dirs of all renames
+    Duration d = measureDurationOfInvocation(getIOStatistics(), OP_CREATE_DIRECTORIES, () ->
+        TaskPool.foreach(directoriesToCreate)
+            .executeWith(getIOProcessors(createCount))
+            .onFailure(this::reportMkDirFailure)
+            .stopOnFailure()
+            .run(dir -> {
+              updateAuditContext(OP_STAGE_JOB_CREATE_TARGET_DIRS);
+              if (maybeCreateOneDirectory(dir)) {
+                addCreatedDirectory(dir);
+              }
+            }));
+    LOG.info("Time to prepare directories {}", humanTime(d.toMillis()));
+    return createdDirectories;
+  }
+
+  /**
+   * How many failures have been reported.
+   */
+  private final AtomicInteger failureCount = new AtomicInteger();
+
+  /**
+   * Report a single directory creation failure.
+   * @param path path which could not be created
+   * @param e exception raised.
+   */
+  private void reportMkDirFailure(Path path, Exception e) {
+    final int count = failureCount.incrementAndGet();
+    LOG.warn("{}: mkdir failure #{} Failed to create directory \"{}\": {}",
+        getName(), count, path, e.toString());
+    LOG.debug("{}: Full exception details",
+        getName(), e);
+    if (count == 1  && !getStageConfig().getPrepareParentDirectories()) {
+      // first failure and no parent dir setup, so recommend
+      // preparing the parent dir
+      LOG.warn(HINT_PREPARE_PARENT_DIR);
+    }
+  }
+
+  /**
+   * Clean up parent dirs. No attempt is made to create them, because
+   * we know mkdirs() will do that (assuming it has the permissions,
+   * of course)
+   * @param directoriesToCreate set of dirs to create
+   * @throws IOException IO problem
+   */
+  private void prepareParentDirectories(final Set<Path> directoriesToCreate)
+      throws IOException {
+
+    // include parents in the paths
+    final Set<Path> ancestors = extractAncestors(
+        getStageConfig().getDestinationDir(),
+        directoriesToCreate);
+
+    final int ancestorCount = ancestors.size();
+    LOG.info("Preparing {} ancestor directory/directories", ancestorCount);
+    // if there is a single ancestor, don't bother with parallelisation.
+    Duration d = measureDurationOfInvocation(getIOStatistics(),
+        OP_PREPARE_DIR_ANCESTORS, () ->
+            TaskPool.foreach(ancestors)
+                .executeWith(getIOProcessors(ancestorCount))
+                .stopOnFailure()
+                .run(dir -> {
+                  updateAuditContext(OP_PREPARE_DIR_ANCESTORS);
+                  prepareParentDir(dir);
+                }));
+    LOG.info("Time to prepare ancestor directories {}", humanTime(d.toMillis()));
+  }
+
+  /**
+   * Prepare a parent directory.
+   * @param dir directory to probe
+   * @throws IOException failure in probe other than FNFE
+   */
+  private void prepareParentDir(Path dir) throws IOException {
+    // report progress back
+    progress();
+    LOG.debug("{}: Probing parent dir {}", getName(), dir);
+    if (isFile(dir)) {
+      // it's a file: delete it.
+      LOG.info("{}: Deleting file {}", getName(), dir);
+      delete(dir, false, OP_DELETE_FILE_UNDER_DESTINATION);
+      // note its final state
+      addToDirectoryMap(dir, DirMapState.ancestorWasFileNowDeleted);
+    } else {
+      // and add to dir map as a dir or missing entry
+      LOG.debug("{}: Dir {} is missing or a directory", getName(), dir);
+      addToDirectoryMap(dir, DirMapState.ancestorWasDirOrMissing);
+    }
+  }
+
+  /**
+   * Determine all ancestors of the dirs to create which are
+   * under the dest path, not in the set of dirs to create.
+   * Static and public for testability.
+   * @param destDir destination dir of the job
+   * @param directoriesToCreate set of dirs to create from tasks.
+   * @return the set of parent dirs
+   */
+  public static Set<Path> extractAncestors(
+      final Path destDir,
+      final Collection<Path> directoriesToCreate) {
+
+    final Set<Path> ancestors = new HashSet<>(directoriesToCreate.size());

Review comment:
       In what scenario would the directories to create contain a path outside of the destination directory?
   If I'm not mistaken, the destination directory is the configured output directory ... and the source directory will always be something under $outputDir/_temporary/*
   Wondering if, for safety, this should error out if the target is outside of the destination directory.
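
   Something like this would do it; isUnder() is a helper made up for the
   sketch:

       // hypothetical guard at the start of extractAncestors(): fail fast
       // if a manifest asks for a directory outside the destination tree
       for (Path dir : directoriesToCreate) {
         if (!isUnder(destDir, dir)) {
           throw new IllegalArgumentException(
               "Directory " + dir + " is not under " + destDir);
         }
       }

       // walk up from the candidate path, looking for the destination dir
       static boolean isUnder(Path ancestor, Path child) {
         for (Path p = child; p != null; p = p.getParent()) {
           if (p.equals(ancestor)) {
             return true;
           }
         }
         return false;
       }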

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SAVE_MANIFEST;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestTempPathForTaskAttempt;
+
+/**
+ * Save a task manifest to the job attempt dir, using the task
+ * ID for the name of the final file.
+ * For atomic writes, the manifest is saved
+ * by writing to a temp file and then renaming it.
+ * Uses both the task ID and task attempt ID to determine the temp filename;
+ * before the rename of (temp, final-path), any file at the final path
+ * is deleted.
+ * This is so that when this stage is invoked in a second task commit, its
+ * output overwrites that of the first.
+ * When it succeeds, therefore, unless there is any subsequent commit of
+ * another attempt of the task, the manifest at the final path is from this
+ * operation.
+ *
+ * Returns the path where the manifest was saved.
+ */
+public class SaveTaskManifestStage extends
+    AbstractJobCommitStage<TaskManifest, Path> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SaveTaskManifestStage.class);
+
+  public SaveTaskManifestStage(final StageConfig stageConfig) {
+    super(true, stageConfig, OP_STAGE_TASK_SAVE_MANIFEST, false);
+  }
+
+  /**
+   * Save the manifest to a temp file and rename to the final
+   * manifest destination.
+   * @param manifest manifest
+   * @return the path to the final entry
+   * @throws IOException IO failure.
+   */
+  @Override
+  protected Path executeStage(final TaskManifest manifest)
+      throws IOException {
+
+    final Path manifestDir = getTaskManifestDir();
+    // final manifest file is by task ID
+    Path manifestFile = manifestPathForTask(manifestDir,
+        getRequiredTaskId());
+    Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
+        getRequiredTaskAttemptId());
+    LOG.info("{}: Saving manifest file to {}", getName(), manifestFile);
+    save(manifest, manifestTempFile, manifestFile);

Review comment:
       Is this to somehow handle scenarios where there may be more than one attempt of the same task which has been told it can commit?
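
   If so, the temp-file dance reads to me as (helper semantics partly my
   assumption, I haven't traced save()):

       // assumed save(manifest, tempPath, finalPath) sequence:
       // 1. serialize the manifest to the temp file, named by task *attempt*
       // 2. delete any manifest a previous attempt renamed into place
       // 3. rename(tempPath, finalPath) -- the last committing attempt wins
       save(manifest, manifestTempFile, manifestFile);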

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/RenameFilesStage.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_RENAME_FILES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
+import static org.apache.hadoop.thirdparty.com.google.common.collect.Iterables.concat;
+
+/**
+ * This stage renames all the files.
+ * It returns a manifest success data file summarizing the
+ * output, but does not add iostatistics to it.
+ */
+public class RenameFilesStage extends
+    AbstractJobCommitStage<List<TaskManifest>, ManifestSuccessData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      RenameFilesStage.class);
+
+  /**
+   * List of all files committed.
+   */
+  private final List<FileOrDirEntry> filesCommitted = new ArrayList<>();
+
+  /**
+   * Total file size.
+   */
+  private long totalFileSize = 0;
+
+  public RenameFilesStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_RENAME_FILES, true);
+  }
+
+  /**
+   * Get the list of files committed.
+   * Access is synchronized.
+   * @return direct access to the list of files.
+   */
+  public synchronized List<FileOrDirEntry> getFilesCommitted() {
+    return filesCommitted;
+  }
+
+  /**
+   * Get the total file size of the committed task.
+   * @return a number greater than or equal to zero.
+   */
+  public synchronized long getTotalFileSize() {
+    return totalFileSize;
+  }
+
+  /**
+   * Rename files in job commit.
+   * @param taskManifests a list of task manifests containing files.
+   * @return the job report.
+   * @throws IOException failure
+   */
+  @Override
+  protected ManifestSuccessData executeStage(
+      final List<TaskManifest> taskManifests)
+      throws IOException {
+
+    final ManifestSuccessData success = createManifestOutcome(getStageConfig(),
+        OP_STAGE_JOB_COMMIT);
+    final int manifestCount = taskManifests.size();
+
+    LOG.info("{}: Executing Manifest Job Commit with {} manifests in {}",
+        getName(), manifestCount, getTaskManifestDir());
+
+    // first step is to aggregate the output of all manifests into a single
+    // list of files to commit.
+    // Which Guava can do in a zero-copy concatenated iterator
+
+    final Iterable<FileOrDirEntry> filesToCommit = concat(taskManifests.stream()
+        .map(TaskManifest::getFilesToCommit)
+        .collect(Collectors.toList()));
+
+    TaskPool.foreach(filesToCommit)
+        .executeWith(getIOProcessors())
+        .stopOnFailure()
+        .run(this::commitOneFile);
+
+    // synchronized block to keep spotbugs happy.
+    List<FileOrDirEntry> committed = getFilesCommitted();
+    LOG.info("{}: Files committed: {}. Total size {}",
+        getName(), committed.size(), getTotalFileSize());
+
+    // Add a subset of the destination files to the success file;
+    // enough for simple testing
+    success.getFilenames().addAll(
+        committed
+            .subList(0, Math.min(committed.size(), SUCCESS_MARKER_FILE_LIMIT))

Review comment:
       If this is limited, we may want to add some information about the total number of files, and a message indicating that the list was truncated - to avoid confusion while debugging (why are there only 100 files?). I didn't see a total file count in ManifestSuccessData; one option is sketched below.
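
   Something along these lines would make the truncation visible; the
   putDiagnostic() call is my assumption about ManifestSuccessData's API:

       // hypothetical: record the true totals alongside the truncated list
       success.putDiagnostic("files.committed",
           Integer.toString(committed.size()));
       success.putDiagnostic("bytes.committed",
           Long.toString(getTotalFileSize()));
       if (committed.size() > SUCCESS_MARKER_FILE_LIMIT) {
         success.putDiagnostic("filenames.truncated", "true");
       }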

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */
+public abstract class StoreOperations implements Closeable {

Review comment:
       This is probably one of the biggest concerns. This class almost looks like a parallel to FileSystem, or a complementary extension.
   
   There's an Azure implementation of this in the hadoop-cloud Azure project, which would imply hadoop-azure/hadoop-cloud-storage adding a dependency on hadoop-mapreduce. I don't think hadoop-cloud-storage should ever depend on hadoop-mapreduce; hadoop-common is where the dependency chain hopefully ends.
   
   If retaining this class, I think it needs to be limited to the ManifestCommitter. Maybe rename it to ManifestCommitterStoreOperations to make it clear that others should not be attempting to use it, or consider it complementary to FileSystem. The Azure impl can likely move into the same module?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java
##########
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.util.Objects;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
+import org.apache.hadoop.util.JsonSerialization;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.RateLimitingFactory;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
+
+/**
+ * Stage Config.
+ * Everything needed to configure a stage; common to all stages.
+ *
+ * It's isolated from the details of MR datatypes (task ID, task attempt etc.);
+ * at this point it expects parsed values.
+ *
+ * It uses the builder API, but once {@link #build()} is called it goes
+ * read only. This is to ensure that changes cannot
+ * take place when shared across stages.
+ */
+public class StageConfig {
+
+  /**
+   * A flag which freezes the config for
+   * further updates.
+   */
+  private boolean frozen;
+
+  /**
+   * IOStatistics to update.
+   */
+  private IOStatisticsStore iostatistics;
+
+  /**
+   * Job ID; constant over multiple attempts.
+   */
+  private String jobId;
+
+  /**
+   * Where did the job unique ID come from?
+   */
+  private String jobIdSource = "";
+
+  /**
+   * Number of the job attempt; starts at zero.
+   */
+  private int jobAttemptNumber;
+
+  /**
+   * ID of the task.
+   */
+  private String taskId;
+
+  /**
+   * ID of this specific attempt at a task.
+   */
+  private String taskAttemptId;
+
+  /**
+   * Destination of job.
+   */
+  private Path destinationDir;
+
+  /**
+   * Job attempt dir.
+   */
+  private Path jobAttemptDir;
+
+  /**
+   * Temp directory under job dest dir.
+   */
+  private Path outputTempSubDir;
+
+  /**
+   * Task attempt dir.
+   */
+  private Path taskAttemptDir;
+
+  /**
+   * Directory where task manifests must go.
+   */
+  private Path taskManifestDir;
+
+  /**
+   * Subdir under the job attempt dir where task
+   * attempts will have subdirectories.
+   */
+  private Path jobAttemptTaskSubDir;
+
+  /**
+   * Callbacks to update store.
+   * This is not made visible to the stages; they must
+   * go through the superclass which
+   * adds statistics and logging.
+   */
+  private StoreOperations operations;
+
+  /**
+   * Submitter for doing IO against the store other than
+   * manifest processing.
+   */
+  private TaskPool.Submitter ioProcessors;
+
+  /**
+   * Optional progress callback.
+   */
+  private Progressable progressable;
+
+  /**
+   * Callback when a stage is entered.
+   */
+  private StageEventCallbacks enterStageEventHandler;
+
+  /**
+   * Thread local serializer; created on demand
+   * and shareable across a sequence of stages.
+   */
+  private final ThreadLocal<JsonSerialization<TaskManifest>> threadLocalSerializer =
+      ThreadLocal.withInitial(TaskManifest::serializer);
+
+  /**
+   * Rate limiter for operations.
+   */
+  private RateLimiting ioLimiter = RateLimitingFactory.unlimitedRate();
+
+  /**
+   * Delete target paths on commit? Stricter, but
+   * higher IO cost.
+   */
+  private boolean deleteTargetPaths;
+
+  /**
+   * Prepare parent dirs by scanning them for files?
+   */
+  private boolean prepareParentDirectories;
+
+  /**
+   * Name for logging.
+   */
+  private String name = "";
+
+  public StageConfig() {
+  }
+
+  /**
+   * Verify that the config is not yet frozen.
+   */
+  private void checkOpen() {
+    Preconditions.checkState(!frozen,
+        "StageConfig is now read-only");
+  }
+
+  /**
+   * The build command makes the config immutable.
+   * Idempotent.
+   * @return the now-frozen config
+   */
+  public StageConfig build() {
+    frozen = true;
+    return this;
+  }
+
+  /**
+   * Set job destination dir.
+   * @param dir new dir
+   * @return this
+   */
+  public StageConfig withDestinationDir(final Path dir) {
+    destinationDir = dir;
+    return this;
+  }
+
+  /**
+   * Set IOStatistics store.
+   * @param store new store
+   * @return this
+   */
+  public StageConfig withIOStatistics(final IOStatisticsStore store) {
+    checkOpen();
+    iostatistics = store;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withIOProcessors(final TaskPool.Submitter value) {
+    checkOpen();
+    ioProcessors = value;
+    return this;
+  }
+
+  /**
+   * Set Job attempt directory.
+   * @param dir new dir
+   * @return this
+   */
+  public StageConfig withJobAttemptDir(final Path dir) {
+    checkOpen();
+    jobAttemptDir = dir;
+    return this;
+  }
+
+  /**
+   * Directory to put task manifests into.
+   * @return a path under the job attempt dir.
+   */
+  public Path getTaskManifestDir() {
+    return taskManifestDir;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public StageConfig withTaskManifestDir(Path value) {
+    checkOpen();
+    taskManifestDir = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return the builder
+   */
+  public StageConfig withJobAttemptTaskSubDir(Path value) {
+    jobAttemptTaskSubDir = value;
+    return this;
+  }
+
+  /**
+   * Get the path to the subdirectory under $jobID where task
+   * attempts are. List this dir to find all task attempt dirs.
+   * @return a path under the job attempt dir.
+   */
+  public Path getJobAttemptTaskSubDir() {
+    return jobAttemptTaskSubDir;
+  }
+
+  /**
+   * Set the job directories from the attempt directories
+   * information. Does not set task attempt fields.
+   * @param dirs source of directories.
+   * @return this
+   */
+  public StageConfig withJobDirectories(
+      final ManifestCommitterSupport.AttemptDirectories dirs) {
+
+    checkOpen();
+    withJobAttemptDir(dirs.getJobAttemptDir())
+        .withJobAttemptTaskSubDir(dirs.getJobAttemptTaskSubDir())
+        .withDestinationDir(dirs.getOutputPath())
+        .withOutputTempSubDir(dirs.getOutputTempSubDir())
+        .withTaskManifestDir(dirs.getTaskManifestDir());
+
+    return this;
+  }
+
+  /**
+   * Set job ID with no attempt included.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withJobId(final String value) {
+    checkOpen();
+    jobId = value;
+    return this;
+  }
+
+  public Path getOutputTempSubDir() {
+    return outputTempSubDir;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withOutputTempSubDir(final Path value) {
+    checkOpen();
+    outputTempSubDir = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withOperations(final StoreOperations value) {
+    checkOpen();
+    operations = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withTaskAttemptId(final String value) {
+    checkOpen();
+    taskAttemptId = value;
+    return this;
+  }
+
+  /**
+   * Set builder value.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withTaskId(final String value) {
+    checkOpen();
+    taskId = value;
+    return this;
+  }
+
+  /**
+   * Set handler for stage entry events.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withStageEventCallbacks(StageEventCallbacks value) {
+    checkOpen();
+    enterStageEventHandler = value;
+    return this;
+  }
+
+  /**
+   * Optional progress callback.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withProgressable(final Progressable value) {
+    checkOpen();
+    progressable = value;
+    return this;
+  }
+
+  /**
+   * Set the Task attempt directory.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withTaskAttemptDir(final Path value) {
+    checkOpen();
+    taskAttemptDir = value;
+    return this;
+  }
+
+  /**
+   * Set the job attempt number.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withJobAttemptNumber(final int value) {
+    checkOpen();
+    jobAttemptNumber = value;
+    return this;
+  }
+
+  /**
+   * Set the Job ID source.
+   * @param value new value
+   * @return this
+   */
+  public StageConfig withJobIdSource(final String value) {
+    checkOpen();
+    jobIdSource = value;
+    return this;
+  }
+
+  /**
+   * Set IO rate limiter.
+   * @param value new value
+   * @return the builder
+   */
+  public StageConfig withIOLimiter(RateLimiting value) {
+    checkOpen();
+    ioLimiter = Objects.requireNonNull(value);
+    return this;
+  }
+
+  /**
+   * Set name of task/job.
+   * @param value new value
+   * @return the builder
+   */
+  public StageConfig withName(String value) {
+    name = value;

Review comment:
       Also checkOpen()? Note that withDestinationDir() and withJobAttemptTaskSubDir() skip the guard too.
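       
       A sketch of the fixed setter, matching the guard used by the other setters in this class:
       
       ```java
       public StageConfig withName(String value) {
         checkOpen();  // reject mutation once build() has frozen the config
         name = value;
         return this;
       }
       ```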

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */
+public abstract class StoreOperations implements Closeable {
+
+  /**
+   * Bind to the filesystem.
+   * This is called by the manifest committer after the operations
+   * have been instantiated.
+   * @param fileSystem target FS
+   * @param path actual path under FS.
+   * @throws IOException if there are binding problems.
+   */
+  public void bindToFileSystem(FileSystem fileSystem, Path path) throws IOException {
+
+  }
+
+  /**
+   * Forward to {@link FileSystem#getFileStatus(Path)}.
+   * @param path path
+   * @return status
+   * @throws IOException failure.
+   */
+  public abstract FileStatus getFileStatus(Path path) throws IOException;
+
+  /**
+   * Is a path a file? Used during directory creation.
+   * This is a copy &amp; paste of FileSystem.isFile();
+   * {@code StoreOperationsThroughFileSystem} calls into
+   * the FS direct so that stores which optimize their probes
+   * can save on IO.
+   * @param path path to probe
+   * @return true if the path exists and resolves to a file
+   * @throws IOException failure other than FileNotFoundException
+   */
+  public boolean isFile(Path path) throws IOException {
+    try {
+      return getFileStatus(path).isFile();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Forward to {@link FileSystem#delete(Path, boolean)}.
+   * If it returns without an error, there is nothing at
+   * the end of the path.
+   * @param path path
+   * @param recursive recursive delete.
+   * @return true if the path was deleted.
+   * @throws IOException failure.
+   */
+  public abstract boolean delete(Path path, boolean recursive)
+      throws IOException;
+
+  /**
+   * Forward to {@link FileSystem#mkdirs(Path)}.
+   * Usual "what does 'false' mean" ambiguity.
+   * @param path path
+   * @return true if the directory was created.
+   * @throws IOException failure.
+   */
+  public abstract boolean mkdirs(Path path) throws IOException;
+
+  /**
+   * Forward to {@link FileSystem#rename(Path, Path)}.
+   * Usual "what does 'false' mean" ambiguity.
+   * @param source source file
+   * @param dest destination path, which must not exist.
+   * @return the return value of the rename
+   * @throws IOException failure.
+   */
+  public abstract boolean renameFile(Path source, Path dest)
+      throws IOException;
+
+  /**
+   * Rename a dir; defaults to forwarding to
+   * {@link #renameFile(Path, Path)}.
+   * Usual "what does 'false' mean?" ambiguity.
+   * @param source source directory
+   * @param dest destination path, which must not exist.
+   * @return the return value of the rename.
+   * @throws IOException failure.
+   */
+  public boolean renameDir(Path source, Path dest)
+      throws IOException {
+    return renameFile(source, dest);
+  }
+
+  /**
+   * List the directory.
+   * @param path path to list.
+   * @return an iterator over the results.
+   * @throws IOException any immediate failure.
+   */
+  public abstract RemoteIterator<FileStatus> listStatusIterator(Path path)
+      throws IOException;
+
+  /**
+   * Load a task manifest from the store.
+   * With a real FS, this is done with
+   * {@link TaskManifest#load(JsonSerialization, FileSystem, Path, FileStatus)}
+   *
+   * @param serializer serializer.
+   * @param st status with the path and other data.
+   * @return the manifest
+   * @throws IOException failure to load/parse
+   */
+  public abstract TaskManifest loadTaskManifest(
+      JsonSerialization<TaskManifest> serializer,
+      FileStatus st) throws IOException;
+
+  /**
+   * Save a task manifest by create(); there's no attempt at
+   * renaming anything here.
+   * @param manifestData the manifest/success file
+   * @param path temp path for the initial save
+   * @param overwrite should create(overwrite=true) be used?
+   * @throws IOException failure to save/serialize
+   */
+  public abstract <T extends AbstractManifestData<T>> void save(
+      T manifestData,
+      Path path,
+      boolean overwrite) throws IOException;
+
+  /**
+   * Is trash enabled for the given store?
+   * @param path a path in the store to probe
+   * @return true iff trash is enabled.
+   */
+  public abstract boolean isTrashEnabled(Path path);
+
+  /**
+   * Make an msync() call; swallow UnsupportedOperationException where the FS does not support it.
+   * @param path path
+   * @throws IOException IO failure
+   */
+  public abstract void msync(Path path) throws IOException;
+
+  /**
+   * Extract an etag from a status if the conditions are met.
+   * If the conditions are not met, return null or ""; both will
+   * be treated as "no etags available".
+   * <pre>
+   *   1. The status is of a type which the implementation recognizes
+   *   as containing an etag.
+   *   2. After casting, the etag field can be retrieved,
+   *   3. and that value is non-null/non-empty.
+   * </pre>
+   * @param status status, which may be null or any subclass of FileStatus.
+   * @return either a valid etag, or null or "".
+   */
+  public String getEtag(FileStatus status) {
+    return ManifestCommitterSupport.getEtag(status);
+  }
+
+  /**
+   * Does the store preserve etags through renames?
+   * If true, and if the source listing entry has an etag,
+   * it will be used to attempt to validate a failed rename.
+   * @param path path to probe.
+   * @return true if etag comparison is a valid strategy.
+   */
+  public boolean storePreservesEtagsThroughRenames(Path path) {
+    return false;
+  }
+
+  /**
+   * Does the store provide rename resilience through an
+   * implementation of {@link #commitFile(FileOrDirEntry)}?
+   * If true, then that method will be invoked to commit work.
+   * @return true if resilient commit support is available.
+   */
+  public boolean storeSupportsResilientCommit() {
+    return false;
+  }
+
+  /**
+   * Commit one file through any resilient API.
+   * This operation MUST rename source to destination,
+   * else raise an exception.
+   * The result indicates whether or not some
+   * form of recovery took place.
+   *
+   * If etags were collected during task commit, these will be
+   * in the entries passed in here.
+   * @param entry entry to commit
+   * @return the result of the commit
+   * @throws IOException failure.
+   * @throws UnsupportedOperationException if not available.
+   */
+  public CommitFileResult commitFile(FileOrDirEntry entry) throws IOException {
+    throw new UnsupportedOperationException("Resilient commit not supported");
+  }
+
+  /**
+   * Outcome from the operation {@link #commitFile(FileOrDirEntry)}.
+   * As a rename failure MUST raise an exception, this result
+   * only declares whether or not some form of recovery took place.
+   */
+  public static final class CommitFileResult {
+
+    /** Did recovery take place? */
+    private final boolean recovered;
+
+    /**
+     * Full commit result.
+     * @param recovered Did recovery take place?
+     */
+    public static CommitFileResult fromResilientCommit(final boolean recovered) {
+      return new CommitFileResult(recovered);
+    }
+
+    /**
+     * Full commit result.
+     * @param recovered Did recovery take place?
+     */
+    private CommitFileResult(
+        final boolean recovered) {
+      this.recovered = recovered;
+    }
+
+    /**
+     * Did some form of recovery take place?
+     * @return true if the commit succeeded through some form of (etag-based) recovery.
+     */
+    public boolean recovered() {
+      return recovered;
+    }
+
+  }
+
+  /**
+   * Move a directory to trash, with the jobID as its name.
+   * IOExceptions are caught and included in the outcome.
+   * @param jobId job ID.
+   * @param path path to move, assumed to be _temporary
+   * @return the outcome.
+   */
+  public abstract MoveToTrashResult moveToTrash(String jobId, Path path);

Review comment:
       Leave this to the FileSystem implementation?
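       
       If so, a default could be built on the public Trash API; a minimal
       sketch, returning a plain boolean rather than the PR's
       MoveToTrashResult wrapper (whose shape is not quoted here):
       
       ```java
       import java.io.IOException;
       
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.fs.Trash;
       
       /** Move a path to trash through the generic FileSystem-level API. */
       static boolean moveToTrash(FileSystem fs, Path path, Configuration conf) {
         try {
           // resolves the path to the right underlying filesystem and
           // returns false when trash is disabled for it.
           return Trash.moveToAppropriateTrash(fs, path, conf);
         } catch (IOException e) {
           // swallow, per the contract above: failures go into the outcome
           return false;
         }
       }
       ```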

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem;
+
+/**
+ * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
+ * Purely for use by jobs committing work through the manifest committer.
+ * Its classname MUST NOT be changed, otherwise these jobs will fail.
+ *
+ * ADLS Gen2 stores support etag-recovery on renames; WASB
+ * stores do not.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public class AbfsManifestStoreOperations extends StoreOperationsThroughFileSystem {

Review comment:
       Should not be in the Abfs / cloud-connector package. The StoreOperationsThroughFileSystem/StoreOperations classes are private.
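       
       For context, the classname is load-bearing because the committer
       (presumably) instantiates the operations class by name from
       configuration; a sketch, with the option name guessed for
       illustration:
       
       ```java
       import org.apache.hadoop.conf.Configuration;
       import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperations;
       import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem;
       import org.apache.hadoop.util.ReflectionUtils;
       
       static StoreOperations createStoreOperations(Configuration conf) {
         // the key name here is an assumption; the real option would live
         // in the committer's constants class.
         Class<? extends StoreOperations> clazz = conf.getClass(
             "mapreduce.manifest.committer.store.operations.classname",
             StoreOperationsThroughFileSystem.class,  // default: plain FS calls
             StoreOperations.class);
         return ReflectionUtils.newInstance(clazz, conf);
       }
       ```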

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -42,6 +42,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 

Review comment:
       Separate PR for the Azure changes? Think making changes to something as core as a FileSystem implementation in such a big PR is a bad idea.

##########
File path: hadoop-project/pom.xml
##########
@@ -383,6 +383,13 @@
         <version>${hadoop.version}</version>
       </dependency>
 
+      <dependency>

Review comment:
       Goes away if the azure dependency change goes away?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/TaskAttemptScanDirectoryStage.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_DEPTH_MEAN;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_COUNT_MEAN;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_SIZE_MEAN;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SCAN_DIRECTORY;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry.dirEntry;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createTaskManifest;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics;
+
+/**
+ * Stage to scan a directory tree and build a task manifest.
+ * This is executed by the task committer.
+ */
+public final class TaskAttemptScanDirectoryStage
+    extends AbstractJobCommitStage<Void, TaskManifest> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TaskAttemptScanDirectoryStage.class);
+
+  public TaskAttemptScanDirectoryStage(
+      final StageConfig stageConfig) {
+    super(true, stageConfig, OP_STAGE_TASK_SCAN_DIRECTORY, false);
+  }
+
+  /**
+   * Build the Manifest.
+   * @return the manifest
+   * @throws IOException failure.
+   */
+  @Override
+  protected TaskManifest executeStage(final Void arguments)
+      throws IOException {
+
+    final Path taskAttemptDir = getRequiredTaskAttemptDir();
+    final TaskManifest manifest = createTaskManifest(getStageConfig());
+
+    LOG.info("{}: scanning directory {}",
+        getName(), taskAttemptDir);
+
+    final int depth = scanDirectoryTree(manifest, taskAttemptDir,
+        getDestinationDir(),
+        0);
+    List<FileOrDirEntry> filesToCommit = manifest.getFilesToCommit();
+    LongSummaryStatistics fileSummary = filesToCommit.stream()
+        .mapToLong(FileOrDirEntry::getSize)
+        .summaryStatistics();
+    long fileDataSize = fileSummary.getSum();
+    long fileCount = fileSummary.getCount();
+    int dirCount = manifest.getDirectoriesToCreate().size();
+    LOG.info("{}: directory {} contained {} file(s); data size {}",
+        getName(),
+        taskAttemptDir,
+        fileCount,
+        fileDataSize);
+    LOG.info("{}: Directory count = {}; maximum depth {}",
+        getName(),
+        dirCount,
+        depth);
+    // add statistics about the task output which, when aggregated, provides
+    // insight into structure of job, task skew, etc.
+    IOStatisticsStore iostats = getIOStatistics();
+    iostats.addSample(COMMITTER_TASK_DIRECTORY_COUNT_MEAN, dirCount);
+    iostats.addSample(COMMITTER_TASK_DIRECTORY_DEPTH_MEAN, depth);
+    iostats.addSample(COMMITTER_TASK_FILE_COUNT_MEAN, fileCount);
+    iostats.addSample(COMMITTER_TASK_FILE_SIZE_MEAN, fileDataSize);
+
+    return manifest;
+  }
+
+  /**
+   * Recursively scan a directory tree.
+   * The manifest will contain all files to rename
+   * (source and dest) and directories to create.
+   * All files are processed before any of the subdirs are.
+   * This helps in statistics gathering.
+   * There are some optimizations which could be done with async
+   * fetching of the iterators of those subdirs, but as this
+   * is generally off the critical path, that "enhancement"
+   * can be postponed until data suggests it is needed.
+   * @param manifest manifest to update
+   * @param srcDir dir to scan
+   * @param destDir destination directory
+   * @param depth depth from the task attempt dir.
+   * @return the maximum depth of child directories
+   * @throws IOException IO failure.
+   */
+  private int scanDirectoryTree(
+      TaskManifest manifest,
+      Path srcDir,
+      Path destDir,
+      int depth) throws IOException {
+
+    // generate some task progress in case directory scanning is very slow.
+    progress();
+
+    int maxDepth = 0;
+    int files = 0;
+    List<FileStatus> subdirs = new ArrayList<>();
+    try (DurationInfo ignored = new DurationInfo(LOG, false,
+        "Task Attempt %s source dir %s, dest dir %s",
+        getTaskAttemptId(), srcDir, destDir)) {
+
+      // list the directory. This may block until the listing is complete,
+      // or, if the FS does incremental or asynchronous fetching, until the
+      // first page of results is ready.
+      final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir);

Review comment:
       Am always confused about which list method should be used ... from a performance standpoint. Guessing this is the best option here? (and is not recursive?)
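       
       For what it's worth: listStatusIterator() is single-level, not
       recursive, which seems to be the point here — the scan needs the
       directory entries themselves to record dirs-to-create, whereas
       the recursive listFiles(path, true) returns only files. The
       iterator form also lets object stores page results in while the
       caller is already processing. A sketch of the single-level
       pattern, assuming a plain FileSystem:
       
       ```java
       import java.io.IOException;
       
       import org.apache.hadoop.fs.FileStatus;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
       import org.apache.hadoop.fs.RemoteIterator;
       
       /** One level of listing; processing overlaps with paged fetches. */
       static void scanOneLevel(FileSystem fs, Path dir) throws IOException {
         RemoteIterator<FileStatus> it = fs.listStatusIterator(dir);
         while (it.hasNext()) {
           FileStatus st = it.next();
           if (st.isFile()) {
             // record the file as a rename entry in the manifest
           } else {
             // queue the subdirectory for its own scan
           }
         }
       }
       ```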

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */
+public abstract class StoreOperations implements Closeable {

Review comment:
       Not sure if this can somehow be made package private / module private.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */
+public abstract class StoreOperations implements Closeable {

Review comment:
       Maybe even ManifestCommitterOperations - given this contains things like TaskManifest loadTaskManifest, etc.

##########
File path: hadoop-tools/hadoop-azure/pom.xml
##########
@@ -319,7 +371,7 @@
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>

Review comment:
       Not sure why this change is required?

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/RenameFilesStage.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_RENAME_FILES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
+import static org.apache.hadoop.thirdparty.com.google.common.collect.Iterables.concat;
+
+/**
+ * This stage renames all the files.
+ * It returns a manifest success data file summarizing the
+ * output, but does not add iostatistics to it.
+ */
+public class RenameFilesStage extends
+    AbstractJobCommitStage<List<TaskManifest>, ManifestSuccessData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      RenameFilesStage.class);
+
+  /**
+   * List of all files committed.
+   */
+  private final List<FileOrDirEntry> filesCommitted = new ArrayList<>();
+
+  /**
+   * Total file size.
+   */
+  private long totalFileSize = 0;
+
+  public RenameFilesStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_RENAME_FILES, true);
+  }
+
+  /**
+   * Get the list of files committed.
+   * The getter is synchronized, but it returns the live list,
+   * which is not a copy and is not itself synchronized.
+   * @return direct access to the list of files.
+   */
+  public synchronized List<FileOrDirEntry> getFilesCommitted() {
+    return filesCommitted;
+  }
+
+  /**
+   * Get the total file size of the committed task.
+   * @return a number greater than or equal to zero.
+   */
+  public synchronized long getTotalFileSize() {
+    return totalFileSize;
+  }
+
+  /**
+   * Rename files in job commit.
+   * @param taskManifests a list of task manifests containing files.
+   * @return the job report.
+   * @throws IOException failure
+   */
+  @Override
+  protected ManifestSuccessData executeStage(

Review comment:
       May want to ignore this. Turned into writing down my thought-process. Left it because it helps me refer back to this.
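       
       For anyone following the thought-process: the stage's core is
       presumably a parallel rename fan-out over the manifests' file
       entries via the PR's TaskPool. A hypothetical fragment of what
       executeStage() does — getIOProcessors() and commitOneFile() are
       invented names for the submitter accessor and per-file commit,
       not the PR's actual methods:
       
       ```java
       // gather every file entry across all task manifests
       List<FileOrDirEntry> toRename = taskManifests.stream()
           .flatMap(m -> m.getFilesToCommit().stream())
           .collect(Collectors.toList());
       
       TaskPool.foreach(toRename)
           .executeWith(getIOProcessors())      // shared submitter for store IO
           .stopOnFailure()                     // first rename failure fails the job
           .run(entry -> commitOneFile(entry)); // rename, or resilient commitFile()
       ```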




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org