You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/21 11:18:39 UTC

[hadoop] branch trunk updated: HADOOP-16058. S3A tests to include Terasort.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f1c017  HADOOP-16058. S3A tests to include Terasort.
9f1c017 is described below

commit 9f1c017f444d5e57899493dc23207c6b5fc26dae
Author: Steve Loughran <st...@apache.org>
AuthorDate: Thu Mar 21 11:15:37 2019 +0000

    HADOOP-16058. S3A tests to include Terasort.
    
    Contributed by Steve Loughran.
    
    This includes
     - HADOOP-15890. Some S3A committer tests don't match ITest* pattern; don't run in maven
     - MAPREDUCE-7090. BigMapOutput example doesn't work with paths off cluster fs
     - MAPREDUCE-7091. Terasort on S3A to switch to new committers
     - MAPREDUCE-7092. MR examples to work better against cloud stores
---
 .../org/apache/hadoop/mapred/BigMapOutput.java     |  18 +-
 .../java/org/apache/hadoop/mapred/MRBench.java     |   2 +-
 .../hadoop/examples/terasort/TeraOutputFormat.java |  11 -
 .../apache/hadoop/examples/terasort/TeraSort.java  |   2 +-
 .../hadoop/examples/terasort/TestTeraSort.java     |   6 +-
 hadoop-tools/hadoop-aws/pom.xml                    |   4 +
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |  64 ++++++
 .../fs/s3a/commit/AbstractITCommitMRJob.java       | 156 ++-----------
 .../fs/s3a/commit/AbstractYarnClusterITest.java    | 256 +++++++++++++++++++++
 ...CommitMRJob.java => ITestMagicCommitMRJob.java} |  30 ++-
 ...itMRJob.java => ITestDirectoryCommitMRJob.java} |  30 ++-
 ...itMRJob.java => ITestPartitionCommitMRJob.java} |  31 ++-
 ...mmitMRJob.java => ITestStagingCommitMRJob.java} |  46 +++-
 ...st.java => ITestStagingCommitMRJobBadDest.java} |  29 ++-
 .../commit/terasort/AbstractCommitTerasortIT.java  | 241 +++++++++++++++++++
 .../ITestTerasortDirectoryCommitter.java}          |  37 ++-
 .../ITestTerasortMagicCommitter.java}              |  55 ++---
 17 files changed, 816 insertions(+), 202 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java
index 964673b..35992f5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java
@@ -128,17 +128,20 @@ public class BigMapOutput extends Configured implements Tool {
         usage();
       }
     }
-    
-    FileSystem fs = FileSystem.get(getConf());
+    if (bigMapInput == null || outputPath == null) {
+      // report usage and exit
+      usage();
+      // this stops IDES warning about unset local variables.
+      return -1;
+    }
+
     JobConf jobConf = new JobConf(getConf(), BigMapOutput.class);
 
     jobConf.setJobName("BigMapOutput");
     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
     FileInputFormat.setInputPaths(jobConf, bigMapInput);
-    if (fs.exists(outputPath)) {
-      fs.delete(outputPath, true);
-    }
+    outputPath.getFileSystem(jobConf).delete(outputPath, true);
     FileOutputFormat.setOutputPath(jobConf, outputPath);
     jobConf.setMapperClass(IdentityMapper.class);
     jobConf.setReducerClass(IdentityReducer.class);
@@ -146,7 +149,10 @@ public class BigMapOutput extends Configured implements Tool {
     jobConf.setOutputValueClass(BytesWritable.class);
     
     if (createInput) {
-      createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
+      createBigMapInputFile(jobConf,
+          bigMapInput.getFileSystem(jobConf),
+          bigMapInput,
+          fileSizeInMB);
     }
     
     Date startTime = new Date();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java
index 5328756..36f4693 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java
@@ -284,7 +284,7 @@ public class MRBench extends Configured implements Tool{
       }
 
     JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
-    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem fs = BASE_DIR.getFileSystem(jobConf);
     Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
     generateTextFile(fs, inputFile, inputLines, inputSortOrder);
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
index e0ce36c..14fea56 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
@@ -30,10 +30,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.slf4j.Logger;
@@ -45,7 +43,6 @@ import org.slf4j.LoggerFactory;
 public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
   private static final Logger LOG =
       LoggerFactory.getLogger(TeraOutputFormat.class);
-  private OutputCommitter committer = null;
 
   /**
    * Set the requirement for a final sync before the stream is closed.
@@ -145,12 +142,4 @@ public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
     return new TeraRecordWriter(fileOut, job);
   }
   
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
-      throws IOException {
-    if (committer == null) {
-      Path output = getOutputPath(context);
-      committer = new FileOutputCommitter(output, context);
-    }
-    return committer;
-  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java
index 8b698e3..e21653e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java
@@ -321,7 +321,7 @@ public class TeraSort extends Configured implements Tool {
       try {
         TeraInputFormat.writePartitionFile(job, partitionFile);
       } catch (Throwable e) {
-        LOG.error(e.getMessage());
+        LOG.error("{}", e.getMessage(), e);
         return -1;
       }
       job.addCacheFile(partitionUri);  
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java
index b301659..992ac50 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java
@@ -61,7 +61,7 @@ public class TestTeraSort extends HadoopTestCase {
     String[] genArgs = {NUM_ROWS, sortInput.toString()};
 
     // Run TeraGen
-    assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
+    assertEquals(0, ToolRunner.run(conf, new TeraGen(), genArgs));
   }
 
   private void runTeraSort(Configuration conf,
@@ -71,7 +71,7 @@ public class TestTeraSort extends HadoopTestCase {
     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
 
     // Run Sort
-    assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
+    assertEquals(0, ToolRunner.run(conf, new TeraSort(), sortArgs));
   }
 
   private void runTeraValidator(Configuration job,
@@ -80,7 +80,7 @@ public class TestTeraSort extends HadoopTestCase {
     String[] svArgs = {sortOutput.toString(), valOutput.toString()};
 
     // Run Tera-Validator
-    assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
+    assertEquals(0, ToolRunner.run(job, new TeraValidate(), svArgs));
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index f89bc8d..3bfe776 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -186,6 +186,7 @@
                     <exclude>**/ITestS3AHuge*.java</exclude>
                     <!-- this sets out to overlaod DynamoDB, so must be run standalone -->
                     <exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
+                    <exclude>**/ITestTerasort*.java</exclude>
                   </excludes>
                 </configuration>
               </execution>
@@ -220,6 +221,9 @@
                     <include>**/ITestS3AEncryptionSSEC*.java</include>
                     <!-- this sets out to overlaod DynamoDB, so must be run standalone -->
                     <include>**/ITestDynamoDBMetadataStoreScale.java</include>
+                    <!-- the terasort tests both work with a file in the same path in -->
+                    <!--  the local FS. Running them sequentially guarantees isolation -->
+                    <include>**/ITestTerasort*.java</include>
                   </includes>
                 </configuration>
               </execution>
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 69a6ed6..ef594e6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.fs.s3a.commit;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -50,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 
 /**
@@ -75,6 +79,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
 
   private InconsistentAmazonS3Client inconsistentClient;
 
+
   /**
    * Should the inconsistent S3A client be used?
    * Default value: true.
@@ -436,4 +441,63 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
         jContext.getConfiguration(),
         TypeConverter.fromYarn(attemptID));
   }
+
+
+  /**
+   * Load in the success data marker: this guarantees that an S3A
+   * committer was used.
+   * @param fs filesystem
+   * @param outputPath path of job
+   * @param committerName name of committer to match
+   * @return the success data
+   * @throws IOException IO failure
+   */
+  public static SuccessData validateSuccessFile(final S3AFileSystem fs,
+      final Path outputPath, final String committerName) throws IOException {
+    SuccessData successData = null;
+    try {
+      successData = loadSuccessFile(fs, outputPath);
+    } catch (FileNotFoundException e) {
+      // either the output path is missing or, if its the success file,
+      // somehow the relevant committer wasn't picked up.
+      String dest = outputPath.toString();
+      LOG.error("No _SUCCESS file found under {}", dest);
+      List<String> files = new ArrayList<>();
+      applyLocatedFiles(fs.listFiles(outputPath, true),
+          (status) -> {
+            files.add(status.getPath().toString());
+            LOG.error("{} {}", status.getPath(), status.getLen());
+          });
+      throw new AssertionError("No _SUCCESS file in " + dest
+          + "; found : " + files.stream().collect(Collectors.joining("\n")),
+          e);
+    }
+    String commitDetails = successData.toString();
+    LOG.info("Committer name " + committerName + "\n{}",
+        commitDetails);
+    LOG.info("Committer statistics: \n{}",
+        successData.dumpMetrics("  ", " = ", "\n"));
+    LOG.info("Diagnostics\n{}",
+        successData.dumpDiagnostics("  ", " = ", "\n"));
+    assertEquals("Wrong committer in " + commitDetails,
+        committerName, successData.getCommitter());
+    return successData;
+  }
+
+  /**
+   * Load a success file; fail if the file is empty/nonexistent.
+   * @param fs filesystem
+   * @param outputPath directory containing the success file.
+   * @return the loaded file.
+   * @throws IOException failure to find/load the file
+   * @throws AssertionError file is 0-bytes long
+   */
+  public static SuccessData loadSuccessFile(final S3AFileSystem fs,
+      final Path outputPath) throws IOException {
+    Path success = new Path(outputPath, _SUCCESS);
+    FileStatus status = fs.getFileStatus(success);
+    assertTrue("0 byte success file - not a s3guard committer " + success,
+        status.getLen() > 0);
+    return SuccessData.load(fs, success);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
index 03c834f..682931d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -30,22 +30,17 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.google.common.collect.Sets;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -54,98 +49,36 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
+
+/**
+ * Test for an MR Job with all the different committers.
+ */
+public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
 
-/** Full integration test of an MR job. */
-public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractITCommitMRJob.class);
 
-  private static final int TEST_FILE_COUNT = 2;
-  private static final int SCALE_TEST_FILE_COUNT = 20;
-
-  private static MiniDFSClusterService hdfs;
-  private static MiniMRYarnCluster yarn = null;
-  private static JobConf conf = null;
-  private boolean uniqueFilenames = false;
-  private boolean scaleTest;
-
-  protected static FileSystem getDFS() {
-    return hdfs.getClusterFS();
-  }
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    // the HDFS and YARN clusters share the same configuration, so
-    // the HDFS cluster binding is implicitly propagated to YARN
-    conf = new JobConf();
-    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
-    conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
-
-    hdfs = deployService(conf, new MiniDFSClusterService());
-    yarn = deployService(conf,
-        new MiniMRYarnCluster("ITCommitMRJob", 2));
-  }
-
-  @SuppressWarnings("ThrowableNotThrown")
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    conf = null;
-    yarn = terminateService(yarn);
-    hdfs = terminateService(hdfs);
-  }
-
-  public static MiniDFSCluster getHdfs() {
-    return hdfs.getCluster();
-  }
-
-  public static FileSystem getLocalFS() {
-    return hdfs.getLocalFS();
-  }
-
   @Rule
   public final TemporaryFolder temp = new TemporaryFolder();
 
-  /**
-   * The name of the committer as returned by
-   * {@link AbstractS3ACommitter#getName()} and used for committer construction.
-   */
-  protected abstract String committerName();
-
-  @Override
-  public void setup() throws Exception {
-    super.setup();
-    scaleTest = getTestPropertyBool(
-        getConfiguration(),
-        KEY_SCALE_TESTS_ENABLED,
-        DEFAULT_SCALE_TESTS_ENABLED);
-  }
-
-  @Override
-  protected int getTestTimeoutMillis() {
-    return SCALE_TEST_TIMEOUT_SECONDS * 1000;
-  }
-
   @Test
   public void testMRJob() throws Exception {
+    describe("Run a simple MR Job");
+
     S3AFileSystem fs = getFileSystem();
     // final dest is in S3A
-    Path outputPath = path("testMRJob");
+    Path outputPath = path(getMethodName());
 
     String commitUUID = UUID.randomUUID().toString();
-    String suffix = uniqueFilenames ? ("-" + commitUUID) : "";
+    String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
     int numFiles = getTestFileCount();
     List<String> expectedFiles = new ArrayList<>(numFiles);
     Set<String> expectedKeys = Sets.newHashSet();
     for (int i = 0; i < numFiles; i += 1) {
-      File file = temp.newFile(String.valueOf(i) + ".text");
+      File file = temp.newFile(i + ".text");
       try (FileOutputStream out = new FileOutputStream(file)) {
         out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
       }
@@ -156,17 +89,8 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
     }
     Collections.sort(expectedFiles);
 
-    Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job");
+    Job mrJob = createJob();
     JobConf jobConf = (JobConf) mrJob.getConfiguration();
-    jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
-        uniqueFilenames);
-
-
-    bindCommitter(jobConf,
-        CommitConstants.S3A_COMMITTER_FACTORY,
-        committerName());
-    // pass down the scale test flag
-    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
 
     mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
     FileOutputFormat.setOutputPath(mrJob, outputPath);
@@ -200,7 +124,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
     mrJob.setMaxMapAttempts(1);
 
     mrJob.submit();
-    try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) {
+    try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
       boolean succeeded = mrJob.waitForCompletion(true);
       assertTrue("MR job failed", succeeded);
     }
@@ -219,24 +143,11 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
     }
     Collections.sort(actualFiles);
 
-    // load in the success data marker: this guarantees that a s3guard
-    // committer was used
-    Path success = new Path(outputPath, _SUCCESS);
-    FileStatus status = fs.getFileStatus(success);
-    assertTrue("0 byte success file - not a s3guard committer " + success,
-        status.getLen() > 0);
-    SuccessData successData = SuccessData.load(fs, success);
-    String commitDetails = successData.toString();
-    LOG.info("Committer name " + committerName() + "\n{}",
-        commitDetails);
-    LOG.info("Committer statistics: \n{}",
-        successData.dumpMetrics("  ", " = ", "\n"));
-    LOG.info("Diagnostics\n{}",
-        successData.dumpDiagnostics("  ", " = ", "\n"));
-    assertEquals("Wrong committer in " + commitDetails,
-        committerName(), successData.getCommitter());
+    SuccessData successData = validateSuccessFile(fs, outputPath,
+        committerName());
     List<String> successFiles = successData.getFilenames();
-    assertTrue("No filenames in " + commitDetails,
+    String commitData = successData.toString();
+    assertTrue("No filenames in " + commitData,
         !successFiles.isEmpty());
 
     assertEquals("Should commit the expected files",
@@ -245,42 +156,13 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
     Set<String> summaryKeys = Sets.newHashSet();
     summaryKeys.addAll(successFiles);
     assertEquals("Summary keyset doesn't list the the expected paths "
-        + commitDetails, expectedKeys, summaryKeys);
+        + commitData, expectedKeys, summaryKeys);
     assertPathDoesNotExist("temporary dir",
         new Path(outputPath, CommitConstants.TEMPORARY));
     customPostExecutionValidation(outputPath, successData);
   }
 
   /**
-   * Get the file count for the test.
-   * @return the number of mappers to create.
-   */
-  public int getTestFileCount() {
-    return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
-  }
-
-  /**
-   * Override point to let implementations tune the MR Job conf.
-   * @param jobConf configuration
-   */
-  protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
-
-  }
-
-  /**
-   * Override point for any committer specific validation operations;
-   * called after the base assertions have all passed.
-   * @param destPath destination of work
-   * @param successData loaded success data
-   * @throws Exception failure
-   */
-  protected void customPostExecutionValidation(Path destPath,
-      SuccessData successData)
-      throws Exception {
-
-  }
-
-  /**
    *  Test Mapper.
    *  This is executed in separate process, and must not make any assumptions
    *  about external state.
@@ -301,7 +183,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
       org.apache.log4j.BasicConfigurator.configure();
       boolean scaleMap = context.getConfiguration()
           .getBoolean(KEY_SCALE_TESTS_ENABLED, false);
-      operations = scaleMap ? 1000 : 10;
+      operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
       id = context.getTaskAttemptID().toString();
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
new file mode 100644
index 0000000..45f0738
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
+
+/**
+ * Full integration test MR jobs.
+ *
+ * This is all done on shared static mini YARN and HDFS clusters, set up before
+ * any of the tests methods run.
+ *
+ * To isolate tests properly for parallel test runs, that static state
+ * needs to be stored in the final classes implementing the tests, and
+ * exposed to the base class, with the setup clusters in the
+ * specific test suites creating the clusters with unique names.
+ *
+ * This is "hard" to do in Java, unlike, say, Scala.
+ *
+ * Note: this turns out not to be the root cause of ordering problems
+ * with the Terasort tests (that is hard coded use of a file in the local FS),
+ * but this design here does make it clear that the before and after class
+ * operations are explicitly called in the subclasses.
+ * If two subclasses of this class are instantiated in the same JVM, in order,
+ * they are guaranteed to be isolated.
+ *
+ * History: this is a superclass extracted from
+ * {@link AbstractITCommitMRJob} while adding support for testing terasorting.
+ *
+ */
+public abstract class AbstractYarnClusterITest extends AbstractCommitITest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractYarnClusterITest.class);
+
+  private static final int TEST_FILE_COUNT = 2;
+  private static final int SCALE_TEST_FILE_COUNT = 20;
+
+  public static final int SCALE_TEST_KEYS = 1000;
+  public static final int BASE_TEST_KEYS = 10;
+
+  private boolean scaleTest;
+
+  private boolean uniqueFilenames = false;
+
+  /**
+   * This is the cluster binding which every subclass must create.
+   */
+  protected static final class ClusterBinding {
+
+    private final MiniDFSClusterService hdfs;
+
+    private final MiniMRYarnCluster yarn;
+
+    public ClusterBinding(
+        final MiniDFSClusterService hdfs,
+        final MiniMRYarnCluster yarn) {
+      this.hdfs = checkNotNull(hdfs);
+      this.yarn = checkNotNull(yarn);
+    }
+
+    public MiniDFSClusterService getHdfs() {
+      return hdfs;
+    }
+
+    public MiniMRYarnCluster getYarn() {
+      return yarn;
+    }
+
+    public Configuration getConf() {
+      return getYarn().getConfig();
+    }
+
+    public void terminate() {
+      terminateService(getYarn());
+      terminateService(getHdfs());
+    }
+  }
+
+  /**
+   * Create the cluster binding. This must be done in
+   * class setup of the (final) subclass.
+   * The HDFS and YARN clusters share the same configuration, so
+   * the HDFS cluster binding is implicitly propagated to YARN.
+   * @param conf configuration to start with.
+   * @return the cluster binding.
+   * @throws IOException failure.
+   */
+  protected static ClusterBinding createCluster(JobConf conf)
+      throws IOException {
+
+    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+    conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
+
+    // create a unique cluster name.
+    String clusterName = "yarn-" + UUID.randomUUID();
+    MiniDFSClusterService miniDFSClusterService = deployService(conf,
+        new MiniDFSClusterService());
+    MiniMRYarnCluster yarnCluster = deployService(conf,
+        new MiniMRYarnCluster(clusterName, 2));
+    return new ClusterBinding(miniDFSClusterService, yarnCluster);
+  }
+
+  /**
+   * Get the cluster binding for this subclass
+   * @return
+   */
+  protected abstract ClusterBinding getClusterBinding();
+
+  protected MiniDFSClusterService getHdfs() {
+    return getClusterBinding().getHdfs();
+  }
+
+
+  protected MiniMRYarnCluster getYarn() {
+    return getClusterBinding().getYarn();
+  }
+
+  public FileSystem getLocalFS() {
+    return getHdfs().getLocalFS();
+  }
+
+  protected FileSystem getDFS() {
+    return getHdfs().getClusterFS();
+  }
+
+  /**
+   * The name of the committer as returned by
+   * {@link AbstractS3ACommitter#getName()} and used for committer construction.
+   */
+  protected abstract String committerName();
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assertNotNull("cluster is not bound",
+        getClusterBinding());
+
+    scaleTest = getTestPropertyBool(
+        getConfiguration(),
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED);
+  }
+
+  @Override
+  protected int getTestTimeoutMillis() {
+    return SCALE_TEST_TIMEOUT_SECONDS * 1000;
+  }
+
+  protected JobConf newJobConf() {
+    return new JobConf(getYarn().getConfig());
+  }
+
+
+  protected Job createJob() throws IOException {
+    Job mrJob = Job.getInstance(getClusterBinding().getConf(),
+        getMethodName());
+    patchConfigurationForCommitter(mrJob.getConfiguration());
+    return mrJob;
+  }
+
+  protected Configuration patchConfigurationForCommitter(
+      final Configuration jobConf) {
+    jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        uniqueFilenames);
+    bindCommitter(jobConf,
+        CommitConstants.S3A_COMMITTER_FACTORY,
+        committerName());
+    // pass down the scale test flag
+    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
+    return jobConf;
+  }
+
+  /**
+   * Get the file count for the test.
+   * @return the number of mappers to create.
+   */
+  public int getTestFileCount() {
+    return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
+  }
+
+  /**
+   * Override point to let implementations tune the MR Job conf.
+   * @param jobConf configuration
+   */
+  protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
+
+  }
+
+  /**
+   * Override point for any committer specific validation operations;
+   * called after the base assertions have all passed.
+   * @param destPath destination of work
+   * @param successData loaded success data
+   * @throws Exception failure
+   */
+  protected void customPostExecutionValidation(Path destPath,
+      SuccessData successData)
+      throws Exception {
+
+  }
+
+  /**
+   * Assume that scale tests are enabled.
+   */
+  protected void requireScaleTestsEnabled() {
+    assume("Scale test disabled: to enable set property " +
+            KEY_SCALE_TESTS_ENABLED,
+        isScaleTest());
+  }
+
+  public boolean isScaleTest() {
+    return scaleTest;
+  }
+
+  public boolean isUniqueFilenames() {
+    return uniqueFilenames;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
similarity index 74%
copy from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
copy to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
index b7be17a..a9b9c2c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
@@ -33,7 +38,30 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
  * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
  * passed down to these processes.
  */
-public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
 
   /**
    * Need consistency here.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
similarity index 59%
copy from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
copy to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
index c10ebed..8d44ddb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
@@ -18,13 +18,41 @@
 
 package org.apache.hadoop.fs.s3a.commit.staging.integration;
 
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
 import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Full integration test for the directory committer.
  */
-public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
 
   @Override
   protected String committerName() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
similarity index 59%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
index 1c19a95..f71479c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
@@ -18,13 +18,42 @@
 
 package org.apache.hadoop.fs.s3a.commit.staging.integration;
 
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
 import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Full integration test for the partition committer.
  */
-public class ITPartitionCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
 
   @Override
   protected String committerName() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
similarity index 63%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
index 76ad464..d4a351f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
@@ -18,25 +18,53 @@
 
 package org.apache.hadoop.fs.s3a.commit.staging.integration;
 
-import org.junit.Test;
+import java.io.IOException;
 
 import org.hamcrest.core.StringContains;
 import org.hamcrest.core.StringEndsWith;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
-import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
 import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
 
 /**
  * Full integration test for the staging committer.
  */
-public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
 
   @Override
   protected String committerName() {
@@ -51,12 +79,12 @@ public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
   public void testStagingDirectory() throws Throwable {
     FileSystem hdfs = getDFS();
     Configuration conf = hdfs.getConf();
-    conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
-        "private");
+    conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
     Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
-    assertThat(dir.toString(), StringEndsWith.endsWith(
-        "UUID/"
-        + StagingCommitterConstants.STAGING_UPLOADS));
+    assertThat("Directory " + dir + " path is wrong",
+        dir.toString(),
+        StringEndsWith.endsWith("UUID/"
+        + STAGING_UPLOADS));
     assertTrue("path unqualified", dir.isAbsolute());
     String self = UserGroupInformation.getCurrentUser().getShortUserName();
     assertThat(dir.toString(),
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
similarity index 74%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
index be477a7..68926f9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration;
 
 import java.io.IOException;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
@@ -33,7 +36,30 @@ import org.apache.hadoop.test.LambdaTestUtils;
  * This is a test to verify that the committer will fail if the destination
  * directory exists, and that this happens in job setup.
  */
-public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
+public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
 
   @Override
   protected String committerName() {
@@ -59,4 +85,5 @@ public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
         "Output directory",
         super::testMRJob);
   }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
new file mode 100644
index 0000000..491ecb9
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.terasort;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import static java.util.Optional.empty;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+
+/**
+ * Runs Terasort against S3A.
+ *
+ * This is all done on a shared mini YARN and HDFS clusters, set up before
+ * any of the tests methods run.
+ *
+ * The tests run in sequence, so each operation is isolated.
+ * This also means that the test paths deleted in test
+ * teardown; shared variables must all be static.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@SuppressWarnings("StaticNonFinalField")
+public abstract class AbstractCommitTerasortIT extends
+    AbstractYarnClusterITest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractCommitTerasortIT.class);
+
+  // all the durations are optional as they only get filled in when
+  // a test run successfully completes. Failed tests don't have numbers.
+  private static Optional<DurationInfo> terasortDuration = empty();
+
+  private static Optional<DurationInfo> teragenStageDuration = empty();
+
+  private static Optional<DurationInfo> terasortStageDuration = empty();
+
+  private static Optional<DurationInfo> teravalidateStageDuration = empty();
+
+  private Path terasortPath;
+
+  private Path sortInput;
+
+  private Path sortOutput;
+
+  private Path sortValidate;
+
+  /**
+   * Not using special paths here.
+   * @return false
+   */
+  @Override
+  public boolean useInconsistentClient() {
+    return false;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    requireScaleTestsEnabled();
+    prepareToTerasort();
+  }
+
+  /**
+   * Set up for terasorting by initializing paths.
+   * The paths used must be unique across parallel runs.
+   */
+  private void prepareToTerasort() {
+    // small sample size for faster runs
+    Configuration yarnConfig = getYarn().getConfig();
+    yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    yarnConfig.setBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+        true);
+    terasortPath = new Path("/terasort-" + getClass().getSimpleName())
+        .makeQualified(getFileSystem());
+    sortInput = new Path(terasortPath, "sortin");
+    sortOutput = new Path(terasortPath, "sortout");
+    sortValidate = new Path(terasortPath, "validate");
+    if (!terasortDuration.isPresent()) {
+      terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort"));
+    }
+  }
+
+  /**
+   * Execute a single stage in the terasort,
+   * @param stage Stage name for messages/assertions.
+   * @param jobConf job conf
+   * @param dest destination directory -the _SUCCESS File will be expected here.
+   * @param tool tool to run.
+   * @param args args for the tool.
+   * @throws Exception any failure
+   */
+  private Optional<DurationInfo> executeStage(
+      final String stage,
+      final JobConf jobConf,
+      final Path dest,
+      final Tool tool,
+      final String[] args) throws Exception {
+    int result;
+    DurationInfo d = new DurationInfo(LOG, stage);
+    try {
+      result = ToolRunner.run(jobConf, tool, args);
+    } finally {
+      d.close();
+    }
+    assertEquals(stage
+        + "(" + StringUtils.join(", ", args) + ")"
+        + " failed", 0, result);
+    validateSuccessFile(getFileSystem(), dest, committerName());
+    return Optional.of(d);
+  }
+
+  /**
+   * Set up terasort by cleaning out the destination, and note the initial
+   * time before any of the jobs are executed.
+   */
+  @Test
+  public void test_100_terasort_setup() throws Throwable {
+    describe("Setting up for a terasort");
+
+    getFileSystem().delete(terasortPath, true);
+  }
+
+  @Test
+  public void test_110_teragen() throws Throwable {
+    describe("Teragen to %s", sortInput);
+
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    teragenStageDuration = executeStage("Teragen",
+        jobConf,
+        sortInput,
+        new TeraGen(),
+        new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()});
+  }
+
+  @Test
+  public void test_120_terasort() throws Throwable {
+    describe("Terasort from %s to %s", sortInput, sortOutput);
+    loadSuccessFile(getFileSystem(), sortInput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    // this job adds some data, so skip it.
+    jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+    terasortStageDuration = executeStage("TeraSort",
+        jobConf,
+        sortOutput,
+        new TeraSort(),
+        new String[]{sortInput.toString(), sortOutput.toString()});
+  }
+
+  @Test
+  public void test_130_teravalidate() throws Throwable {
+    describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+    loadSuccessFile(getFileSystem(), sortOutput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    teravalidateStageDuration = executeStage("TeraValidate",
+        jobConf,
+        sortValidate,
+        new TeraValidate(),
+        new String[]{sortOutput.toString(), sortValidate.toString()});
+  }
+
+  /**
+   * Print the results, and save to the base dir as a CSV file.
+   * Why there? Makes it easy to list and compare.
+   */
+  @Test
+  public void test_140_teracomplete() throws Throwable {
+    terasortDuration.get().close();
+
+    final StringBuilder results = new StringBuilder();
+    results.append("\"Operation\"\t\"Duration\"\n");
+
+    // this is how you dynamically create a function in a method
+    // for use afterwards.
+    // Works because there's no IOEs being raised in this sequence.
+    BiConsumer<String, Optional<DurationInfo>> stage =
+        (s, od) ->
+            results.append(String.format("\"%s\"\t\"%s\"\n",
+                s,
+                od.map(DurationInfo::getDurationString).orElse("")));
+
+    stage.accept("Generate", teragenStageDuration);
+    stage.accept("Terasort", terasortStageDuration);
+    stage.accept("Validate", teravalidateStageDuration);
+    stage.accept("Completed", terasortDuration);
+    String text = results.toString();
+    Path path = new Path(terasortPath, "results.csv");
+    LOG.info("Results are in {}\n{}", path, text);
+    ContractTestUtils.writeTextFile(getFileSystem(), path, text, true);
+  }
+
+  /**
+   * Reset the duration so if two committer tests are run sequentially.
+   * Without this the total execution time is reported as from the start of
+   * the first test suite to the end of the second.
+   */
+  @Test
+  public void test_150_teracleanup() throws Throwable {
+    terasortDuration = Optional.empty();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
similarity index 52%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
index c10ebed..cb9cdd0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
@@ -16,18 +16,47 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.s3a.commit.staging.integration;
+package org.apache.hadoop.fs.s3a.commit.terasort;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
 import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
- * Full integration test for the directory committer.
+ * Terasort with the directory committer.
  */
-public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
 
   @Override
   protected String committerName() {
     return DirectoryStagingCommitter.NAME;
   }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
similarity index 54%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
index b7be17a..e1b4eac 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
@@ -16,35 +16,47 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.s3a.commit.magic;
+package org.apache.hadoop.fs.s3a.commit.terasort;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
 import org.apache.hadoop.mapred.JobConf;
 
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 
 /**
- * Full integration test for the Magic Committer.
- *
- * There's no need to disable the committer setting for the filesystem here,
- * because the committers are being instantiated in their own processes;
- * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
- * passed down to these processes.
+ * Terasort with the magic committer.
  */
-public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestTerasortMagicCommitter
+    extends AbstractCommitTerasortIT {
 
   /**
-   * Need consistency here.
-   * @return false
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
    */
-  @Override
-  public boolean useInconsistentClient() {
-    return false;
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    clusterBinding.terminate();
   }
 
   @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
+  @Override
   protected String committerName() {
     return MagicS3GuardCommitter.NAME;
   }
@@ -58,13 +70,4 @@ public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
     conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
   }
 
-  /**
-   * Check that the magic dir was cleaned up.
-   * {@inheritDoc}
-   */
-  @Override
-  protected void customPostExecutionValidation(Path destPath,
-      SuccessData successData) throws Exception {
-    assertPathDoesNotExist("No cleanup", new Path(destPath, MAGIC));
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org