You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink, which is not preserved in this plain-text version.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/14 01:08:14 UTC
svn commit: r1243753 - in
/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Author: vinodkv
Date: Tue Feb 14 00:08:13 2012
New Revision: 1243753
URL: http://svn.apache.org/viewvc?rev=1243753&view=rev
Log:
MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then the recovery. (vinodkv)
svn merge --ignore-ancestry -c 1243752 ../../trunk/
Modified:
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt Tue Feb 14 00:08:13 2012
@@ -741,6 +741,9 @@ Release 0.23.1 - 2012-02-08
MAPREDUCE-3843. Job summary log file found missing on the RM host
(Anupam Seth via tgraves)
+ MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
+ the recovery. (vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Feb 14 00:08:13 2012
@@ -244,7 +244,7 @@ public class JobHistoryEventHandler exte
while (!stopped && !Thread.currentThread().isInterrupted()) {
// Log the size of the history-event-queue every so often.
- if (eventCounter % 1000 == 0) {
+ if (eventCounter != 0 && eventCounter % 1000 == 0) {
eventCounter = 0;
LOG.info("Size of the JobHistory event queue is "
+ eventQueue.size());
@@ -464,8 +464,10 @@ public class JobHistoryEventHandler exte
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
- LOG.info("In HistoryEventHandler "
- + event.getHistoryEvent().getEventType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("In HistoryEventHandler "
+ + event.getHistoryEvent().getEventType());
+ }
} catch (IOException e) {
LOG.error("Error writing History Event: " + event.getHistoryEvent(),
e);
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Feb 14 00:08:13 2012
@@ -26,7 +26,6 @@ import java.security.PrivilegedException
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeC
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@@ -123,7 +123,7 @@ import org.apache.hadoop.yarn.util.Conve
* The information is shared across different components using AppContext.
*/
-@SuppressWarnings("deprecation")
+@SuppressWarnings("rawtypes")
public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
@@ -138,7 +138,7 @@ public class MRAppMaster extends Composi
private final int nmPort;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
- private Set<TaskId> completedTasksFromPreviousRun;
+ private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
private Dispatcher dispatcher;
@@ -596,7 +596,7 @@ public class MRAppMaster extends Composi
return dispatcher;
}
- public Set<TaskId> getCompletedTaskFromPreviousRun() {
+ public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() {
return completedTasksFromPreviousRun;
}
@@ -737,7 +737,6 @@ public class MRAppMaster extends Composi
return jobs;
}
- @SuppressWarnings("rawtypes")
@Override
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Feb 14 00:08:13 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@@ -133,7 +134,7 @@ public class JobImpl implements org.apac
private float cleanupWeight = 0.05f;
private float mapWeight = 0.0f;
private float reduceWeight = 0.0f;
- private final Set<TaskId> completedTasksFromPreviousRun;
+ private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
private final Lock readLock;
private final Lock writeLock;
@@ -376,7 +377,7 @@ public class JobImpl implements org.apac
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
+ Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Tue Feb 14 00:08:13 2012
@@ -19,13 +19,14 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -38,7 +39,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
public class MapTaskImpl extends TaskImpl {
private final TaskSplitMetaInfo taskSplitMetaInfo;
@@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImp
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Tue Feb 14 00:08:13 2012
@@ -19,13 +19,14 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -37,7 +38,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
public class ReduceTaskImpl extends TaskImpl {
private final int numMapTasks;
@@ -47,7 +48,7 @@ public class ReduceTaskImpl extends Task
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, committer, jobToken, fsTokens, clock,
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Feb 14 00:08:13 2012
@@ -18,13 +18,14 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
@@ -208,8 +213,23 @@ public abstract class TaskImpl implement
private final StateMachine<TaskState, TaskEventType, TaskEvent>
stateMachine;
-
- protected int nextAttemptNumber;
+
+ // By default, the next TaskAttempt number is zero. Changes during recovery
+ protected int nextAttemptNumber = 0;
+ private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
+ new ArrayList<TaskAttemptInfo>();
+
+ private static final class RecoverdAttemptsComparator implements
+ Comparator<TaskAttemptInfo> {
+ @Override
+ public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
+ long diff = attempt1.getStartTime() - attempt2.getStartTime();
+ return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+ }
+ }
+
+ private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
+ new RecoverdAttemptsComparator();
//should be set to one which comes first
//saying COMMIT_PENDING
@@ -230,7 +250,7 @@ public abstract class TaskImpl implement
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
this.conf = conf;
this.clock = clock;
@@ -243,10 +263,7 @@ public abstract class TaskImpl implement
// have a convention that none of the overrides depends on any
// fields that need initialization.
maxAttempts = getMaxAttempts();
- taskId = recordFactory.newRecordInstance(TaskId.class);
- taskId.setJobId(jobId);
- taskId.setId(partition);
- taskId.setTaskType(taskType);
+ taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType);
this.partition = partition;
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
@@ -255,18 +272,38 @@ public abstract class TaskImpl implement
this.jobToken = jobToken;
this.metrics = metrics;
+ // See if this is from a previous generation.
if (completedTasksFromPreviousRun != null
- && completedTasksFromPreviousRun.contains(taskId)) {
+ && completedTasksFromPreviousRun.containsKey(taskId)) {
+ // This task has TaskAttempts from previous generation. We have to replay
+ // them.
LOG.info("Task is from previous run " + taskId);
- startCount = startCount - 1;
+ TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
+ Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
+ taskInfo.getAllTaskAttempts();
+ taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
+ taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
+ Collections.sort(taskAttemptsFromPreviousGeneration,
+ RECOVERED_ATTEMPTS_COMPARATOR);
+ }
+
+ if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+ // All the previous attempts are exhausted, now start with a new
+ // generation.
+
+ // All the new TaskAttemptIDs are generated based on MR
+ // ApplicationAttemptID so that attempts from previous lives don't
+ // over-step the current one. This assumes that a task won't have more
+ // than 1000 attempts in its single generation, which is very reasonable.
+ // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
+ // and requires serious medical attention.
+ nextAttemptNumber = (startCount - 1) * 1000;
+ } else {
+ // There are still some TaskAttempts from previous generation, use them
+ nextAttemptNumber =
+ taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
}
- //attempt ids are generated based on MR app startCount so that attempts
- //from previous lives don't overstep the current one.
- //this assumes that a task won't have more than 1000 attempts in its single
- //life
- nextAttemptNumber = (startCount - 1) * 1000;
-
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -390,17 +427,23 @@ public abstract class TaskImpl implement
//this is always called in read/write lock
private long getLaunchTime() {
- long launchTime = 0;
+ long taskLaunchTime = 0;
+ boolean launchTimeSet = false;
for (TaskAttempt at : attempts.values()) {
- //select the least launch time of all attempts
- if (launchTime == 0 || launchTime > at.getLaunchTime()) {
- launchTime = at.getLaunchTime();
+ // select the least launch time of all attempts
+ long attemptLaunchTime = at.getLaunchTime();
+ if (attemptLaunchTime != 0 && !launchTimeSet) {
+ // For the first non-zero launch time
+ launchTimeSet = true;
+ taskLaunchTime = attemptLaunchTime;
+ } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) {
+ taskLaunchTime = attemptLaunchTime;
}
}
- if (launchTime == 0) {
+ if (!launchTimeSet) {
return this.scheduledTime;
}
- return launchTime;
+ return taskLaunchTime;
}
//this is always called in read/write lock
@@ -525,7 +568,16 @@ public abstract class TaskImpl implement
attempts.put(attempt.getID(), attempt);
break;
}
- ++nextAttemptNumber;
+
+ // Update nextATtemptNumber
+ if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+ ++nextAttemptNumber;
+ } else {
+ // There are still some TaskAttempts from previous generation, use them
+ nextAttemptNumber =
+ taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
+ }
+
++numberUncompletedAttempts;
//schedule the nextAttemptNumber
if (failedAttempts > 0) {
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Tue Feb 14 00:08:13 2012
@@ -19,8 +19,9 @@
package org.apache.hadoop.mapreduce.v2.app.recover;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.yarn.Clock;
@@ -32,7 +33,7 @@ public interface Recovery {
Clock getClock();
- Set<TaskId> getCompletedTasks();
+ Map<TaskId, TaskInfo> getCompletedTasks();
List<AMInfo> getAMInfos();
}
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Feb 14 00:08:13 2012
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -153,8 +153,8 @@ public class RecoveryService extends Com
}
@Override
- public Set<TaskId> getCompletedTasks() {
- return completedTasks.keySet();
+ public Map<TaskId, TaskInfo> getCompletedTasks() {
+ return completedTasks;
}
@Override
@@ -189,7 +189,8 @@ public class RecoveryService extends Com
getConfig());
//read the previous history file
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
- histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
+ histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
+ LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
@@ -242,7 +243,7 @@ public class RecoveryService extends Com
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
.getTaskAttemptID());
- LOG.info("Attempt start time " + attInfo.getStartTime());
+ LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
clock.setTime(attInfo.getStartTime());
} else if (event.getType() == TaskAttemptEventType.TA_DONE
@@ -250,7 +251,7 @@ public class RecoveryService extends Com
|| event.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
.getTaskAttemptID());
- LOG.info("Attempt finish time " + attInfo.getFinishTime());
+ LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
clock.setTime(attInfo.getFinishTime());
}
@@ -380,17 +381,17 @@ public class RecoveryService extends Com
}
// send the done event
- LOG.info("Sending done event to " + aId);
+ LOG.info("Sending done event to recovered attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_DONE));
break;
case KILLED:
- LOG.info("Sending kill event to " + aId);
+ LOG.info("Sending kill event to recovered attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_KILL));
break;
default:
- LOG.info("Sending fail event to " + aId);
+ LOG.info("Sending fail event to recovered attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_FAILMSG));
break;
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Feb 14 00:08:13 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.util.ReflectionUtils;
@@ -74,7 +76,14 @@ public class TestRecovery {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
-
+ /**
+ * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
+ * completely disappears because of failed launch, one attempt gets killed and
+ * one attempt succeeds. AM crashes after the first tasks finishes and
+ * recovers completely and succeeds in the second generation.
+ *
+ * @throws Exception
+ */
@Test
public void testCrashed() throws Exception {
@@ -112,7 +121,8 @@ public class TestRecovery {
// reduces must be in NEW state
Assert.assertEquals("Reduce Task state not correct",
TaskState.RUNNING, reduceTask.getReport().getTaskState());
-
+
+ /////////// Play some games with the TaskAttempts of the first task //////
//send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
@@ -120,42 +130,68 @@ public class TestRecovery {
TaskAttemptEventType.TA_FAILMSG));
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
-
- while (mapTask1.getAttempts().size() != 2) {
+
+ int timeOut = 0;
+ while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
Thread.sleep(2000);
LOG.info("Waiting for next attempt to start");
}
+ Assert.assertEquals(2, mapTask1.getAttempts().size());
Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
itr.next();
TaskAttempt task1Attempt2 = itr.next();
- app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+ // The ContainerLauncher is set up to drop this attempt's launch event, so
+ // it never launches; the test fails it explicitly via LAUNCH_FAILED below.
+ // This attempt 'disappears' from JobHistory, which triggered MAPREDUCE-3846
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(task1Attempt2.getID(),
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+ app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
- //send the kill signal to the 1st map 2nd attempt
+ timeOut = 0;
+ while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
+ Thread.sleep(2000);
+ LOG.info("Waiting for next attempt to start");
+ }
+ Assert.assertEquals(3, mapTask1.getAttempts().size());
+ itr = mapTask1.getAttempts().values().iterator();
+ itr.next();
+ itr.next();
+ TaskAttempt task1Attempt3 = itr.next();
+
+ app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+
+ //send the kill signal to the 1st map 3rd attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
- task1Attempt2.getID(),
+ task1Attempt3.getID(),
TaskAttemptEventType.TA_KILL));
- app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
-
- while (mapTask1.getAttempts().size() != 3) {
+ app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+
+ timeOut = 0;
+ while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
Thread.sleep(2000);
LOG.info("Waiting for next attempt to start");
}
+ Assert.assertEquals(4, mapTask1.getAttempts().size());
itr = mapTask1.getAttempts().values().iterator();
itr.next();
itr.next();
- TaskAttempt task1Attempt3 = itr.next();
+ itr.next();
+ TaskAttempt task1Attempt4 = itr.next();
- app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+ app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
- //send the done signal to the 1st map 3rd attempt
+ //send the done signal to the 1st map 4th attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
- task1Attempt3.getID(),
+ task1Attempt4.getID(),
TaskAttemptEventType.TA_DONE));
+ /////////// End of games with the TaskAttempts of the first task //////
+
//wait for first map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
long task1StartTime = mapTask1.getReport().getStartTime();
@@ -552,7 +588,7 @@ public class TestRecovery {
}
- class MRAppWithHistory extends MRApp {
+ static class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
@@ -567,7 +603,17 @@ public class TestRecovery {
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
- MockContainerLauncher launcher = new MockContainerLauncher();
+ MockContainerLauncher launcher = new MockContainerLauncher() {
+ @Override
+ public void handle(ContainerLauncherEvent event) {
+ TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+ // Pass everything except the 2nd attempt of the first task.
+ if (taskAttemptID.getId() != 1
+ || taskAttemptID.getTaskId().getId() != 0) {
+ super.handle(event);
+ }
+ }
+ };
launcher.shufflePort = 5467;
return launcher;
}
@@ -581,7 +627,7 @@ public class TestRecovery {
}
}
- class RecoveryServiceWithCustomDispatcher extends RecoveryService {
+ static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
public RecoveryServiceWithCustomDispatcher(
ApplicationAttemptId applicationAttemptId, Clock clock,
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Feb 14 00:08:13 2012
@@ -25,7 +25,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -72,7 +73,7 @@ public class TestTaskImpl {
private Path remoteJobConfFile;
private Collection<Token<? extends TokenIdentifier>> fsTokens;
private Clock clock;
- private Set<TaskId> completedTasksFromPreviousRun;
+ private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private MRAppMetrics metrics;
private TaskImpl mockTask;
private ApplicationId appId;
@@ -96,7 +97,7 @@ public class TestTaskImpl {
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener, committer,
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Tue Feb 14 00:08:13 2012
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-@SuppressWarnings("deprecation")
public class TypeConverter {
private static RecordFactory recordFactory;
@@ -116,8 +115,8 @@ public class TypeConverter {
}
public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) {
- return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()),
- id.getId());
+ return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()),
+ fromYarn(id.getTaskType()), id.getId());
}
public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {