You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/01/18 23:48:11 UTC

[40/49] hadoop git commit: MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan

MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e42d058
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e42d058
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e42d058

Branch: refs/heads/YARN-7402
Commit: 6e42d058292d9656e9ebc9a47be13280e3c919ea
Parents: 09efdfe
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 17 08:14:11 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 17 08:14:11 2018 -0600

----------------------------------------------------------------------
 .../lib/output/FileOutputCommitter.java         | 22 +++++++++++
 .../src/main/resources/mapred-default.xml       | 11 ++++++
 .../lib/output/TestFileOutputCommitter.java     | 39 ++++++++++++++++++--
 3 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 86af2cf..cbae575 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -87,6 +87,17 @@ public class FileOutputCommitter extends PathOutputCommitter {
   // default value to be 1 to keep consistent with previous behavior
   public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
 
+  // Whether tasks should delete their task temporary directories. This is
+  // purely an optimization for filesystems without O(1) recursive delete, as
+  // commitJob will recursively delete the entire job temporary directory.
+  // HDFS has O(1) recursive delete, so this parameter is left false by default.
+  // Users of object stores, for example, may want to set this to true. Note:
+  // this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
+  public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
+      "mapreduce.fileoutputcommitter.task.cleanup.enabled";
+  public static final boolean
+      FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
+
   private Path outputPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
@@ -586,6 +597,17 @@ public class FileOutputCommitter extends PathOutputCommitter {
           mergePaths(fs, taskAttemptDirStatus, outputPath);
           LOG.info("Saved output of task '" + attemptId + "' to " +
               outputPath);
+
+          if (context.getConfiguration().getBoolean(
+              FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+              FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
+            LOG.debug(String.format(
+                "Deleting the temporary directory of '%s': '%s'",
+                attemptId, taskAttemptPath));
+            if(!fs.delete(taskAttemptPath, true)) {
+              LOG.warn("Could not delete " + taskAttemptPath);
+            }
+          }
         }
       } else {
         LOG.warn("No Output found for " + attemptId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1e432ce..62f3dfa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1495,6 +1495,17 @@
 </property>
 
 <property>
+  <name>mapreduce.fileoutputcommitter.task.cleanup.enabled</name>
+  <value>false</value>
+  <description>Whether tasks should delete their task temporary directories. This is purely an
+    optimization for filesystems without O(1) recursive delete, as commitJob will recursively delete
+    the entire job temporary directory. HDFS has O(1) recursive delete, so this parameter is left
+    false by default. Users of object stores, for example, may want to set this to true.
+
+    Note: this is only used if mapreduce.fileoutputcommitter.algorithm.version=2</description>
+</property>
+
+<property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
   <description>The interval in ms at which the MR AppMaster should send

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index f72aa55..cd9d44b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -255,13 +255,18 @@ public class TestFileOutputCommitter {
     assert(dataFileFound && indexFileFound);
   }
 
-  private void testCommitterInternal(int version) throws Exception {
+  private void testCommitterInternal(int version, boolean taskCleanup)
+      throws Exception {
     Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
-    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+    conf.setInt(
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
+    conf.setBoolean(
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+        taskCleanup);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -275,9 +280,30 @@ public class TestFileOutputCommitter {
     RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
     writeOutput(theRecordWriter, tContext);
 
+    // check task and job temp directories exist
+    File jobOutputDir = new File(
+        new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString());
+    File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority(
+        committer.getWorkPath()).toString());
+    assertTrue("job temp dir does not exist", jobOutputDir.exists());
+    assertTrue("task temp dir does not exist", taskOutputDir.exists());
+
     // do commit
     committer.commitTask(tContext);
+    assertTrue("job temp dir does not exist", jobOutputDir.exists());
+    if (version == 1 || taskCleanup) {
+      // Task temp dir gets renamed in v1 and deleted if taskCleanup is
+      // enabled in v2
+      assertFalse("task temp dir still exists", taskOutputDir.exists());
+    } else {
+      // By default, in v2 the task temp dir is only deleted during commitJob
+      assertTrue("task temp dir does not exist", taskOutputDir.exists());
+    }
+
+    // Entire job temp directory gets deleted, including task temp dir
     committer.commitJob(jContext);
+    assertFalse("job temp dir still exists", jobOutputDir.exists());
+    assertFalse("task temp dir still exists", taskOutputDir.exists());
 
     // validate output
     validateContent(outDir);
@@ -286,12 +312,17 @@ public class TestFileOutputCommitter {
 
   @Test
   public void testCommitterV1() throws Exception {
-    testCommitterInternal(1);
+    testCommitterInternal(1, false);
   }
 
   @Test
   public void testCommitterV2() throws Exception {
-    testCommitterInternal(2);
+    testCommitterInternal(2, false);
+  }
+
+  @Test
+  public void testCommitterV2TaskCleanupEnabled() throws Exception {
+    testCommitterInternal(2, true);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org