Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/01/23 09:55:46 UTC

[16/50] [abbrv] hadoop git commit: MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)

MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cce71dce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cce71dce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cce71dce

Branch: refs/heads/YARN-6592
Commit: cce71dceef9e82d31fe8ec59648b2a4a50c8869a
Parents: 4aca4ff
Author: Haibo Chen <ha...@apache.org>
Authored: Fri Jan 19 12:56:17 2018 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Jan 19 12:56:17 2018 -0800

----------------------------------------------------------------------
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    | 23 ++++++
 .../hadoop/mapreduce/v2/app/TestRecovery.java   | 82 ++++++++++++++++++++
 2 files changed, 105 insertions(+)
----------------------------------------------------------------------
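
For readers skimming the patch: when the restarted application master does not recover the previous attempt (recovery disabled, or the committer does not support it), it now calls committer.abortJob(jobContext, State.FAILED) so the previous attempt's temporary output does not linger and pollute the final result. The sketch below is illustrative only and is not part of this commit; it shows roughly what that abort amounts to for a FileOutputCommitter-style committer, assuming the conventional "_temporary" pending directory under the job output path. The class and method names in the sketch are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreviousAttemptCleanupSketch {
  /**
   * Roughly what aborting the previous job's output boils down to for a
   * FileOutputCommitter-style committer: recursively remove the pending
   * ("_temporary") directory under the job output directory.
   */
  static void cleanUpPreviousAttempt(Configuration conf, Path outputDir)
      throws IOException {
    Path pending = new Path(outputDir, "_temporary");
    FileSystem fs = pending.getFileSystem(conf);
    if (fs.exists(pending)) {
      // Recursive delete of leftover per-task temporary output.
      fs.delete(pending, true);
    }
    // If the directory is already gone there is nothing to clean up, which
    // mirrors the FileNotFoundException branch in cleanUpPreviousJobOutput().
  }
}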


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cce71dce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index df61928..e6a45cf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -1217,6 +1219,7 @@ public class MRAppMaster extends CompositeService {
     amInfos = new LinkedList<AMInfo>();
     completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
     processRecovery();
+    cleanUpPreviousJobOutput();
 
     // Current an AMInfo for the current AM generation.
     AMInfo amInfo =
@@ -1395,6 +1398,26 @@ public class MRAppMaster extends CompositeService {
     return true;
   }
 
+  private void cleanUpPreviousJobOutput() {
+    // recovered application masters should not remove data from previous job
+    if (!recovered()) {
+      JobContext jobContext = getJobContextFromConf(getConfig());
+      try {
+        LOG.info("Starting to clean up previous job's temporary files");
+        this.committer.abortJob(jobContext, State.FAILED);
+        LOG.info("Finished cleaning up previous job temporary files");
+      } catch (FileNotFoundException e) {
+        LOG.info("Previous job temporary files do not exist, " +
+            "no clean up was necessary.");
+      } catch (Exception e) {
+        // the clean up of a previous attempt is not critical to the success
+        // of this job - only logging the error
+        LOG.error("Error while trying to clean up previous job's temporary " +
+            "files", e);
+      }
+    }
+  }
+
   private static FSDataInputStream getPreviousJobHistoryStream(
       Configuration conf, ApplicationAttemptId appAttemptId)
       throws IOException {
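
A note on when the new cleanUpPreviousJobOutput() path actually runs: broadly, recovered() stays false when AM recovery is disabled or the committer reports that recovery is unsupported, so only a non-recovering restart aborts the previous attempt's output. As an illustrative sketch (not part of this commit), a client job could opt out of AM recovery like this, which is also the setup the first new test below uses; the class name and job name are made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class RecoveryDisabledJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // With AM recovery disabled, a second AM attempt restarts the job from
    // scratch; MRAppMaster#cleanUpPreviousJobOutput() then aborts the
    // previous attempt's output through the configured OutputCommitter.
    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
    Job job = Job.getInstance(conf, "recovery-disabled-demo");
    // ... configure input/output formats, paths, mapper and reducer as usual ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}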

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cce71dce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 17e07b1..893c4a0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
+
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -43,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobID;
@@ -452,6 +455,8 @@ public class TestRecovery {
   public static class TestFileOutputCommitter extends
       org.apache.hadoop.mapred.FileOutputCommitter {
 
+    private boolean abortJobCalled;
+
     @Override
     public boolean isRecoverySupported(
         org.apache.hadoop.mapred.JobContext jobContext) {
@@ -462,6 +467,16 @@ public class TestRecovery {
       }
       return isRecoverySupported;
     }
+
+    @Override
+    public void abortJob(JobContext context, int runState) throws IOException {
+      super.abortJob(context, runState);
+      this.abortJobCalled = true;
+    }
+
+    private boolean isAbortJobCalled() {
+      return this.abortJobCalled;
+    }
   }
 
   /**
@@ -1010,6 +1025,73 @@ public class TestRecovery {
   }
 
   @Test
+  public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    //stop the app before the job completes.
+    app.stop();
+    app.close();
+
+    //rerun
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    TestFileOutputCommitter committer = (
+        TestFileOutputCommitter) app.getCommitter();
+    assertTrue("commiter.abortJob() has not been called",
+        committer.isAbortJobCalled());
+    app.close();
+  }
+
+  @Test
+  public void testPreviousJobIsNotCleanedWhenRecovery()
+      throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    // TestFileOutputCommitter supports recovery if want.am.recovery=true
+    conf.setBoolean("want.am.recovery", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    //stop the app before the job completes.
+    app.stop();
+    app.close();
+
+    //rerun
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    TestFileOutputCommitter committer = (
+        TestFileOutputCommitter) app.getCommitter();
+    assertFalse("commiter.abortJob() has been called",
+        committer.isAbortJobCalled());
+    app.close();
+  }
+
+  @Test
   public void testOutputRecoveryMapsOnly() throws Exception {
     int runCount = 0;
     MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),

