You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/08/30 14:23:22 UTC
[2/2] hadoop git commit: HADOOP-15107. Stabilize/tune S3A committers;
review correctness & docs. Contributed by Steve Loughran.
HADOOP-15107. Stabilize/tune S3A committers; review correctness & docs.
Contributed by Steve Loughran.
(cherry picked from commit 5a0babf76550f63dad4c17173c4da2bf335c6532)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0766bf6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0766bf6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0766bf6
Branch: refs/heads/branch-3.1
Commit: a0766bf66a6d66514d078db8ae8808fc44f6e11b
Parents: 6f939d4
Author: Steve Loughran <st...@apache.org>
Authored: Thu Aug 30 15:23:08 2018 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Aug 30 15:23:08 2018 +0100
----------------------------------------------------------------------
.../lib/output/PathOutputCommitter.java | 12 +-
.../java/org/apache/hadoop/fs/s3a/Invoker.java | 15 +-
.../fs/s3a/commit/AbstractS3ACommitter.java | 16 +-
.../fs/s3a/commit/S3ACommitterFactory.java | 18 +-
.../s3a/commit/magic/MagicS3GuardCommitter.java | 7 +
.../staging/DirectoryStagingCommitter.java | 8 +-
.../staging/PartitionedStagingCommitter.java | 9 +-
.../hadoop/fs/s3a/commit/staging/Paths.java | 14 +-
.../fs/s3a/commit/staging/StagingCommitter.java | 50 ++++-
.../tools/hadoop-aws/committer_architecture.md | 94 ++++++---
.../markdown/tools/hadoop-aws/committers.md | 2 +-
.../fs/s3a/commit/AbstractCommitITest.java | 19 ++
.../fs/s3a/commit/AbstractITCommitMRJob.java | 5 +-
.../fs/s3a/commit/AbstractITCommitProtocol.java | 63 ++++--
.../fs/s3a/commit/ITestS3ACommitterFactory.java | 200 +++++++++++++++++++
.../fs/s3a/commit/magic/ITMagicCommitMRJob.java | 6 +-
.../commit/magic/ITestMagicCommitProtocol.java | 25 ++-
.../ITStagingCommitMRJobBadDest.java | 62 ++++++
.../integration/ITestStagingCommitProtocol.java | 13 ++
19 files changed, 542 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
index 3679d9f..5e25f50 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
@@ -57,8 +57,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
protected PathOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context");
- LOG.debug("Creating committer with output path {} and task context"
- + " {}", outputPath, context);
+ LOG.debug("Instantiating committer {} with output path {} and task context"
+ + " {}", this, outputPath, context);
}
/**
@@ -71,8 +71,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
protected PathOutputCommitter(Path outputPath,
JobContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context");
- LOG.debug("Creating committer with output path {} and job context"
- + " {}", outputPath, context);
+ LOG.debug("Instantiating committer {} with output path {} and job context"
+ + " {}", this, outputPath, context);
}
/**
@@ -103,6 +103,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
@Override
public String toString() {
- return "PathOutputCommitter{context=" + context + '}';
+ return "PathOutputCommitter{context=" + context
+ + "; " + super.toString()
+ + '}';
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
index a007ba1..45912a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -130,8 +130,9 @@ public class Invoker {
}
/**
- * Execute an operation and ignore all raised IOExceptions; log at INFO.
- * @param log log to log at info.
+ * Execute an operation and ignore all raised IOExceptions; log at INFO;
+ * full stack only at DEBUG.
+ * @param log log to use.
* @param action action to include in log
* @param path optional path to include in log
* @param operation operation to execute
@@ -145,13 +146,17 @@ public class Invoker {
try {
once(action, path, operation);
} catch (IOException e) {
- log.info("{}: {}", toDescription(action, path), e.toString(), e);
+ String description = toDescription(action, path);
+ String error = e.toString();
+ log.info("{}: {}", description, error);
+ log.debug("{}", description, e);
}
}
/**
- * Execute an operation and ignore all raised IOExceptions; log at INFO.
- * @param log log to log at info.
+ * Execute an operation and ignore all raised IOExceptions; log at INFO;
+ * full stack only at DEBUG.
+ * @param log log to use.
* @param action action to include in log
* @param path optional path to include in log
* @param operation operation to execute
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index 5f1ddfa..d2501da 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -292,7 +292,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
final StringBuilder sb = new StringBuilder(
"AbstractS3ACommitter{");
sb.append("role=").append(role);
- sb.append(", name").append(getName());
+ sb.append(", name=").append(getName());
sb.append(", outputPath=").append(getOutputPath());
sb.append(", workPath=").append(workPath);
sb.append('}');
@@ -532,8 +532,14 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
new DurationInfo(LOG, "Aborting all pending commits under %s",
dest)) {
CommitOperations ops = getCommitOperations();
- List<MultipartUpload> pending = ops
- .listPendingUploadsUnderPath(dest);
+ List<MultipartUpload> pending;
+ try {
+ pending = ops.listPendingUploadsUnderPath(dest);
+ } catch (IOException e) {
+ // raised if the listPendingUploads call failed.
+ maybeIgnore(suppressExceptions, "aborting pending uploads", e);
+ return;
+ }
Tasks.foreach(pending)
.executeWith(buildThreadPool(getJobContext()))
.suppressExceptions(suppressExceptions)
@@ -656,7 +662,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
- * Execute an operation; maybe suppress any raised IOException.
+ * Log or rethrow a caught IOException.
* @param suppress should raised IOEs be suppressed?
* @param action action (for logging when the IOE is suppressed.
* @param ex exception
@@ -667,7 +673,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
String action,
IOException ex) throws IOException {
if (suppress) {
- LOG.info(action, ex);
+ LOG.debug(action, ex);
} else {
throw ex;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
index 6b170f9..36d0af1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
@@ -77,9 +77,20 @@ public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
outputPath,
context.getConfiguration());
- return factory != null ?
- factory.createTaskCommitter(fileSystem, outputPath, context)
- : createFileOutputCommitter(outputPath, context);
+ if (factory != null) {
+ PathOutputCommitter committer = factory.createTaskCommitter(
+ fileSystem, outputPath, context);
+ LOG.info("Using committer {} to output data to {}",
+ (committer instanceof AbstractS3ACommitter
+ ? ((AbstractS3ACommitter) committer).getName()
+ : committer.toString()),
+ outputPath);
+ return committer;
+ } else {
+ LOG.warn("Using standard FileOutputCommitter to commit work."
+ + " This is slow and potentially unsafe.");
+ return createFileOutputCommitter(outputPath, context);
+ }
}
/**
@@ -104,6 +115,7 @@ public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
+ LOG.debug("Committer option is {}", name);
switch (name) {
case COMMITTER_NAME_FILE:
factory = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index c305141..c956a98 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -285,4 +285,11 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath());
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "MagicCommitter{");
+ sb.append('}');
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
index 3eda24f..23bb06b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -27,13 +27,11 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
/**
* This commits to a directory.
@@ -70,10 +68,8 @@ public class DirectoryStagingCommitter extends StagingCommitter {
if (getConflictResolutionMode(context, fs.getConf())
== ConflictResolution.FAIL
&& fs.exists(outputPath)) {
- LOG.debug("Failing commit by task attempt {} to write"
- + " to existing output path {}",
- context.getJobID(), getOutputPath());
- throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
+ throw failDestinationExists(outputPath,
+ "Setting job as " + getRole());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index bfaf443..b51bcb5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -31,14 +31,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
/**
* Partitioned committer.
@@ -100,11 +98,8 @@ public class PartitionedStagingCommitter extends StagingCommitter {
Path partitionPath = getFinalPath(partition + "/file",
context).getParent();
if (fs.exists(partitionPath)) {
- LOG.debug("Failing commit by task attempt {} to write"
- + " to existing path {} under {}",
- context.getTaskAttemptID(), partitionPath, getOutputPath());
- throw new PathExistsException(partitionPath.toString(),
- E_DEST_EXISTS);
+ throw failDestinationExists(partitionPath,
+ "Committing task " + context.getTaskAttemptID());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
index a4d39d7..a8c0273 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
@@ -167,13 +167,15 @@ public final class Paths {
return FileSystem.getLocal(conf).makeQualified(
allocator.getLocalPathForWrite(uuid, conf));
});
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- } catch (UncheckedExecutionException e) {
- if (e.getCause() instanceof RuntimeException) {
- throw (RuntimeException) e.getCause();
+ } catch (ExecutionException | UncheckedExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
}
- throw new RuntimeException(e);
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 2182eaa..6d02e86 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -500,6 +502,10 @@ public class StagingCommitter extends AbstractS3ACommitter {
listAndFilter(attemptFS,
wrappedJobAttemptPath, false,
HIDDEN_FILE_FILTER));
+ } catch (FileNotFoundException e) {
+ // this can mean the job was aborted early on, so don't confuse people
+ // with long stack traces that aren't the underlying problem.
+ maybeIgnore(suppressExceptions, "Pending upload directory not found", e);
} catch (IOException e) {
// unable to work with endpoint, if suppressing errors decide our actions
maybeIgnore(suppressExceptions, "Listing pending uploads", e);
@@ -565,13 +571,13 @@ public class StagingCommitter extends AbstractS3ACommitter {
}
/**
- * Delete the working paths of a job. Does not attempt to clean up
- * the work of the wrapped committer.
+ * Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
- * <li>$dest/__temporary</li>
+ * <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
+ * Does not attempt to clean up the work of the wrapped committer.
* @param context job context
* @throws IOException IO failure
*/
@@ -836,6 +842,44 @@ public class StagingCommitter extends AbstractS3ACommitter {
}
/**
+ * Generate a {@link PathExistsException} because the destination exists.
+ * Lists some of the child entries first, to help diagnose the problem.
+ * @param path path which exists
+ * @param description description (usually task/job ID)
+ * @return an exception to throw
+ */
+ protected PathExistsException failDestinationExists(final Path path,
+ final String description) {
+
+ LOG.error("{}: Failing commit by job {} to write"
+ + " to existing output path {}.",
+ description,
+ getJobContext().getJobID(), path);
+ // List the first 10 descendants, to give some details
+ // on what is wrong but not overload things if there are many files.
+ try {
+ int limit = 10;
+ RemoteIterator<LocatedFileStatus> lf
+ = getDestFS().listFiles(path, true);
+ LOG.info("Partial Directory listing");
+ while (limit > 0 && lf.hasNext()) {
+ limit--;
+ LocatedFileStatus status = lf.next();
+ LOG.info("{}: {}",
+ status.getPath(),
+ status.isDirectory()
+ ? " dir"
+ : ("file size " + status.getLen() + " bytes"));
+ }
+ } catch (IOException e) {
+ LOG.info("Discarding exception raised when listing {}: " + e, path);
+ LOG.debug("stack trace ", e);
+ }
+ return new PathExistsException(path.toString(),
+ description + ": " + InternalCommitterConstants.E_DEST_EXISTS);
+ }
+
+ /**
* Get the conflict mode option string.
* @param context context with the config
* @param fsConf filesystem config
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
index e4ba75d..3071754 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
@@ -230,7 +230,6 @@ None: directories are created on demand.
Rename task attempt path to task committed path.
```python
-
def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
return fs.exists(taskAttemptPath)
@@ -276,12 +275,12 @@ def commitJob(fs, jobAttemptDir, dest):
(See below for details on `mergePaths()`)
-A failure during job abort cannot be recovered from except by re-executing
+A failure during job commit cannot be recovered from except by re-executing
the entire query:
```python
def isCommitJobRepeatable() :
- return True
+ return False
```
Accordingly, it is a failure point in the protocol. With a low number of files
@@ -307,12 +306,28 @@ def cleanupJob(fs, dest):
```
-### Job Recovery
+### Job Recovery Before `commitJob()`
-1. Data under task committed paths is retained
-1. All directories under `$dest/_temporary/$appAttemptId/_temporary/` are deleted.
+For all committers, the recovery process takes place in the application
+master.
+1. The job history file of the previous attempt is loaded and scanned
+to determine which tasks were recorded as having succeeded.
+1. For each successful task, the job committer has its `recoverTask()` method
+invoked with a `TaskAttemptContext` built from the previous attempt's details.
+1. If the method does not raise an exception, it is considered to have been
+recovered, and not to be re-executed.
+1. All other tasks are queued for execution.
-Uncommitted/unexecuted tasks are (re)executed.
+For the v1 committer, task recovery is straightforward.
+The directory of the committed task from the previous attempt is
+moved under the directory of the current application attempt.
+
+```python
+def recoverTask(tac):
+ oldAttemptId = appAttemptId - 1
+ fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}',
+ '$dest/_temporary/appAttemptId/${tac.taskId}')
+```
This significantly improves time to recover from Job driver (here MR AM) failure.
The only lost work is that of all tasks in progress -those which had generated
@@ -330,6 +345,11 @@ failure simply by rerunning the entire job. This is implicitly the strategy
in Spark, which does not attempt to recover any in-progress jobs. The faster
your queries, the simpler your recovery strategy needs to be.
+### Job Recovery During `commitJob()`
+
+This is not possible; a failure during job commit requires the entire job
+to be re-executed after cleaning up the destination directory.
+
### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm
`mergePaths()` is the core algorithm to merge data; it is somewhat confusing
@@ -352,24 +372,23 @@ def mergePathsV1(fs, src, dest) :
fs.delete(dest, recursive = True)
fs.rename(src.getPath, dest)
else :
- # destination is directory, choose action on source type
- if src.isDirectory :
- if not toStat is None :
- if not toStat.isDirectory :
- # Destination exists and is not a directory
- fs.delete(dest)
- fs.rename(src.getPath(), dest)
- else :
- # Destination exists and is a directory
- # merge all children under destination directory
- for child in fs.listStatus(src.getPath) :
- mergePathsV1(fs, child, dest + child.getName)
- else :
- # destination does not exist
+ # src is directory, choose action on dest type
+ if not toStat is None :
+ if not toStat.isDirectory :
+ # Destination exists and is not a directory
+ fs.delete(dest)
fs.rename(src.getPath(), dest)
+ else :
+ # Destination exists and is a directory
+ # merge all children under destination directory
+ for child in fs.listStatus(src.getPath) :
+ mergePathsV1(fs, child, dest + child.getName)
+ else :
+ # destination does not exist
+ fs.rename(src.getPath(), dest)
```
-## v2 commit algorithm
+## The v2 Commit Algorithm
The v2 algorithm directly commits task output into the destination directory.
@@ -506,12 +525,31 @@ Cost: `O(1)` for normal filesystems, `O(files)` for object stores.
As no data is written to the destination directory, a task can be cleaned up
by deleting the task attempt directory.
-### v2 Job Recovery
+### v2 Job Recovery Before `commitJob()`
+
+
+Because the data has been renamed into the destination directory, all tasks
+recorded as having been committed have no recovery needed at all:
+
+```python
+def recoverTask(tac):
+```
+
+All active and queued tasks are scheduled for execution.
+
+There is a weakness here, the same one as on a failure during `commitTask()`:
+it is only safe to repeat a task which failed during that commit operation
+if the names of all generated files are constant across all task attempts.
+
+If the Job AM fails while a task attempt has been instructed to commit,
+and that commit is not recorded as having completed, the state of that
+in-progress task is unknown...really it isn't safe to recover the
+job at this point.
+
-Because the data has been renamed into the destination directory, it is nominally
-recoverable. However, this assumes that the number and name of generated
-files are constant on retried tasks.
+### v2 Job Recovery During `commitJob()`
+This is straightforward: `commitJob()` is re-invoked.
## How MapReduce uses the committer in a task
@@ -896,7 +934,7 @@ and metadata.
POST bucket.s3.aws.com/path?uploads
- An UploadId is returned
+ An `UploadId` is returned
1. Caller uploads one or more parts.
@@ -994,7 +1032,7 @@ Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPa
The single-directory and partitioned committers handle conflict resolution by
checking whether target paths exist in S3 before uploading any data.
-There are 3 conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
+There are three conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
* `fail`: Fail a task if an output directory or partition already exists. (Default)
* `append`: Upload data files without checking whether directories or partitions already exist.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 392cde2..09e123d 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -371,7 +371,7 @@ Put differently: start with the Directory Committer.
To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a`
must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.staging.S3ACommitterFactory`.
-This is done in `core-default.xml`
+This is done in `mapred-default.xml`
```xml
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index b8610d6..243cb37 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -174,6 +174,25 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
}
/**
+ * Create a random Job ID using the fork ID as part of the number.
+ * @return fork ID string in a format parseable by Jobs
+ * @throws Exception failure
+ */
+ protected String randomJobId() throws Exception {
+ String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
+ int l = testUniqueForkId.length();
+ String trailingDigits = testUniqueForkId.substring(l - 4, l);
+ try {
+ int digitValue = Integer.valueOf(trailingDigits);
+ return String.format("20070712%04d_%04d",
+ (long)(Math.random() * 1000),
+ digitValue);
+ } catch (NumberFormatException e) {
+ throw new Exception("Failed to parse " + trailingDigits, e);
+ }
+ }
+
+ /**
* Teardown waits for the consistency delay and resets failure count, so
* FS is stable, before the superclass teardown is called. This
* should clean things up better.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
index 13dfd83..161db85 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -266,9 +265,9 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
/**
* Override point to let implementations tune the MR Job conf.
- * @param c configuration
+ * @param jobConf configuration
*/
- protected void applyCustomConfigOptions(Configuration c) {
+ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 4d7f524..5ae8f54 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -159,25 +159,6 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
cleanupDestDir();
}
- /**
- * Create a random Job ID using the fork ID as part of the number.
- * @return fork ID string in a format parseable by Jobs
- * @throws Exception failure
- */
- private String randomJobId() throws Exception {
- String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
- int l = testUniqueForkId.length();
- String trailingDigits = testUniqueForkId.substring(l - 4, l);
- try {
- int digitValue = Integer.valueOf(trailingDigits);
- return String.format("20070712%04d_%04d",
- (long)(Math.random() * 1000),
- digitValue);
- } catch (NumberFormatException e) {
- throw new Exception("Failed to parse " + trailingDigits, e);
- }
- }
-
@Override
public void teardown() throws Exception {
describe("teardown");
@@ -765,6 +746,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
+ validateTaskAttemptWorkingDirectory(committer, tContext);
// write output
describe("1. Writing output");
@@ -1360,12 +1342,55 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
}
+ /**
+ * Verify that {@link S3ACommitterFactory} creates a committer of the same
+ * class as {@code createCommitter(outDir, tContext)} returns.
+ * @throws Throwable on any failure
+ */
+ @Test
+ public void testS3ACommitterFactoryBinding() throws Throwable {
+ describe("Verify that the committer factory returns this "
+ + "committer when configured to do so");
+ Job job = newJob();
+ FileOutputFormat.setOutputPath(job, outDir);
+ Configuration conf = job.getConfiguration();
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+ taskAttempt0);
+ // the committer name goes into the assertion text so that a failure
+ // report identifies which committer the factory failed to create.
+ String name = getCommitterName();
+ S3ACommitterFactory factory = new S3ACommitterFactory();
+ assertEquals("Wrong committer from factory for committer " + name,
+ createCommitter(outDir, tContext).getClass(),
+ factory.createOutputCommitter(outDir, tContext).getClass());
+ }
+
+ /**
+ * Validate the path of a file being written to during the write
+ * itself.
+ * This implementation has an empty body: it performs no checks.
+ * @param p path
+ * @throws IOException IO failure
+ */
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
}
+ /**
+ * Validate the path of a file being written to after the write
+ * operation has completed.
+ * This implementation has an empty body: it performs no checks.
+ * @param p path
+ * @throws IOException IO failure
+ */
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
}
+ /**
+ * Perform any actions needed to validate the working directory of
+ * a committer.
+ * For example: filesystem, path attributes.
+ * This implementation has an empty body: it performs no checks and
+ * uses neither argument.
+ * @param committer committer instance
+ * @param context task attempt context
+ * @throws IOException IO failure
+ */
+ protected void validateTaskAttemptWorkingDirectory(
+ AbstractS3ACommitter committer,
+ TaskAttemptContext context) throws IOException {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
new file mode 100644
index 0000000..a8547d6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * Tests for some aspects of the committer factory.
+ * All tests are grouped into one single test so that only one
+ * S3A FS client is set up and used for the entire run.
+ * Saves time and money.
+ */
+public class ITestS3ACommitterFactory extends AbstractCommitITest {
+
+ /** A committer name which is not in the table of valid bindings. */
+ protected static final String INVALID_NAME = "invalid-name";
+
+ /** Job ID; assigned from {@code randomJobId()} in {@link #setup()}. */
+ private String jobId;
+
+ // A random task attempt id for testing.
+ private String attempt0;
+
+ private TaskAttemptID taskAttempt0;
+
+ private Path outDir;
+
+ private S3ACommitterFactory factory;
+
+ private TaskAttemptContext tContext;
+
+ /**
+ * Parameterized list of bindings of committer name in config file to
+ * expected class instantiated.
+ * Each row is {committer name (String), expected committer class}.
+ */
+ private static final Object[][] bindings = {
+ {COMMITTER_NAME_FILE, FileOutputCommitter.class},
+ {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
+ {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
+ {InternalCommitterConstants.COMMITTER_NAME_STAGING,
+ StagingCommitter.class},
+ {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+ };
+
+ /**
+ * This is a ref to the FS conf, so changes here are visible
+ * to callers querying the FS config.
+ */
+ private Configuration filesystemConfRef;
+
+ /**
+ * The configuration returned by the task attempt context which is
+ * passed to the factory; the test cases set and unset options on it.
+ */
+ private Configuration taskConfRef;
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ jobId = randomJobId();
+ attempt0 = "attempt_" + jobId + "_m_000000_0";
+ taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+ outDir = path(getMethodName());
+ factory = new S3ACommitterFactory();
+ Configuration conf = new Configuration();
+ conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ filesystemConfRef = getFileSystem().getConf();
+ tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
+ taskConfRef = tContext.getConfiguration();
+ }
+
+ /**
+ * The single JUnit test case: runs every check in sequence.
+ * Each check sets or unsets the committer name in both configurations
+ * before asserting, so the order of the calls is not significant.
+ */
+ @Test
+ public void testEverything() throws Throwable {
+ testImplicitFileBinding();
+ testBindingsInTask();
+ testBindingsInFSConfig();
+ testInvalidFileBinding();
+ testInvalidTaskBinding();
+ }
+
+ /**
+ * Verify that if all config options are unset, the FileOutputCommitter
+ * is returned.
+ */
+ public void testImplicitFileBinding() throws Throwable {
+ taskConfRef.unset(FS_S3A_COMMITTER_NAME);
+ filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
+ assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
+ }
+
+ /**
+ * Verify that task bindings are picked up.
+ */
+ public void testBindingsInTask() throws Throwable {
+ // set this to an invalid value to be confident it is not
+ // being checked.
+ filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
+ taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
+ assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
+ for (Object[] binding : bindings) {
+ taskConfRef.set(FS_S3A_COMMITTER_NAME,
+ (String) binding[0]);
+ assertFactoryCreatesExpectedCommitter((Class<?>) binding[1]);
+ }
+ }
+
+ /**
+ * Verify that FS bindings are picked up.
+ */
+ public void testBindingsInFSConfig() throws Throwable {
+ taskConfRef.unset(FS_S3A_COMMITTER_NAME);
+ filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
+ assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
+ for (Object[] binding : bindings) {
+ // the task option stays unset throughout, so the committer can
+ // only be selected through the filesystem configuration.
+ filesystemConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
+ assertFactoryCreatesExpectedCommitter((Class<?>) binding[1]);
+ }
+ }
+
+ /**
+ * Create an invalid committer via the FS binding.
+ */
+ public void testInvalidFileBinding() throws Throwable {
+ taskConfRef.unset(FS_S3A_COMMITTER_NAME);
+ filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
+ LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
+ () -> createCommitter());
+ }
+
+ /**
+ * Create an invalid committer via the task attempt.
+ */
+ public void testInvalidTaskBinding() throws Throwable {
+ filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
+ taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
+ LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
+ () -> createCommitter());
+ }
+
+ /**
+ * Assert that the factory creates the expected committer.
+ * The comparison is on the exact class of the created committer.
+ * @param expected expected committer class.
+ * @throws IOException IO failure.
+ */
+ protected void assertFactoryCreatesExpectedCommitter(
+ final Class<?> expected)
+ throws IOException {
+ assertEquals("Wrong Committer from factory",
+ expected,
+ createCommitter().getClass());
+ }
+
+ /**
+ * Create a committer for the output directory and task attempt context
+ * of this test, using the factory created in {@link #setup()}.
+ * @return the committer
+ * @throws IOException IO failure.
+ */
+ private PathOutputCommitter createCommitter() throws IOException {
+ return factory.createOutputCommitter(outDir, tContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
index 57eb8b2..b7be17a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.fs.s3a.commit.magic;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.mapred.JobConf;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -30,7 +30,7 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
*
* There's no need to disable the committer setting for the filesystem here,
* because the committers are being instantiated in their own processes;
- * the settings in {@link #applyCustomConfigOptions(Configuration)} are
+ * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
* passed down to these processes.
*/
public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
@@ -54,7 +54,7 @@ public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
* @param conf configuration
*/
@Override
- protected void applyCustomConfigOptions(Configuration conf) {
+ protected void applyCustomConfigOptions(JobConf conf) {
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index 74c1d9d..057adf5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.fs.s3a.commit.magic;
+import java.io.IOException;
+import java.net.URI;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -32,9 +35,8 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.hamcrest.CoreMatchers.containsString;
/**
* Test the magic committer's commit protocol.
@@ -116,6 +118,25 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
}
/**
+ * The magic committer paths are always on S3, and always have
+ * "__magic" in the path.
+ * The check is made on the committer's work path; the task attempt
+ * context is not used.
+ * @param committer committer instance
+ * @param context task attempt context
+ * @throws IOException IO failure
+ */
+ @Override
+ protected void validateTaskAttemptWorkingDirectory(
+ final AbstractS3ACommitter committer,
+ final TaskAttemptContext context) throws IOException {
+ URI wd = committer.getWorkPath().toUri();
+ assertEquals("Wrong scheme for working dir " + wd
+ + " with committer " + committer,
+ "s3a", wd.getScheme());
+ assertThat("Working dir " + wd + " is not under a magic directory",
+ wd.getPath(),
+ containsString('/' + CommitConstants.MAGIC + '/'));
+ }
+
+ /**
* The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
new file mode 100644
index 0000000..be477a7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+/**
+ * This is a test to verify that the committer will fail if the destination
+ * directory exists, and that this happens in job setup.
+ */
+public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
+
+ /**
+ * Name of the committer for this test.
+ * @return the name constant declared by the staging committer
+ */
+ @Override
+ protected String committerName() {
+ return StagingCommitter.NAME;
+ }
+
+ /**
+ * Pre-create the job's output directory in the S3A filesystem, so that
+ * the destination already exists.
+ * @param conf job configuration; its output directory option is read
+ * @throws IOException failure to create the directory
+ */
+ @Override
+ protected void applyCustomConfigOptions(JobConf conf) throws IOException {
+ // the job destination in the S3 FS, as declared in the job configuration
+ final String destination = conf.get(FileOutputFormat.OUTDIR);
+ final S3AFileSystem destFs = getFileSystem();
+ destFs.mkdirs(new Path(destination));
+ }
+
+ /**
+ * Invoke the superclass test through {@code LambdaTestUtils.intercept},
+ * with {@link FileAlreadyExistsException} as the expected exception and
+ * "Output directory" as the expected text.
+ * @throws Exception any failure, including the job not failing as expected
+ */
+ @Override
+ public void testMRJob() throws Exception {
+ LambdaTestUtils.intercept(
+ FileAlreadyExistsException.class, "Output directory", super::testMRJob);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0766bf6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index 08c572e..180e743 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -118,6 +118,19 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
}
/**
+ * The staging committers always have the local FS for their work.
+ * NOTE(review): the directory checked here is the working directory of
+ * the task attempt context, not {@code committer.getWorkPath()}; the
+ * committer argument is unused. Confirm that this is the intended check.
+ * @param committer committer instance
+ * @param context task attempt context
+ * @throws IOException IO failure
+ */
+ @Override
+ protected void validateTaskAttemptWorkingDirectory(
+ final AbstractS3ACommitter committer,
+ final TaskAttemptContext context) throws IOException {
+ Path wd = context.getWorkingDirectory();
+ assertEquals("Wrong scheme for working dir " + wd,
+ "file", wd.toUri().getScheme());
+ }
+
+ /**
* The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org