You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2013/08/22 03:53:18 UTC
svn commit: r1516359 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/
Author: acmurthy
Date: Thu Aug 22 01:53:18 2013
New Revision: 1516359
URL: http://svn.apache.org/r1516359
Log:
Merge -c 1516358 from trunk to branch-2 to fix MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. Contributed by Vinod K. V.
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1516359&r1=1516358&r2=1516359&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Aug 22 01:53:18 2013
@@ -94,6 +94,9 @@ Release 2.1.1-beta - UNRELEASED
pick up the right history file for the last successful AM. (Jian He via
vinodkv)
+ MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
+ acmurthy)
+
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1516359&r1=1516358&r2=1516359&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Aug 22 01:53:18 2013
@@ -1042,11 +1042,11 @@ public class MRAppMaster extends Composi
// attempt will generate one. However that disables recovery if there
// are reducers as the shuffle secret would be app attempt specific.
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
- boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
- TokenCache.getShuffleSecretKey(jobCredentials) != null);
+ boolean shuffleKeyValidForRecovery =
+ TokenCache.getShuffleSecretKey(jobCredentials) != null;
if (recoveryEnabled && recoverySupportedByCommitter
- && shuffleKeyValidForRecovery) {
+ && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
try {
@@ -1059,7 +1059,8 @@ public class MRAppMaster extends Composi
} else {
LOG.info("Will not try to recover. recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
- + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+ + recoverySupportedByCommitter + " numReduceTasks: "
+ + numReduceTasks + " shuffleKeyValidForRecovery: "
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
// Get the amInfos anyways whether recovery is enabled or not
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1516359&r1=1516358&r2=1516359&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Aug 22 01:53:18 2013
@@ -114,7 +114,6 @@ public class TestRecovery {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
-
/**
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
* completely disappears because of failed launch, one attempt gets killed and
@@ -316,6 +315,116 @@ public class TestRecovery {
// available in the failed attempt should be available here
}
+  /**
+   * AM with 3 maps and 0 reduces. The AM crashes after the first two map
+   * tasks finish; the second generation recovers those two maps from the
+   * previous run, runs the third map, and the job succeeds.
+   *
+   * @throws Exception if the app fails to reach any of the awaited states
+   */
+  @Test
+  public void testCrashOfMapsOnlyJob() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+            ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // the job consists of exactly three map tasks
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure each attempt has reached
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the first two maps only; the third map is
+    // deliberately left running so that it is unfinished at crash time
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(task1Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(task2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app (simulates the AM crash)
+    app.stop();
+
+    // rerun
+    // in the rerun the first two maps will be recovered from the previous run.
+    // The second generation is built with the same shape as the first
+    // (3 maps, 0 reduces) so that it really is a map-only job, matching the
+    // NUM_REDUCES value placed in the configuration below.
+    app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(task3Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait to get it completed
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+
@Test
public void testMultipleCrashes() throws Exception {