You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/01/10 00:04:03 UTC
svn commit: r1431137 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Author: jlowe
Date: Wed Jan 9 23:04:03 2013
New Revision: 1431137
URL: http://svn.apache.org/viewvc?rev=1431137&view=rev
Log:
svn merge -c 1431131 FIXES: MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Jan 9 23:04:03 2013
@@ -72,6 +72,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally
exits (Jason Lowe via tgraves)
+ MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
+ Chen via jlowe)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Jan 9 23:04:03 2013
@@ -579,7 +579,7 @@ public class MRAppMaster extends Composi
*/
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext.getApplicationAttemptId(),
- appContext.getClock(), getCommitter());
+ appContext.getClock(), getCommitter(), isNewApiCommitter());
}
/** Create and initialize (but don't start) a single job.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Jan 9 23:04:03 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -100,6 +101,7 @@ public class RecoveryService extends Com
private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
+ private final boolean newApiCommitter;
private final Dispatcher dispatcher;
private final ControlledClock clock;
@@ -113,10 +115,11 @@ public class RecoveryService extends Com
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
- Clock clock, OutputCommitter committer) {
+ Clock clock, OutputCommitter committer, boolean newApiCommitter) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
+ this.newApiCommitter = newApiCommitter;
this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@@ -360,8 +363,17 @@ public class RecoveryService extends Com
switch (state) {
case SUCCEEDED:
//recover the task output
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
- attInfo.getAttemptId());
+
+ // check the committer type and construct corresponding context
+ TaskAttemptContext taskContext = null;
+ if(newApiCommitter) {
+ taskContext = new TaskAttemptContextImpl(getConfig(),
+ attInfo.getAttemptId());
+ } else {
+ taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
+ TypeConverter.fromYarn(aId));
+ }
+
try {
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Jan 9 23:04:03 2013
@@ -626,6 +626,115 @@ public class TestRecovery {
validateOutput();
}
+ @Test
+ public void testRecoveryWithOldCommiter() throws Exception { // MAPREDUCE-4848: AM recovery with mapred.{mapper,reducer}.new-api=false
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", false); // old API; presumably makes isNewApiCommitter() false — confirm
+ conf.setBoolean("mapred.reducer.new-api", false);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task reduceTask1 = it.next();
+
+ // the single map task must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+ .next();
+
+ // before sending the TA_DONE event, make sure the attempt has reached
+ // the RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+ // send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for the map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // verify the shuffle port (5467 is presumably the value the MRApp test harness assigns — confirm)
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+ // write output corresponding to reduce1; presumably checked by validateOutput() at the end
+ writeOutput(reduce1Attempt1, conf);
+
+ // send the done signal to the 1st reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for the first reduce task to complete
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ // stop the app before the job completes (the 2nd reduce is still pending)
+ app.stop();
+
+ // rerun with AM job recovery enabled:
+ // the map and the first reduce should be recovered from the previous run
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+ ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", false);
+ conf.setBoolean("mapred.reducer.new-api", false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ reduceTask1 = it.next();
+ Task reduceTask2 = it.next();
+
+ // the map is recovered, so no need to send TA_DONE
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // verify the shuffle port is preserved after recovery
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ // the first reduce (SUCCEEDED in the prior run) is recovered, no need to send TA_DONE
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+
+ TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+ .iterator().next();
+ // before sending the TA_DONE event, make sure the attempt has reached
+ // the RUNNING state
+ app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+ // send the done signal to the 2nd reduce task
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce2Attempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for the 2nd reduce task to complete
+ app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ validateOutput();
+ }
+
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,