You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/01/19 21:05:17 UTC
hadoop git commit: MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)
Repository: hadoop
Updated Branches:
refs/heads/trunk 4aca4ff75 -> cce71dcee
MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cce71dce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cce71dce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cce71dce
Branch: refs/heads/trunk
Commit: cce71dceef9e82d31fe8ec59648b2a4a50c8869a
Parents: 4aca4ff
Author: Haibo Chen <ha...@apache.org>
Authored: Fri Jan 19 12:56:17 2018 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Fri Jan 19 12:56:17 2018 -0800
----------------------------------------------------------------------
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 23 ++++++
.../hadoop/mapreduce/v2/app/TestRecovery.java | 82 ++++++++++++++++++++
2 files changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cce71dce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index df61928..e6a45cf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -1217,6 +1219,7 @@ public class MRAppMaster extends CompositeService {
amInfos = new LinkedList<AMInfo>();
completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
processRecovery();
+ cleanUpPreviousJobOutput();
// Current an AMInfo for the current AM generation.
AMInfo amInfo =
@@ -1395,6 +1398,26 @@ public class MRAppMaster extends CompositeService {
return true;
}
+ private void cleanUpPreviousJobOutput() {
+ // recovered application masters should not remove data from previous job
+ if (!recovered()) {
+ JobContext jobContext = getJobContextFromConf(getConfig());
+ try {
+ LOG.info("Starting to clean up previous job's temporary files");
+ this.committer.abortJob(jobContext, State.FAILED);
+ LOG.info("Finished cleaning up previous job temporary files");
+ } catch (FileNotFoundException e) {
+ LOG.info("Previous job temporary files do not exist, " +
+ "no clean up was necessary.");
+ } catch (Exception e) {
+ // the clean up of a previous attempt is not critical to the success
+ // of this job - only logging the error
+ LOG.error("Error while trying to clean up previous job's temporary " +
+ "files", e);
+ }
+ }
+ }
+
private static FSDataInputStream getPreviousJobHistoryStream(
Configuration conf, ApplicationAttemptId appAttemptId)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cce71dce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 17e07b1..893c4a0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
+
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -43,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
@@ -452,6 +455,8 @@ public class TestRecovery {
public static class TestFileOutputCommitter extends
org.apache.hadoop.mapred.FileOutputCommitter {
+ private boolean abortJobCalled;
+
@Override
public boolean isRecoverySupported(
org.apache.hadoop.mapred.JobContext jobContext) {
@@ -462,6 +467,16 @@ public class TestRecovery {
}
return isRecoverySupported;
}
+
+ @Override
+ public void abortJob(JobContext context, int runState) throws IOException {
+ super.abortJob(context, runState);
+ this.abortJobCalled = true;
+ }
+
+ private boolean isAbortJobCalled() {
+ return this.abortJobCalled;
+ }
}
/**
@@ -1010,6 +1025,73 @@ public class TestRecovery {
}
@Test
+ public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+ conf.setClass("mapred.output.committer.class",
+ TestFileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+ //stop the app before the job completes.
+ app.stop();
+ app.close();
+
+ //rerun
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+ ++runCount);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+ TestFileOutputCommitter committer = (
+ TestFileOutputCommitter) app.getCommitter();
+ assertTrue("commiter.abortJob() has not been called",
+ committer.isAbortJobCalled());
+ app.close();
+ }
+
+ @Test
+ public void testPreviousJobIsNotCleanedWhenRecovery()
+ throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setClass("mapred.output.committer.class",
+ TestFileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ // TestFileOutputCommitter supports recovery if want.am.recovery=true
+ conf.setBoolean("want.am.recovery", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+ //stop the app before the job completes.
+ app.stop();
+ app.close();
+
+ //rerun
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+ ++runCount);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+ TestFileOutputCommitter committer = (
+ TestFileOutputCommitter) app.getCommitter();
+ assertFalse("commiter.abortJob() has been called",
+ committer.isAbortJobCalled());
+ app.close();
+ }
+
+ @Test
public void testOutputRecoveryMapsOnly() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org