You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/29 15:27:01 UTC
[hadoop] branch branch-3.2 updated: HADOOP-16058. S3A tests to
include Terasort.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 60c9042 HADOOP-16058. S3A tests to include Terasort.
60c9042 is described below
commit 60c9042286609131deb1a06ba80bc3473d880414
Author: Steve Loughran <st...@apache.org>
AuthorDate: Fri Mar 29 15:25:45 2019 +0000
HADOOP-16058. S3A tests to include Terasort.
Contributed by Steve Loughran.
This includes
- HADOOP-15890. Some S3A committer tests don't match ITest* pattern; don't run in maven
- MAPREDUCE-7090. BigMapOutput example doesn't work with paths off cluster fs
- MAPREDUCE-7091. Terasort on S3A to switch to new committers
- MAPREDUCE-7092. MR examples to work better against cloud stores
---
.../org/apache/hadoop/mapred/BigMapOutput.java | 18 +-
.../java/org/apache/hadoop/mapred/MRBench.java | 2 +-
.../hadoop/examples/terasort/TeraOutputFormat.java | 11 -
.../apache/hadoop/examples/terasort/TeraSort.java | 2 +-
.../hadoop/examples/terasort/TestTeraSort.java | 6 +-
hadoop-tools/hadoop-aws/pom.xml | 4 +
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 62 +++++
.../hadoop/fs/s3a/commit/AbstractCommitITest.java | 64 ++++++
.../fs/s3a/commit/AbstractITCommitMRJob.java | 161 ++-----------
.../fs/s3a/commit/AbstractYarnClusterITest.java | 256 +++++++++++++++++++++
...CommitMRJob.java => ITestMagicCommitMRJob.java} | 30 ++-
...itMRJob.java => ITestDirectoryCommitMRJob.java} | 30 ++-
...itMRJob.java => ITestPartitionCommitMRJob.java} | 31 ++-
...mmitMRJob.java => ITestStagingCommitMRJob.java} | 46 +++-
...st.java => ITestStagingCommitMRJobBadDest.java} | 29 ++-
.../commit/terasort/AbstractCommitTerasortIT.java | 241 +++++++++++++++++++
.../ITestTerasortDirectoryCommitter.java} | 37 ++-
.../ITestTerasortMagicCommitter.java} | 55 ++---
18 files changed, 878 insertions(+), 207 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java
index 964673b..35992f5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/BigMapOutput.java
@@ -128,17 +128,20 @@ public class BigMapOutput extends Configured implements Tool {
usage();
}
}
-
- FileSystem fs = FileSystem.get(getConf());
+ if (bigMapInput == null || outputPath == null) {
+ // report usage and exit
+ usage();
+ // this stops IDEs warning about unset local variables.
+ return -1;
+ }
+
JobConf jobConf = new JobConf(getConf(), BigMapOutput.class);
jobConf.setJobName("BigMapOutput");
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, bigMapInput);
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
+ outputPath.getFileSystem(jobConf).delete(outputPath, true);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
@@ -146,7 +149,10 @@ public class BigMapOutput extends Configured implements Tool {
jobConf.setOutputValueClass(BytesWritable.class);
if (createInput) {
- createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
+ createBigMapInputFile(jobConf,
+ bigMapInput.getFileSystem(jobConf),
+ bigMapInput,
+ fileSizeInMB);
}
Date startTime = new Date();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java
index 5328756..36f4693 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRBench.java
@@ -284,7 +284,7 @@ public class MRBench extends Configured implements Tool{
}
JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
- FileSystem fs = FileSystem.get(jobConf);
+ FileSystem fs = BASE_DIR.getFileSystem(jobConf);
Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
generateTextFile(fs, inputFile, inputLines, inputSortOrder);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
index e0ce36c..14fea56 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
@@ -30,10 +30,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.slf4j.Logger;
@@ -45,7 +43,6 @@ import org.slf4j.LoggerFactory;
public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
private static final Logger LOG =
LoggerFactory.getLogger(TeraOutputFormat.class);
- private OutputCommitter committer = null;
/**
* Set the requirement for a final sync before the stream is closed.
@@ -145,12 +142,4 @@ public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
return new TeraRecordWriter(fileOut, job);
}
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException {
- if (committer == null) {
- Path output = getOutputPath(context);
- committer = new FileOutputCommitter(output, context);
- }
- return committer;
- }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java
index 8b698e3..e21653e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java
@@ -321,7 +321,7 @@ public class TeraSort extends Configured implements Tool {
try {
TeraInputFormat.writePartitionFile(job, partitionFile);
} catch (Throwable e) {
- LOG.error(e.getMessage());
+ LOG.error("{}", e.getMessage(), e);
return -1;
}
job.addCacheFile(partitionUri);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java
index b301659..992ac50 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/test/java/org/apache/hadoop/examples/terasort/TestTeraSort.java
@@ -61,7 +61,7 @@ public class TestTeraSort extends HadoopTestCase {
String[] genArgs = {NUM_ROWS, sortInput.toString()};
// Run TeraGen
- assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
+ assertEquals(0, ToolRunner.run(conf, new TeraGen(), genArgs));
}
private void runTeraSort(Configuration conf,
@@ -71,7 +71,7 @@ public class TestTeraSort extends HadoopTestCase {
String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
// Run Sort
- assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
+ assertEquals(0, ToolRunner.run(conf, new TeraSort(), sortArgs));
}
private void runTeraValidator(Configuration job,
@@ -80,7 +80,7 @@ public class TestTeraSort extends HadoopTestCase {
String[] svArgs = {sortOutput.toString(), valOutput.toString()};
// Run Tera-Validator
- assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
+ assertEquals(0, ToolRunner.run(job, new TeraValidate(), svArgs));
}
@Test
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 08c53e7..48a284a 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -186,6 +186,7 @@
<exclude>**/ITestS3AHuge*.java</exclude>
<!-- this sets out to overlaod DynamoDB, so must be run standalone -->
<exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
+ <exclude>**/ITestTerasort*.java</exclude>
</excludes>
</configuration>
</execution>
@@ -220,6 +221,9 @@
<include>**/ITestS3AEncryptionSSEC*.java</include>
<!-- this sets out to overlaod DynamoDB, so must be run standalone -->
<include>**/ITestDynamoDBMetadataStoreScale.java</include>
+ <!-- the terasort tests both work with a file in the same path in -->
+ <!-- the local FS. Running them sequentially guarantees isolation -->
+ <include>**/ITestTerasort*.java</include>
</includes>
</configuration>
</execution>
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index ecb5d0b..a841a66 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceOperations;
+
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Assume;
@@ -564,6 +567,65 @@ public final class S3ATestUtils {
}
/**
+ * Call a function; any exception raised is logged at info.
+ * This is for test teardowns.
+ * @param log log to use.
+ * @param operation operation to invoke
+ * @param <T> type of operation.
+ */
+ public static <T> void callQuietly(final Logger log,
+ final Invoker.Operation<T> operation) {
+ try {
+ operation.execute();
+ } catch (Exception e) {
+ log.info(e.toString(), e);
+ }
+ }
+
+ /**
+ * Call a void operation; any exception raised is logged at info.
+ * This is for test teardowns.
+ * @param log log to use.
+ * @param operation operation to invoke
+ */
+ public static void callQuietly(final Logger log,
+ final Invoker.VoidOperation operation) {
+ try {
+ operation.execute();
+ } catch (Exception e) {
+ log.info(e.toString(), e);
+ }
+ }
+
+ /**
+ * Deploy a hadoop service: init and start it.
+ * @param conf configuration to use
+ * @param service service to configure
+ * @param <T> type of service
+ * @return the started service
+ */
+ public static <T extends Service> T deployService(
+ final Configuration conf,
+ final T service) {
+ service.init(conf);
+ service.start();
+ return service;
+ }
+
+ /**
+ * Terminate a service, returning {@code null} cast at compile-time
+ * to the type of the service, for ease of setting fields to null.
+ * @param service service.
+ * @param <T> type of the service
+ * @return null, always
+ */
+ @SuppressWarnings("ThrowableNotThrown")
+ public static <T extends Service> T terminateService(final T service) {
+ ServiceOperations.stopQuietly(LOG, service);
+ return null;
+ }
+
+ /**
* Helper class to do diffs of metrics.
*/
public static final class MetricDiff {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 246bf9d..0a3d07a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.fs.s3a.commit;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -50,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
/**
@@ -75,6 +79,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
private InconsistentAmazonS3Client inconsistentClient;
+
/**
* Should the inconsistent S3A client be used?
* Default value: true.
@@ -436,4 +441,63 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
jContext.getConfiguration(),
TypeConverter.fromYarn(attemptID));
}
+
+
+ /**
+ * Load in the success data marker: this guarantees that an S3A
+ * committer was used.
+ * @param fs filesystem
+ * @param outputPath path of job
+ * @param committerName name of committer to match
+ * @return the success data
+ * @throws IOException IO failure
+ */
+ public static SuccessData validateSuccessFile(final S3AFileSystem fs,
+ final Path outputPath, final String committerName) throws IOException {
+ SuccessData successData = null;
+ try {
+ successData = loadSuccessFile(fs, outputPath);
+ } catch (FileNotFoundException e) {
+ // either the output path is missing or, if it's the success file that is missing,
+ // the relevant committer somehow wasn't picked up.
+ String dest = outputPath.toString();
+ LOG.error("No _SUCCESS file found under {}", dest);
+ List<String> files = new ArrayList<>();
+ applyLocatedFiles(fs.listFiles(outputPath, true),
+ (status) -> {
+ files.add(status.getPath().toString());
+ LOG.error("{} {}", status.getPath(), status.getLen());
+ });
+ throw new AssertionError("No _SUCCESS file in " + dest
+ + "; found : " + files.stream().collect(Collectors.joining("\n")),
+ e);
+ }
+ String commitDetails = successData.toString();
+ LOG.info("Committer name " + committerName + "\n{}",
+ commitDetails);
+ LOG.info("Committer statistics: \n{}",
+ successData.dumpMetrics(" ", " = ", "\n"));
+ LOG.info("Diagnostics\n{}",
+ successData.dumpDiagnostics(" ", " = ", "\n"));
+ assertEquals("Wrong committer in " + commitDetails,
+ committerName, successData.getCommitter());
+ return successData;
+ }
+
+ /**
+ * Load a success file; fail if the file is empty/nonexistent.
+ * @param fs filesystem
+ * @param outputPath directory containing the success file.
+ * @return the loaded file.
+ * @throws IOException failure to find/load the file
+ * @throws AssertionError file is 0-bytes long
+ */
+ public static SuccessData loadSuccessFile(final S3AFileSystem fs,
+ final Path outputPath) throws IOException {
+ Path success = new Path(outputPath, _SUCCESS);
+ FileStatus status = fs.getFileStatus(success);
+ assertTrue("0 byte success file - not a s3guard committer " + success,
+ status.getLen() > 0);
+ return SuccessData.load(fs, success);
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
index 161db85..09da720 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -30,22 +30,17 @@ import java.util.Set;
import java.util.UUID;
import com.google.common.collect.Sets;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -54,102 +49,35 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.service.ServiceOperations;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
+
+/**
+ * Test for an MR Job with all the different committers.
+ */
+public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
-/** Full integration test of an MR job. */
-public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractITCommitMRJob.class);
- private static final int TEST_FILE_COUNT = 2;
- private static final int SCALE_TEST_FILE_COUNT = 20;
-
- private static MiniDFSClusterService hdfs;
- private static MiniMRYarnCluster yarn = null;
- private static JobConf conf = null;
- private boolean uniqueFilenames = false;
- private boolean scaleTest;
-
- protected static FileSystem getDFS() {
- return hdfs.getClusterFS();
- }
-
- @BeforeClass
- public static void setupClusters() throws IOException {
- // the HDFS and YARN clusters share the same configuration, so
- // the HDFS cluster binding is implicitly propagated to YARN
- conf = new JobConf();
- conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
- conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
-
- hdfs = new MiniDFSClusterService();
- hdfs.init(conf);
- hdfs.start();
- yarn = new MiniMRYarnCluster("ITCommitMRJob", 2);
- yarn.init(conf);
- yarn.start();
- }
-
- @SuppressWarnings("ThrowableNotThrown")
- @AfterClass
- public static void teardownClusters() throws IOException {
- conf = null;
- ServiceOperations.stopQuietly(yarn);
- ServiceOperations.stopQuietly(hdfs);
- hdfs = null;
- yarn = null;
- }
-
- public static MiniDFSCluster getHdfs() {
- return hdfs.getCluster();
- }
-
- public static FileSystem getLocalFS() {
- return hdfs.getLocalFS();
- }
-
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
- /**
- * The name of the committer as returned by
- * {@link AbstractS3ACommitter#getName()} and used for committer construction.
- */
- protected abstract String committerName();
-
- @Override
- public void setup() throws Exception {
- super.setup();
- scaleTest = getTestPropertyBool(
- getConfiguration(),
- KEY_SCALE_TESTS_ENABLED,
- DEFAULT_SCALE_TESTS_ENABLED);
- }
-
- @Override
- protected int getTestTimeoutMillis() {
- return SCALE_TEST_TIMEOUT_SECONDS * 1000;
- }
-
@Test
public void testMRJob() throws Exception {
+ describe("Run a simple MR Job");
+
S3AFileSystem fs = getFileSystem();
// final dest is in S3A
- Path outputPath = path("testMRJob");
+ Path outputPath = path(getMethodName());
String commitUUID = UUID.randomUUID().toString();
- String suffix = uniqueFilenames ? ("-" + commitUUID) : "";
+ String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
int numFiles = getTestFileCount();
List<String> expectedFiles = new ArrayList<>(numFiles);
Set<String> expectedKeys = Sets.newHashSet();
for (int i = 0; i < numFiles; i += 1) {
- File file = temp.newFile(String.valueOf(i) + ".text");
+ File file = temp.newFile(i + ".text");
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
}
@@ -160,17 +88,8 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
}
Collections.sort(expectedFiles);
- Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job");
+ Job mrJob = createJob();
JobConf jobConf = (JobConf) mrJob.getConfiguration();
- jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
- uniqueFilenames);
-
-
- bindCommitter(jobConf,
- CommitConstants.S3A_COMMITTER_FACTORY,
- committerName());
- // pass down the scale test flag
- jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
FileOutputFormat.setOutputPath(mrJob, outputPath);
@@ -204,7 +123,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
mrJob.setMaxMapAttempts(1);
mrJob.submit();
- try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) {
+ try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
boolean succeeded = mrJob.waitForCompletion(true);
assertTrue("MR job failed", succeeded);
}
@@ -223,24 +142,11 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
}
Collections.sort(actualFiles);
- // load in the success data marker: this guarantees that a s3guard
- // committer was used
- Path success = new Path(outputPath, _SUCCESS);
- FileStatus status = fs.getFileStatus(success);
- assertTrue("0 byte success file - not a s3guard committer " + success,
- status.getLen() > 0);
- SuccessData successData = SuccessData.load(fs, success);
- String commitDetails = successData.toString();
- LOG.info("Committer name " + committerName() + "\n{}",
- commitDetails);
- LOG.info("Committer statistics: \n{}",
- successData.dumpMetrics(" ", " = ", "\n"));
- LOG.info("Diagnostics\n{}",
- successData.dumpDiagnostics(" ", " = ", "\n"));
- assertEquals("Wrong committer in " + commitDetails,
- committerName(), successData.getCommitter());
+ SuccessData successData = validateSuccessFile(fs, outputPath,
+ committerName());
List<String> successFiles = successData.getFilenames();
- assertTrue("No filenames in " + commitDetails,
+ String commitData = successData.toString();
+ assertTrue("No filenames in " + commitData,
!successFiles.isEmpty());
assertEquals("Should commit the expected files",
@@ -249,42 +155,13 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
Set<String> summaryKeys = Sets.newHashSet();
summaryKeys.addAll(successFiles);
assertEquals("Summary keyset doesn't list the the expected paths "
- + commitDetails, expectedKeys, summaryKeys);
+ + commitData, expectedKeys, summaryKeys);
assertPathDoesNotExist("temporary dir",
new Path(outputPath, CommitConstants.TEMPORARY));
customPostExecutionValidation(outputPath, successData);
}
/**
- * Get the file count for the test.
- * @return the number of mappers to create.
- */
- public int getTestFileCount() {
- return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
- }
-
- /**
- * Override point to let implementations tune the MR Job conf.
- * @param jobConf configuration
- */
- protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
-
- }
-
- /**
- * Override point for any committer specific validation operations;
- * called after the base assertions have all passed.
- * @param destPath destination of work
- * @param successData loaded success data
- * @throws Exception failure
- */
- protected void customPostExecutionValidation(Path destPath,
- SuccessData successData)
- throws Exception {
-
- }
-
- /**
* Test Mapper.
* This is executed in separate process, and must not make any assumptions
* about external state.
@@ -305,7 +182,7 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
org.apache.log4j.BasicConfigurator.configure();
boolean scaleMap = context.getConfiguration()
.getBoolean(KEY_SCALE_TESTS_ENABLED, false);
- operations = scaleMap ? 1000 : 10;
+ operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
id = context.getTaskAttemptID().toString();
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
new file mode 100644
index 0000000..45f0738
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
+
+/**
+ * Base class for full integration tests of MR jobs.
+ *
+ * This is all done on shared static mini YARN and HDFS clusters, set up before
+ * any of the test methods run.
+ *
+ * To isolate tests properly for parallel test runs, that static state
+ * needs to be stored in the final classes implementing the tests, and
+ * exposed to the base class, with the setup clusters in the
+ * specific test suites creating the clusters with unique names.
+ *
+ * This is "hard" to do in Java, unlike, say, Scala.
+ *
+ * Note: this turns out not to be the root cause of ordering problems
+ * with the Terasort tests (that is the hard-coded use of a file in the local FS),
+ * but this design here does make it clear that the before and after class
+ * operations are explicitly called in the subclasses.
+ * If two subclasses of this class are instantiated in the same JVM, in order,
+ * they are guaranteed to be isolated.
+ *
+ * History: this is a superclass extracted from
+ * {@link AbstractITCommitMRJob} while adding support for testing terasorting.
+ *
+ */
+public abstract class AbstractYarnClusterITest extends AbstractCommitITest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractYarnClusterITest.class);
+
+ private static final int TEST_FILE_COUNT = 2;
+ private static final int SCALE_TEST_FILE_COUNT = 20;
+
+ public static final int SCALE_TEST_KEYS = 1000;
+ public static final int BASE_TEST_KEYS = 10;
+
+ private boolean scaleTest;
+
+ private boolean uniqueFilenames = false;
+
+ /**
+ * This is the cluster binding which every subclass must create.
+ */
+ protected static final class ClusterBinding {
+
+ private final MiniDFSClusterService hdfs;
+
+ private final MiniMRYarnCluster yarn;
+
+ public ClusterBinding(
+ final MiniDFSClusterService hdfs,
+ final MiniMRYarnCluster yarn) {
+ this.hdfs = checkNotNull(hdfs);
+ this.yarn = checkNotNull(yarn);
+ }
+
+ public MiniDFSClusterService getHdfs() {
+ return hdfs;
+ }
+
+ public MiniMRYarnCluster getYarn() {
+ return yarn;
+ }
+
+ public Configuration getConf() {
+ return getYarn().getConfig();
+ }
+
+ public void terminate() {
+ terminateService(getYarn());
+ terminateService(getHdfs());
+ }
+ }
+
+ /**
+ * Create the cluster binding. This must be done in
+ * class setup of the (final) subclass.
+ * The HDFS and YARN clusters share the same configuration, so
+ * the HDFS cluster binding is implicitly propagated to YARN.
+ * @param conf configuration to start with.
+ * @return the cluster binding.
+ * @throws IOException failure.
+ */
+ protected static ClusterBinding createCluster(JobConf conf)
+ throws IOException {
+
+ conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+ conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
+
+ // create a unique cluster name.
+ String clusterName = "yarn-" + UUID.randomUUID();
+ MiniDFSClusterService miniDFSClusterService = deployService(conf,
+ new MiniDFSClusterService());
+ MiniMRYarnCluster yarnCluster = deployService(conf,
+ new MiniMRYarnCluster(clusterName, 2));
+ return new ClusterBinding(miniDFSClusterService, yarnCluster);
+ }
+
+ /**
+ * Get the cluster binding for this subclass.
+ * @return the cluster binding; {@link #setup()} asserts it is non-null.
+ */
+ protected abstract ClusterBinding getClusterBinding();
+
+ protected MiniDFSClusterService getHdfs() {
+ return getClusterBinding().getHdfs();
+ }
+
+
+ protected MiniMRYarnCluster getYarn() {
+ return getClusterBinding().getYarn();
+ }
+
+ public FileSystem getLocalFS() {
+ return getHdfs().getLocalFS();
+ }
+
+ protected FileSystem getDFS() {
+ return getHdfs().getClusterFS();
+ }
+
+ /**
+ * The name of the committer as returned by
+ * {@link AbstractS3ACommitter#getName()} and used for committer construction.
+ */
+ protected abstract String committerName();
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ assertNotNull("cluster is not bound",
+ getClusterBinding());
+
+ scaleTest = getTestPropertyBool(
+ getConfiguration(),
+ KEY_SCALE_TESTS_ENABLED,
+ DEFAULT_SCALE_TESTS_ENABLED);
+ }
+
+ @Override
+ protected int getTestTimeoutMillis() {
+ return SCALE_TEST_TIMEOUT_SECONDS * 1000;
+ }
+
+ protected JobConf newJobConf() {
+ return new JobConf(getYarn().getConfig());
+ }
+
+
+ protected Job createJob() throws IOException {
+ Job mrJob = Job.getInstance(getClusterBinding().getConf(),
+ getMethodName());
+ patchConfigurationForCommitter(mrJob.getConfiguration());
+ return mrJob;
+ }
+
+ protected Configuration patchConfigurationForCommitter(
+ final Configuration jobConf) {
+ jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+ uniqueFilenames);
+ bindCommitter(jobConf,
+ CommitConstants.S3A_COMMITTER_FACTORY,
+ committerName());
+ // pass down the scale test flag
+ jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
+ return jobConf;
+ }
+
+ /**
+ * Get the file count for the test.
+ * @return the number of mappers to create.
+ */
+ public int getTestFileCount() {
+ return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
+ }
+
+ /**
+ * Override point to let implementations tune the MR Job conf.
+ * @param jobConf configuration
+ */
+ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
+
+ }
+
+ /**
+ * Override point for any committer specific validation operations;
+ * called after the base assertions have all passed.
+ * @param destPath destination of work
+ * @param successData loaded success data
+ * @throws Exception failure
+ */
+ protected void customPostExecutionValidation(Path destPath,
+ SuccessData successData)
+ throws Exception {
+
+ }
+
+ /**
+ * Assume that scale tests are enabled.
+ */
+ protected void requireScaleTestsEnabled() {
+ assume("Scale test disabled: to enable set property " +
+ KEY_SCALE_TESTS_ENABLED,
+ isScaleTest());
+ }
+
+ public boolean isScaleTest() {
+ return scaleTest;
+ }
+
+ public boolean isUniqueFilenames() {
+ return uniqueFilenames;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
similarity index 74%
copy from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
copy to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
index b7be17a..a9b9c2c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.fs.s3a.commit.magic;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
@@ -33,7 +38,30 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
* the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
* passed down to these processes.
*/
-public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  /**
+   * Terminate the clusters, if they were created.
+   * JUnit runs {@code @AfterClass} methods even if a {@code @BeforeClass}
+   * method fails, so the binding may be null here; the check avoids a
+   * spurious NullPointerException being reported alongside that failure.
+   * @throws IOException failure to terminate the clusters
+   */
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    if (clusterBinding != null) {
+      clusterBinding.terminate();
+    }
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
/**
* Need consistency here.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
similarity index 59%
copy from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
copy to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
index c10ebed..8d44ddb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
@@ -18,13 +18,41 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.mapred.JobConf;
/**
* Full integration test for the directory committer.
*/
-public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  /**
+   * Terminate the clusters, if they were created.
+   * JUnit runs {@code @AfterClass} methods even if a {@code @BeforeClass}
+   * method fails, so the binding may be null here; the check avoids a
+   * spurious NullPointerException being reported alongside that failure.
+   * @throws IOException failure to terminate the clusters
+   */
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    if (clusterBinding != null) {
+      clusterBinding.terminate();
+    }
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
@Override
protected String committerName() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
similarity index 59%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
index 1c19a95..f71479c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
@@ -18,13 +18,42 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.mapred.JobConf;
/**
* Full integration test for the partition committer.
*/
-public class ITPartitionCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  /**
+   * Terminate the clusters, if they were created.
+   * JUnit runs {@code @AfterClass} methods even if a {@code @BeforeClass}
+   * method fails, so the binding may be null here; the check avoids a
+   * spurious NullPointerException being reported alongside that failure.
+   * @throws IOException failure to terminate the clusters
+   */
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    if (clusterBinding != null) {
+      clusterBinding.terminate();
+    }
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
@Override
protected String committerName() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
similarity index 63%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
index 76ad464..d4a351f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
@@ -18,25 +18,53 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration;
-import org.junit.Test;
+import java.io.IOException;
import org.hamcrest.core.StringContains;
import org.hamcrest.core.StringEndsWith;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
-import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
/**
* Full integration test for the staging committer.
*/
-public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  /**
+   * Terminate the clusters, if they were created.
+   * JUnit runs {@code @AfterClass} methods even if a {@code @BeforeClass}
+   * method fails, so the binding may be null here; the check avoids a
+   * spurious NullPointerException being reported alongside that failure.
+   * @throws IOException failure to terminate the clusters
+   */
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    if (clusterBinding != null) {
+      clusterBinding.terminate();
+    }
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
@Override
protected String committerName() {
@@ -51,12 +79,12 @@ public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
public void testStagingDirectory() throws Throwable {
FileSystem hdfs = getDFS();
Configuration conf = hdfs.getConf();
- conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
- "private");
+ conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
- assertThat(dir.toString(), StringEndsWith.endsWith(
- "UUID/"
- + StagingCommitterConstants.STAGING_UPLOADS));
+ assertThat("Directory " + dir + " path is wrong",
+ dir.toString(),
+ StringEndsWith.endsWith("UUID/"
+ + STAGING_UPLOADS));
assertTrue("path unqualified", dir.isAbsolute());
String self = UserGroupInformation.getCurrentUser().getShortUserName();
assertThat(dir.toString(),
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
similarity index 74%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
index be477a7..68926f9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
@@ -33,7 +36,30 @@ import org.apache.hadoop.test.LambdaTestUtils;
* This is a test to verify that the committer will fail if the destination
* directory exists, and that this happens in job setup.
*/
-public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
+public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    clusterBinding = createCluster(new JobConf());
+  }
+
+  /**
+   * Terminate the clusters, if they were created.
+   * JUnit runs {@code @AfterClass} methods even if a {@code @BeforeClass}
+   * method fails, so the binding may be null here; the check avoids a
+   * spurious NullPointerException being reported alongside that failure.
+   * @throws IOException failure to terminate the clusters
+   */
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    if (clusterBinding != null) {
+      clusterBinding.terminate();
+    }
+  }
+
+  @Override
+  public ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
@Override
protected String committerName() {
@@ -59,4 +85,5 @@ public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
"Output directory",
super::testMRJob);
}
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
new file mode 100644
index 0000000..d286212
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.terasort;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import static java.util.Optional.empty;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+
+/**
+ * Runs Terasort against S3A.
+ *
+ * This is all done on shared mini YARN and HDFS clusters, set up before
+ * any of the test methods run.
+ *
+ * The tests run in name order, each stage consuming the output of the
+ * previous one, so the test paths must not be deleted in test teardown;
+ * state shared between test methods must be held in static fields.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@SuppressWarnings("StaticNonFinalField")
+public abstract class AbstractCommitTerasortIT extends
+    AbstractYarnClusterITest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractCommitTerasortIT.class);
+
+  // The stage durations are only filled in when a stage completes
+  // successfully; the overall duration is created in setup() if absent.
+  private static Optional<DurationInfo> terasortDuration = empty();
+
+  private static Optional<DurationInfo> teragenStageDuration = empty();
+
+  private static Optional<DurationInfo> terasortStageDuration = empty();
+
+  private static Optional<DurationInfo> teravalidateStageDuration = empty();
+
+  private Path terasortPath;
+
+  private Path sortInput;
+
+  private Path sortOutput;
+
+  private Path sortValidate;
+
+  /**
+   * Not using special paths here.
+   * @return false
+   */
+  @Override
+  public boolean useInconsistentClient() {
+    return false;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    requireScaleTestsEnabled();
+    prepareToTerasort();
+  }
+
+  /**
+   * Set up for terasorting by initializing paths.
+   * The paths used must be unique across parallel runs.
+   */
+  private void prepareToTerasort() {
+    // small sample size for faster runs
+    Configuration yarnConfig = getYarn().getConfig();
+    yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    yarnConfig.setBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+        true);
+    terasortPath = new Path("/terasort-" + getClass().getSimpleName())
+        .makeQualified(getFileSystem());
+    sortInput = new Path(terasortPath, "sortin");
+    sortOutput = new Path(terasortPath, "sortout");
+    sortValidate = new Path(terasortPath, "validate");
+    if (!terasortDuration.isPresent()) {
+      terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort"));
+    }
+  }
+
+  /**
+   * Execute a single stage in the terasort and validate its _SUCCESS file.
+   * @param stage Stage name for messages/assertions.
+   * @param jobConf job conf
+   * @param dest destination directory; the _SUCCESS file is expected here.
+   * @param tool tool to run.
+   * @param args args for the tool.
+   * @return the duration of the stage.
+   * @throws Exception any failure
+   */
+  private Optional<DurationInfo> executeStage(
+      final String stage,
+      final JobConf jobConf,
+      final Path dest,
+      final Tool tool,
+      final String[] args) throws Exception {
+    int result;
+    DurationInfo d = new DurationInfo(LOG, stage);
+    try {
+      result = ToolRunner.run(jobConf, tool, args);
+    } finally {
+      d.close();
+    }
+    assertEquals(stage
+        + "(" + StringUtils.join(", ", args) + ")"
+        + " failed", 0, result);
+    validateSuccessFile(getFileSystem(), dest, committerName());
+    return Optional.of(d);
+  }
+
+  /**
+   * Set up terasort by deleting the output of any previous run.
+   * The overall timer was already started by setup().
+   */
+  @Test
+  public void test_100_terasort_setup() throws Throwable {
+    describe("Setting up for a terasort");
+    getFileSystem().delete(terasortPath, true);
+  }
+
+  @Test
+  public void test_110_teragen() throws Throwable {
+    describe("Teragen to %s", sortInput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    teragenStageDuration = executeStage("Teragen",
+        jobConf,
+        sortInput,
+        new TeraGen(),
+        new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()});
+  }
+
+  @Test
+  public void test_120_terasort() throws Throwable {
+    describe("Terasort from %s to %s", sortInput, sortOutput);
+    loadSuccessFile(getFileSystem(), sortInput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    // this job adds some data, so skip the conflict check: use append mode.
+    jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+    terasortStageDuration = executeStage("TeraSort",
+        jobConf,
+        sortOutput,
+        new TeraSort(),
+        new String[]{sortInput.toString(), sortOutput.toString()});
+  }
+
+  @Test
+  public void test_130_teravalidate() throws Throwable {
+    describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+    loadSuccessFile(getFileSystem(), sortOutput);
+    JobConf jobConf = newJobConf();
+    patchConfigurationForCommitter(jobConf);
+    teravalidateStageDuration = executeStage("TeraValidate",
+        jobConf,
+        sortValidate,
+        new TeraValidate(),
+        new String[]{sortOutput.toString(), sortValidate.toString()});
+  }
+
+  /**
+   * Print the results, and save them as the tab-separated "results.csv"
+   * under the terasort directory. Why there? Makes it easy to list and compare.
+   */
+  @Test
+  public void test_140_teracomplete() throws Throwable {
+    terasortDuration.get().close();
+
+    final StringBuilder results = new StringBuilder();
+    results.append("\"Operation\"\t\"Duration\"\n");
+
+    // a lambda appending one row per stage; usable as a BiConsumer
+    // because nothing in it raises a checked exception.
+    BiConsumer<String, Optional<DurationInfo>> stage =
+        (s, od) ->
+            results.append(String.format("\"%s\"\t\"%s\"\n",
+                s,
+                od.map(DurationInfo::getDurationString).orElse("")));
+
+    stage.accept("Generate", teragenStageDuration);
+    stage.accept("Terasort", terasortStageDuration);
+    stage.accept("Validate", teravalidateStageDuration);
+    stage.accept("Completed", terasortDuration);
+    String text = results.toString();
+    Path path = new Path(terasortPath, "results.csv");
+    LOG.info("Results are in {}\n{}", path, text);
+    ContractTestUtils.writeTextFile(getFileSystem(), path, text, true);
+  }
+
+  /**
+   * Reset all the durations, so that if two committer suites run in one
+   * JVM, the second does not report the first's start time or stage times.
+   */
+  @Test
+  public void test_150_teracleanup() throws Throwable {
+    terasortDuration = Optional.empty();
+    teragenStageDuration = Optional.empty();
+    terasortStageDuration = Optional.empty();
+    teravalidateStageDuration = Optional.empty();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
similarity index 52%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
index c10ebed..cb9cdd0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
@@ -16,18 +16,47 @@
* limitations under the License.
*/
-package org.apache.hadoop.fs.s3a.commit.staging.integration;
+package org.apache.hadoop.fs.s3a.commit.terasort;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.mapred.JobConf;
/**
- * Full integration test for the directory committer.
+ * Terasort with the directory committer.
*/
-public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT {
+
+ /**
+ * The static cluster binding with the lifecycle of this test; served
+ * through instance-level methods for sharing across methods in the
+ * suite.
+ */
+ @SuppressWarnings("StaticNonFinalField")
+ private static ClusterBinding clusterBinding;
+
+ @BeforeClass
+ public static void setupClusters() throws IOException {
+ clusterBinding = createCluster(new JobConf());
+ }
+
+ @AfterClass
+ public static void teardownClusters() throws IOException {
+ clusterBinding.terminate();
+ }
+
+ @Override
+ public ClusterBinding getClusterBinding() {
+ return clusterBinding;
+ }
@Override
protected String committerName() {
return DirectoryStagingCommitter.NAME;
}
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
similarity index 54%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
index b7be17a..e1b4eac 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
@@ -16,35 +16,47 @@
* limitations under the License.
*/
-package org.apache.hadoop.fs.s3a.commit.magic;
+package org.apache.hadoop.fs.s3a.commit.terasort;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.mapred.JobConf;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
/**
- * Full integration test for the Magic Committer.
- *
- * There's no need to disable the committer setting for the filesystem here,
- * because the committers are being instantiated in their own processes;
- * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
- * passed down to these processes.
+ * Terasort with the magic committer.
*/
-public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
+public final class ITestTerasortMagicCommitter
+ extends AbstractCommitTerasortIT {
/**
- * Need consistency here.
- * @return false
+ * The static cluster binding with the lifecycle of this test; served
+ * through instance-level methods for sharing across methods in the
+ * suite.
*/
- @Override
- public boolean useInconsistentClient() {
- return false;
+ @SuppressWarnings("StaticNonFinalField")
+ private static ClusterBinding clusterBinding;
+
+ @BeforeClass
+ public static void setupClusters() throws IOException {
+ clusterBinding = createCluster(new JobConf());
+ }
+
+ @AfterClass
+ public static void teardownClusters() throws IOException {
+ clusterBinding.terminate();
}
@Override
+ public ClusterBinding getClusterBinding() {
+ return clusterBinding;
+ }
+ @Override
protected String committerName() {
return MagicS3GuardCommitter.NAME;
}
@@ -58,13 +70,4 @@ public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
}
- /**
- * Check that the magic dir was cleaned up.
- * {@inheritDoc}
- */
- @Override
- protected void customPostExecutionValidation(Path destPath,
- SuccessData successData) throws Exception {
- assertPathDoesNotExist("No cleanup", new Path(destPath, MAGIC));
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org