You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that did not survive plain-text extraction.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/01/18 23:48:11 UTC
[40/49] hadoop git commit: MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan
MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e42d058
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e42d058
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e42d058
Branch: refs/heads/YARN-7402
Commit: 6e42d058292d9656e9ebc9a47be13280e3c919ea
Parents: 09efdfe
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 17 08:14:11 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 17 08:14:11 2018 -0600
----------------------------------------------------------------------
.../lib/output/FileOutputCommitter.java | 22 +++++++++++
.../src/main/resources/mapred-default.xml | 11 ++++++
.../lib/output/TestFileOutputCommitter.java | 39 ++++++++++++++++++--
3 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 86af2cf..cbae575 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -87,6 +87,17 @@ public class FileOutputCommitter extends PathOutputCommitter {
// default value to be 1 to keep consistent with previous behavior
public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
+ // Whether tasks should delete their task temporary directories. This is
+ // purely an optimization for filesystems without O(1) recursive delete, as
+ // commitJob will recursively delete the entire job temporary directory.
+ // HDFS has O(1) recursive delete, so this parameter is left false by default.
+ // Users of object stores, for example, may want to set this to true. Note:
+ // this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
+ public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
+ "mapreduce.fileoutputcommitter.task.cleanup.enabled";
+ public static final boolean
+ FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
+
private Path outputPath = null;
private Path workPath = null;
private final int algorithmVersion;
@@ -586,6 +597,17 @@ public class FileOutputCommitter extends PathOutputCommitter {
mergePaths(fs, taskAttemptDirStatus, outputPath);
LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath);
+
+ if (context.getConfiguration().getBoolean(
+ FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+ FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
+ LOG.debug(String.format(
+ "Deleting the temporary directory of '%s': '%s'",
+ attemptId, taskAttemptPath));
+ if(!fs.delete(taskAttemptPath, true)) {
+ LOG.warn("Could not delete " + taskAttemptPath);
+ }
+ }
}
} else {
LOG.warn("No Output found for " + attemptId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1e432ce..62f3dfa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1495,6 +1495,17 @@
</property>
<property>
+ <name>mapreduce.fileoutputcommitter.task.cleanup.enabled</name>
+ <value>false</value>
+ <description>Whether tasks should delete their task temporary directories. This is purely an
+ optimization for filesystems without O(1) recursive delete, as commitJob will recursively delete
+ the entire job temporary directory. HDFS has O(1) recursive delete, so this parameter is left
+ false by default. Users of object stores, for example, may want to set this to true.
+
+ Note: this is only used if mapreduce.fileoutputcommitter.algorithm.version=2</description>
+</property>
+
+<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
<description>The interval in ms at which the MR AppMaster should send
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index f72aa55..cd9d44b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -255,13 +255,18 @@ public class TestFileOutputCommitter {
assert(dataFileFound && indexFileFound);
}
- private void testCommitterInternal(int version) throws Exception {
+ private void testCommitterInternal(int version, boolean taskCleanup)
+ throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
- conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ conf.setInt(
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
+ conf.setBoolean(
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+ taskCleanup);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -275,9 +280,30 @@ public class TestFileOutputCommitter {
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
+ // check task and job temp directories exist
+ File jobOutputDir = new File(
+ new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString());
+ File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority(
+ committer.getWorkPath()).toString());
+ assertTrue("job temp dir does not exist", jobOutputDir.exists());
+ assertTrue("task temp dir does not exist", taskOutputDir.exists());
+
// do commit
committer.commitTask(tContext);
+ assertTrue("job temp dir does not exist", jobOutputDir.exists());
+ if (version == 1 || taskCleanup) {
+ // Task temp dir gets renamed in v1 and deleted if taskCleanup is
+ // enabled in v2
+ assertFalse("task temp dir still exists", taskOutputDir.exists());
+ } else {
+ // By default, in v2 the task temp dir is only deleted during commitJob
+ assertTrue("task temp dir does not exist", taskOutputDir.exists());
+ }
+
+ // Entire job temp directory gets deleted, including task temp dir
committer.commitJob(jContext);
+ assertFalse("job temp dir still exists", jobOutputDir.exists());
+ assertFalse("task temp dir still exists", taskOutputDir.exists());
// validate output
validateContent(outDir);
@@ -286,12 +312,17 @@ public class TestFileOutputCommitter {
@Test
public void testCommitterV1() throws Exception {
- testCommitterInternal(1);
+ testCommitterInternal(1, false);
}
@Test
public void testCommitterV2() throws Exception {
- testCommitterInternal(2);
+ testCommitterInternal(2, false);
+ }
+
+ @Test
+ public void testCommitterV2TaskCleanupEnabled() throws Exception {
+ testCommitterInternal(2, true);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org