Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2013/04/13 01:05:45 UTC
svn commit: r1467511 [2/4] - in
/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Apr 12 23:05:28 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -152,6 +155,12 @@ public abstract class TaskImpl implement
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition())
+ .addTransition(TaskStateInternal.NEW,
+ EnumSet.of(TaskStateInternal.FAILED,
+ TaskStateInternal.KILLED,
+ TaskStateInternal.RUNNING,
+ TaskStateInternal.SUCCEEDED),
+ TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@@ -250,20 +259,16 @@ public abstract class TaskImpl implement
// By default, the next TaskAttempt number is zero. Changes during recovery
protected int nextAttemptNumber = 0;
- private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
- new ArrayList<TaskAttemptInfo>();
- private static final class RecoverdAttemptsComparator implements
- Comparator<TaskAttemptInfo> {
- @Override
- public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
- long diff = attempt1.getStartTime() - attempt2.getStartTime();
- return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
- }
- }
-
- private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
- new RecoverdAttemptsComparator();
+ // For sorting task attempts by completion time
+ private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
+ new Comparator<TaskAttemptInfo>() {
+ @Override
+ public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
+ long diff = a.getFinishTime() - b.getFinishTime();
+ return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+ }
+ };
@Override
public TaskState getState() {
@@ -280,8 +285,7 @@ public abstract class TaskImpl implement
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@@ -307,41 +311,15 @@ public abstract class TaskImpl implement
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
- // See if this is from a previous generation.
- if (completedTasksFromPreviousRun != null
- && completedTasksFromPreviousRun.containsKey(taskId)) {
- // This task has TaskAttempts from previous generation. We have to replay
- // them.
- LOG.info("Task is from previous run " + taskId);
- TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
- Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
- taskInfo.getAllTaskAttempts();
- taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
- taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
- Collections.sort(taskAttemptsFromPreviousGeneration,
- RECOVERED_ATTEMPTS_COMPARATOR);
- }
-
- if (taskAttemptsFromPreviousGeneration.isEmpty()) {
- // All the previous attempts are exhausted, now start with a new
- // generation.
-
- // All the new TaskAttemptIDs are generated based on MR
- // ApplicationAttemptID so that attempts from previous lives don't
- // over-step the current one. This assumes that a task won't have more
- // than 1000 attempts in its single generation, which is very reasonable.
- // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
- // and requires serious medical attention.
- nextAttemptNumber = (startCount - 1) * 1000;
- } else {
- // There are still some TaskAttempts from previous generation, use them
- nextAttemptNumber =
- taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
- }
-
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
+
+ // All the new TaskAttemptIDs are generated based on MR
+ // ApplicationAttemptID so that attempts from previous lives don't
+ // over-step the current one. This assumes that a task won't have more
+ // than 1000 attempts in its single generation, which is very reasonable.
+ nextAttemptNumber = (appAttemptId - 1) * 1000;
}
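
The attempt-numbering comment above is worth seeing in isolation. Below is a minimal, self-contained sketch (plain Java, not the Hadoop code; all names are illustrative) of how each application attempt reserves a block of 1000 task attempt ids, so attempts recovered from earlier generations cannot collide with new ones:

    // Sketch of the id scheme from the constructor above: app attempt N
    // (1-based) starts its task attempt ids at (N - 1) * 1000.
    public class AttemptNumbering {
      static int firstAttemptNumber(int appAttemptId) {
        return (appAttemptId - 1) * 1000;
      }

      public static void main(String[] args) {
        for (int appAttempt = 1; appAttempt <= 3; appAttempt++) {
          System.out.println("app attempt " + appAttempt
              + " starts task attempts at " + firstAttemptNumber(appAttempt));
        }
        // prints 0, 1000 and 2000
      }
    }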
@Override
@@ -600,14 +578,28 @@ public abstract class TaskImpl implement
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar) {
- TaskAttempt attempt = createAttempt();
- ((TaskAttemptImpl) attempt).setAvataar(avataar);
+ TaskAttempt attempt = addAttempt(avataar);
+ inProgressAttempts.add(attempt.getID());
+ //schedule the nextAttemptNumber
+ if (failedAttempts.size() > 0) {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_RESCHEDULE));
+ } else {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_SCHEDULE));
+ }
+ }
+
+ private TaskAttemptImpl addAttempt(Avataar avataar) {
+ TaskAttemptImpl attempt = createAttempt();
+ attempt.setAvataar(avataar);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
switch (attempts.size()) {
case 0:
- attempts = Collections.singletonMap(attempt.getID(), attempt);
+ attempts = Collections.singletonMap(attempt.getID(),
+ (TaskAttempt) attempt);
break;
case 1:
@@ -623,24 +615,8 @@ public abstract class TaskImpl implement
break;
}
- // Update nextATtemptNumber
- if (taskAttemptsFromPreviousGeneration.isEmpty()) {
- ++nextAttemptNumber;
- } else {
- // There are still some TaskAttempts from previous generation, use them
- nextAttemptNumber =
- taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
- }
-
- inProgressAttempts.add(attempt.getID());
- //schedule the nextAttemptNumber
- if (failedAttempts.size() > 0) {
- eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_RESCHEDULE));
- } else {
- eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_SCHEDULE));
- }
+ ++nextAttemptNumber;
+ return attempt;
}
@Override
@@ -705,6 +681,16 @@ public abstract class TaskImpl implement
}
}
+ private void sendTaskStartedEvent() {
+ TaskStartedEvent tse = new TaskStartedEvent(
+ TypeConverter.fromYarn(taskId), getLaunchTime(),
+ TypeConverter.fromYarn(taskId.getTaskType()),
+ getSplitsAsString());
+ eventHandler
+ .handle(new JobHistoryEvent(taskId.getJobId(), tse));
+ historyTaskStartGenerated = true;
+ }
+
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@@ -740,6 +726,16 @@ public abstract class TaskImpl implement
task.successfulAttempt = null;
}
+ private void sendTaskSucceededEvents() {
+ eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+ LOG.info("Task succeeded with attempt " + successfulAttempt);
+ if (historyTaskStartGenerated) {
+ TaskFinishedEvent tfe = createTaskFinishedEvent(this,
+ TaskStateInternal.SUCCEEDED);
+ eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+ }
+ }
+
/**
* @return a String representation of the splits.
*
@@ -751,6 +747,122 @@ public abstract class TaskImpl implement
return "";
}
+ /**
+ * Recover a completed task from a previous application attempt
+ * @param taskInfo recovered info about the task
+ * @param recoverTaskOutput whether to recover task outputs
+ * @return state of the task after recovery
+ */
+ private TaskStateInternal recover(TaskInfo taskInfo,
+ OutputCommitter committer, boolean recoverTaskOutput) {
+ LOG.info("Recovering task " + taskId
+ + " from prior app attempt, status was " + taskInfo.getTaskStatus());
+
+ scheduledTime = taskInfo.getStartTime();
+ sendTaskStartedEvent();
+ Collection<TaskAttemptInfo> attemptInfos =
+ taskInfo.getAllTaskAttempts().values();
+
+ if (attemptInfos.size() > 0) {
+ metrics.launchedTask(this);
+ }
+
+ // recover the attempts for this task in the order they finished
+ // so task attempt completion events are ordered properly
+ int savedNextAttemptNumber = nextAttemptNumber;
+ ArrayList<TaskAttemptInfo> taInfos =
+ new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
+ Collections.sort(taInfos, TA_INFO_COMPARATOR);
+ for (TaskAttemptInfo taInfo : taInfos) {
+ nextAttemptNumber = taInfo.getAttemptId().getId();
+ TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
+ // handle the recovery inline so attempts complete before task does
+ attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
+ committer, recoverTaskOutput));
+ finishedAttempts.add(attempt.getID());
+ TaskAttemptCompletionEventStatus taces = null;
+ TaskAttemptState attemptState = attempt.getState();
+ switch (attemptState) {
+ case FAILED:
+ taces = TaskAttemptCompletionEventStatus.FAILED;
+ break;
+ case KILLED:
+ taces = TaskAttemptCompletionEventStatus.KILLED;
+ break;
+ case SUCCEEDED:
+ taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unexpected attempt state during recovery: " + attemptState);
+ }
+ if (attemptState == TaskAttemptState.FAILED) {
+ failedAttempts.add(attempt.getID());
+ if (failedAttempts.size() >= maxAttempts) {
+ taces = TaskAttemptCompletionEventStatus.TIPFAILED;
+ }
+ }
+
+ // don't clobber the successful attempt completion event
+ // TODO: this shouldn't be necessary after MAPREDUCE-4330
+ if (successfulAttempt == null) {
+ handleTaskAttemptCompletion(attempt.getID(), taces);
+ if (attemptState == TaskAttemptState.SUCCEEDED) {
+ successfulAttempt = attempt.getID();
+ }
+ }
+ }
+ nextAttemptNumber = savedNextAttemptNumber;
+
+ TaskStateInternal taskState = TaskStateInternal.valueOf(
+ taskInfo.getTaskStatus());
+ switch (taskState) {
+ case SUCCEEDED:
+ if (successfulAttempt != null) {
+ sendTaskSucceededEvents();
+ } else {
+ LOG.info("Missing successful attempt for task " + taskId
+ + ", recovering as RUNNING");
+ // there must have been a fetch failure and the retry wasn't complete
+ taskState = TaskStateInternal.RUNNING;
+ metrics.runningTask(this);
+ addAndScheduleAttempt(Avataar.VIRGIN);
+ }
+ break;
+ case FAILED:
+ case KILLED:
+ {
+ if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
+ metrics.endWaitingTask(this);
+ }
+ TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
+ taskInfo.getFinishTime(), taskInfo.getTaskType(),
+ taskInfo.getError(), taskInfo.getTaskStatus(),
+ taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+ eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+ eventHandler.handle(
+ new JobTaskEvent(taskId, getExternalState(taskState)));
+ break;
+ }
+ default:
+ throw new java.lang.AssertionError("Unexpected recovered task state: "
+ + taskState);
+ }
+
+ return taskState;
+ }
+
+ private static class RecoverTransition
+ implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ TaskRecoverEvent tre = (TaskRecoverEvent) event;
+ return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
+ tre.getRecoverTaskOutput());
+ }
+ }
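
The replay order that recover() depends on is supplied by TA_INFO_COMPARATOR further up: attempts must be handled in the order they originally finished, so the task attempt completion events come out in the same sequence as in the prior run. A self-contained sketch of that ordering, using a stand-in class for JobHistoryParser.TaskAttemptInfo:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class ReplayOrdering {
      // Stand-in for TaskAttemptInfo; only the field recover() sorts on.
      static class AttemptInfo {
        final int id;
        final long finishTime;
        AttemptInfo(int id, long finishTime) {
          this.id = id;
          this.finishTime = finishTime;
        }
      }

      public static void main(String[] args) {
        List<AttemptInfo> taInfos = new ArrayList<AttemptInfo>();
        taInfos.add(new AttemptInfo(2, 5000L)); // successful retry, finished last
        taInfos.add(new AttemptInfo(1, 3000L)); // failed first attempt
        // Same comparison as TA_INFO_COMPARATOR in the patch.
        Collections.sort(taInfos, new Comparator<AttemptInfo>() {
          @Override
          public int compare(AttemptInfo a, AttemptInfo b) {
            long diff = a.finishTime - b.finishTime;
            return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
          }
        });
        for (AttemptInfo ta : taInfos) {
          System.out.println("replay attempt " + ta.id
              + " finished at " + ta.finishTime);
        }
        // Replays attempt 1 before attempt 2, matching the original order.
      }
    }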
+
private static class InitialScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@@ -758,13 +870,7 @@ public abstract class TaskImpl implement
public void transition(TaskImpl task, TaskEvent event) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
task.scheduledTime = task.clock.getTime();
- TaskStartedEvent tse = new TaskStartedEvent(
- TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
- TypeConverter.fromYarn(task.taskId.getTaskType()),
- task.getSplitsAsString());
- task.eventHandler
- .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
- task.historyTaskStartGenerated = true;
+ task.sendTaskStartedEvent();
}
}
@@ -818,16 +924,7 @@ public abstract class TaskImpl implement
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
task.successfulAttempt = taskAttemptId;
- task.eventHandler.handle(new JobTaskEvent(
- task.taskId, TaskState.SUCCEEDED));
- LOG.info("Task succeeded with attempt " + task.successfulAttempt);
- // issue kill to all other attempts
- if (task.historyTaskStartGenerated) {
- TaskFinishedEvent tfe = createTaskFinishedEvent(task,
- TaskStateInternal.SUCCEEDED);
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
- tfe));
- }
+ task.sendTaskSucceededEvents();
for (TaskAttempt attempt : task.attempts.values()) {
if (attempt.getID() != task.successfulAttempt &&
// This is okay because it can only talk us out of sending a
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri Apr 12 23:05:28 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -150,10 +151,14 @@ public class ContainerLauncherImpl exten
ContainerLaunchContext containerLaunchContext =
event.getContainer();
+ org.apache.hadoop.yarn.api.records.Container container =
+ BuilderUtils.newContainer(containerID, null, null,
+ event.getResource(), null, containerToken);
// Now launch the actual container
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
+ startRequest.setContainer(container);
StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = response
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Fri Apr 12 23:05:28 2013
@@ -23,26 +23,34 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.Resource;
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
private final ContainerLaunchContext container;
private final Task task;
+ private final Resource resource;
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
ContainerId containerID, String containerMgrAddress,
ContainerToken containerToken,
- ContainerLaunchContext containerLaunchContext, Task remoteTask) {
+ ContainerLaunchContext containerLaunchContext, Resource resource,
+ Task remoteTask) {
super(taskAttemptID, containerID, containerMgrAddress, containerToken,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
this.container = containerLaunchContext;
this.task = remoteTask;
+ this.resource = resource;
}
public ContainerLaunchContext getContainer() {
return this.container;
}
+ public Resource getResource() {
+ return this.resource;
+ }
+
public Task getRemoteTask() {
return this.task;
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Apr 12 23:05:28 2013
@@ -123,7 +123,7 @@ public class LocalContainerAllocator ext
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
- JobEventType.INTERNAL_ERROR));
+ JobEventType.JOB_AM_REBOOT));
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Apr 12 23:05:28 2013
@@ -574,7 +574,7 @@ public class RMContainerAllocator extend
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
- JobEventType.INTERNAL_ERROR));
+ JobEventType.JOB_AM_REBOOT));
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
}
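
Both allocator changes above make the same substitution: when the ResourceManager no longer recognizes this application attempt (typically because the RM restarted), the job is now told to reboot instead of failing with an internal error, which lets a fresh AM attempt recover the completed work. A toy sketch of the before/after semantics; only the event type names come from the patch:

    public class RebootOnUnknownAttempt {
      enum JobEventType { INTERNAL_ERROR, JOB_AM_REBOOT }

      static JobEventType onUnknownAttemptId(boolean patched) {
        // Before: the job fails outright. After: the AM shuts down in a
        // recoverable way and a new attempt can pick up where it left off.
        return patched ? JobEventType.JOB_AM_REBOOT
                       : JobEventType.INTERNAL_ERROR;
      }

      public static void main(String[] args) {
        System.out.println("old behavior: " + onUnknownAttemptId(false));
        System.out.println("new behavior: " + onUnknownAttemptId(true));
      }
    }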
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Apr 12 23:05:28 2013
@@ -33,6 +33,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import com.google.inject.Inject;
@@ -53,7 +56,7 @@ public class TaskPage extends AppView {
h2($(TITLE));
return;
}
- html.
+ TBODY<TABLE<Hamlet>> tbody = html.
table("#attempts").
thead().
tr().
@@ -65,7 +68,8 @@ public class TaskPage extends AppView {
th(".tsh", "Started").
th(".tsh", "Finished").
th(".tsh", "Elapsed").
- th(".note", "Note")._()._();
+ th(".note", "Note")._()._().
+ tbody();
// Write all the data into a JavaScript array of arrays for JQuery
// DataTables to display
StringBuilder attemptsTableData = new StringBuilder("[\n");
@@ -105,6 +109,9 @@ public class TaskPage extends AppView {
attemptsTableData.append("]");
html.script().$type("text/javascript").
_("var attemptsTableData=" + attemptsTableData)._();
+
+ tbody._()._();
+
}
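
The TaskPage change keeps a handle to the open tbody so the table is only closed after the JavaScript payload has been written; the attempt rows themselves travel as the attemptsTableData array that JQuery DataTables renders client-side. A rough sketch in plain string building (not the Hamlet API), following the same call order as the patch:

    public class TaskPageSketch {
      public static void main(String[] args) {
        StringBuilder page = new StringBuilder();
        // thead is fully built up front; tbody is left open, as in the patch
        page.append("<table id=\"attempts\"><thead><tr><th>Attempt</th>")
            .append("<th>State</th></tr></thead><tbody>");
        // no server-side rows: DataTables fills the tbody from this array
        String attemptsTableData = "[[\"attempt_0001_m_000000_0\",\"SUCCEEDED\"]]";
        page.append("<script type=\"text/javascript\">var attemptsTableData=")
            .append(attemptsTableData).append("</script>");
        // tbody._()._() in the patch closes the tbody and the table last
        page.append("</tbody></table>");
        System.out.println(page);
      }
    }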
protected boolean isValidRequest() {
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Apr 12 23:05:28 2013
@@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName(), getContext(),
forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
@@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
- boolean newApiCommitter, String user, AppContext appContext,
+ OutputCommitter committer, boolean newApiCommitter,
+ String user, AppContext appContext,
JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
- getCompletedTaskFromPreviousRun(), metrics,
+ getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
appContext, forcedState, diagnostic);
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Fri Apr 12 23:05:28 2013
@@ -18,10 +18,21 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import junit.framework.Assert;
@@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
@@ -75,6 +116,7 @@ public class TestRecovery {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
+
/**
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
* completely disappears because of failed launch, one attempt gets killed and
@@ -1011,6 +1053,423 @@ public class TestRecovery {
app.verifyCompleted();
}
+ @Test
+ public void testRecoverySuccessAttempt() {
+ LOG.info("--- START: testRecoverySuccessAttempt ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.SUCCEEDED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+ finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.TASK_FINISHED);
+ recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 1L);
+ }
+
+ @Test
+ public void testRecoveryAllFailAttempts() {
+ LOG.info("--- START: testRecoveryAllFailAttempts ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+ finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.TASK_FAILED);
+ recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 2L);
+ }
+
+ @Test
+ public void testRecoveryTaskSuccessAllAttemptsFail() {
+ LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+ finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+ // expect one new attempt to have been launched, since no successful attempt was found
+ TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
+ finalAttemptStates.put(taId3, TaskAttemptState.NEW);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 2L);
+ }
+
+ @Test
+ public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+ LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.SUCCEEDED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.SUCCEEDED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+ finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+ jobHistoryEvents.add(EventType.TASK_FINISHED);
+ recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 0L);
+ }
+
+ @Test
+ public void testRecoveryAllAttemptsKilled() {
+ LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.KILLED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.KILLED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
+ finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+ jobHistoryEvents.add(EventType.TASK_FAILED);
+ recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 0L);
+ }
+
+ private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
+ ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
+ long expectedMapLaunches, long expectedFailedMaps) {
+
+ assertEquals("Final State of Task", finalState, checkTask.getState());
+
+ Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
+ checkTask.getAttempts();
+ assertEquals("Expected Number of Task Attempts",
+ finalAttemptStates.size(), recoveredAttempts.size());
+ for (TaskAttemptID taID : finalAttemptStates.keySet()) {
+ assertEquals("Expected Task Attempt State",
+ finalAttemptStates.get(taID),
+ recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
+ }
+
+ Iterator<Event> ie = arg.getAllValues().iterator();
+ int eventNum = 0;
+ long totalLaunchedMaps = 0;
+ long totalFailedMaps = 0;
+ boolean jobTaskEventReceived = false;
+
+ while (ie.hasNext()) {
+ Object current = ie.next();
+ ++eventNum;
+ LOG.info(eventNum + " " + current.getClass().getName());
+ if (current instanceof JobHistoryEvent) {
+ JobHistoryEvent jhe = (JobHistoryEvent) current;
+ LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
+ jhe.getHistoryEvent().getEventType().toString() + " " +
+ jhe.getJobID());
+ assertEquals(expectedJobHistoryEvents.get(0),
+ jhe.getHistoryEvent().getEventType());
+ expectedJobHistoryEvents.remove(0);
+ } else if (current instanceof JobCounterUpdateEvent) {
+ JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
+
+ LOG.info("JobCounterUpdateEvent "
+ + jcue.getCounterUpdates().get(0).getCounterKey()
+ + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+ if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+ JobCounter.NUM_FAILED_MAPS) {
+ totalFailedMaps += jcue.getCounterUpdates().get(0)
+ .getIncrementValue();
+ } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+ JobCounter.TOTAL_LAUNCHED_MAPS) {
+ totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+ .getIncrementValue();
+ }
+ } else if (current instanceof JobTaskEvent) {
+ JobTaskEvent jte = (JobTaskEvent) current;
+ assertEquals(finalState, jte.getState());
+ jobTaskEventReceived = true;
+ }
+ }
+ assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
+ assertEquals("Did not process all expected JobHistoryEvents",
+ 0, expectedJobHistoryEvents.size());
+ assertEquals("Expected Map Launches",
+ expectedMapLaunches, totalLaunchedMaps);
+ assertEquals("Expected Failed Maps",
+ expectedFailedMaps, totalFailedMaps);
+ }
+
+ private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
+
+ ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+ int partitions = 2;
+
+ Path remoteJobConfFile = mock(Path.class);
+ JobConf conf = new JobConf();
+ TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
+ Token<JobTokenIdentifier> jobToken =
+ (Token<JobTokenIdentifier>) mock(Token.class);
+ Credentials credentials = null;
+ Clock clock = new SystemClock();
+ int appAttemptId = 3;
+ MRAppMetrics metrics = mock(MRAppMetrics.class);
+ Resource minContainerRequirements = mock(Resource.class);
+ when(minContainerRequirements.getMemory()).thenReturn(1000);
+
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(
+ minContainerRequirements);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+
+ TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+ MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
+ eh, remoteJobConfFile, conf,
+ taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
+ appAttemptId, metrics, appContext);
+ return mapTask;
+ }
+
+ private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
+ TaskAttemptState tas) {
+
+ ContainerId ci = mock(ContainerId.class);
+ Counters counters = mock(Counters.class);
+ TaskType tt = TaskType.MAP;
+
+ long finishTime = System.currentTimeMillis();
+
+ TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
+
+ when(mockTAinfo.getAttemptId()).thenReturn(tai);
+ when(mockTAinfo.getContainerId()).thenReturn(ci);
+ when(mockTAinfo.getCounters()).thenReturn(counters);
+ when(mockTAinfo.getError()).thenReturn("");
+ when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
+ when(mockTAinfo.getHostname()).thenReturn("localhost");
+ when(mockTAinfo.getHttpPort()).thenReturn(23);
+ when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
+ when(mockTAinfo.getPort()).thenReturn(24);
+ when(mockTAinfo.getRackname()).thenReturn("defaultRack");
+ when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
+ when(mockTAinfo.getShufflePort()).thenReturn(25);
+ when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
+ when(mockTAinfo.getStartTime()).thenReturn(finishTime - 10000L);
+ when(mockTAinfo.getState()).thenReturn("task in progress");
+ when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
+ when(mockTAinfo.getTaskType()).thenReturn(tt);
+ when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
+ return mockTAinfo;
+ }
+
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
@@ -1145,5 +1604,16 @@ public class TestRecovery {
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();
+ test.testMultipleCrashes();
+ test.testOutputRecovery();
+ test.testOutputRecoveryMapsOnly();
+ test.testRecoveryWithOldCommiter();
+ test.testSpeculative();
+ test.testRecoveryWithoutShuffleSecret();
+ test.testRecoverySuccessAttempt();
+ test.testRecoveryAllFailAttempts();
+ test.testRecoveryTaskSuccessAllAttemptsFail();
+ test.testRecoveryTaskSuccessAllAttemptsSucceed();
+ test.testRecoveryAllAttemptsKilled();
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Apr 12 23:05:28 2013
@@ -33,7 +33,9 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -86,9 +89,68 @@ import org.junit.Test;
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
- MRAppMaster appMaster = new TestMRApp(attemptId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+ JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+ appMaster.init(conf);
+ appMaster.start();
+ appMaster.shutDownJob();
+ //test whether notifyIsLastAMRetry called
+ Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+ verify(fs).delete(stagingJobPath, true);
+ }
+
+ @Test (timeout = 30000)
+ public void testNoDeletionofStagingOnReboot() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(0);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+ JobStateInternal.REBOOT, 4);
+ appMaster.init(conf);
+ appMaster.start();
+ //shut down the job; this is not the last retry
+ appMaster.shutDownJob();
+ //test whether notifyIsLastAMRetry called
+ Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+ verify(fs, times(0)).delete(stagingJobPath, true);
+ }
+
+ @Test (timeout = 30000)
+ public void testDeletionofStagingOnReboot() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(1);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+ JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
+ appMaster.start();
+ //shut down the job; this is the last retry
appMaster.shutDownJob();
+ //test whether notifyIsLastAMRetry called
+ Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs).delete(stagingJobPath, true);
}
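
The three staging-cleanup tests above all exercise one rule: the staging directory may only be deleted when the current AM attempt is the job's last chance (or the job finished normally); a rebooting AM with retries left must leave it in place for the next attempt. A hedged, self-contained sketch of that rule; the names are illustrative, not the MRAppMaster API:

    public class StagingCleanupRule {
      static boolean shouldDeleteStaging(int attemptId, int maxAttempts,
          boolean jobRebooting) {
        // A rebooting AM with retries left keeps the staging dir so the
        // next attempt can recover; otherwise deletion is safe.
        return !jobRebooting || attemptId >= maxAttempts;
      }

      public static void main(String[] args) {
        System.out.println(shouldDeleteStaging(1, 4, true));  // false: keep
        System.out.println(shouldDeleteStaging(4, 4, true));  // true: delete
        System.out.println(shouldDeleteStaging(1, 4, false)); // true: delete
      }
    }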
@@ -151,6 +213,8 @@ import org.junit.Test;
private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator;
+ boolean testIsLastAMRetry = false;
+ JobStateInternal jobStateInternal;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) {
@@ -160,9 +224,11 @@ import org.junit.Test;
this.allocator = allocator;
}
- public TestMRApp(ApplicationAttemptId applicationAttemptId) {
- this(applicationAttemptId, null,
- MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+ public TestMRApp(ApplicationAttemptId applicationAttemptId,
+ ContainerAllocator allocator, JobStateInternal jobStateInternal,
+ int maxAppAttempts) {
+ this(applicationAttemptId, allocator, maxAppAttempts);
+ this.jobStateInternal = jobStateInternal;
}
@Override
@@ -180,6 +246,31 @@ import org.junit.Test;
}
@Override
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
+ JobImpl jobImpl = mock(JobImpl.class);
+ when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal);
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ when(jobImpl.getID()).thenReturn(jobId);
+ ((AppContext) getContext())
+ .getAllJobs().put(jobImpl.getID(), jobImpl);
+ return jobImpl;
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ DefaultMetricsSystem.shutdown();
+ }
+
+ @Override
+ public void notifyIsLastAMRetry(boolean isLastAMRetry){
+ testIsLastAMRetry = isLastAMRetry;
+ super.notifyIsLastAMRetry(isLastAMRetry);
+ }
+
+ @Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
}
@@ -197,6 +288,9 @@ import org.junit.Test;
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
+ public boolean getTestIsLastAMRetry(){
+ return testIsLastAMRetry;
+ }
}
private final class MRAppTestCleanup extends MRApp {
@@ -222,7 +316,8 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName(), getContext(),
forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
@@ -288,7 +383,7 @@ import org.junit.Test;
};
}
- @Test
+ @Test(timeout=20000)
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
this.getClass().getName(), true);
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Apr 12 23:05:28 2013
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@@ -35,6 +37,7 @@ import org.apache.hadoop.mapreduce.JobAC
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
@@ -47,6 +50,7 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -57,6 +61,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@@ -69,7 +74,6 @@ import org.apache.hadoop.yarn.SystemCloc
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -114,6 +118,7 @@ public class TestJobImpl {
conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
+ conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2");
AsyncDispatcher dispatcher = new AsyncDispatcher();
@@ -126,12 +131,13 @@ public class TestJobImpl {
commitHandler.start();
JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
- "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
+ "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
+ "tag1,tag2");
dispatcher.register(EventType.class, jseHandler);
JobImpl job = createStubbedJob(conf, dispatcher, 0);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+ job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
@@ -193,6 +199,68 @@ public class TestJobImpl {
}
@Test(timeout=20000)
+ public void testRebootedDuringSetup() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = new StubbedOutputCommitter() {
+ @Override
+ public synchronized void setupJob(JobContext jobContext)
+ throws IOException {
+ // Park the committer thread in setup so the job is still in SETUP
+ // when the test injects JOB_AM_REBOOT; exit once interrupted.
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so the loop terminates.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ };
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createStubbedJob(conf, dispatcher, 2);
+ JobId jobId = job.getID();
+ job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobStartEvent(jobId));
+ assertJobState(job, JobStateInternal.SETUP);
+
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+ assertJobState(job, JobStateInternal.REBOOT);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+
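The same blocking-committer effect can be had with a latch if interrupt-flag subtleties are a concern; a sketch purely as an alternative, not part of this commit, assuming java.util.concurrent.CountDownLatch:

    // Alternative sketch: block setup on a latch the test never counts down.
    final CountDownLatch setupGate = new CountDownLatch(1);
    OutputCommitter committer = new StubbedOutputCommitter() {
      @Override
      public void setupJob(JobContext jobContext) throws IOException {
        try {
          setupGate.await();
        } catch (InterruptedException e) {
          // Interrupted on shutdown: fall through and let setup end.
        }
      }
    };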
+ @Test(timeout=20000)
+ public void testRebootedDuringCommit() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+ completeJobTasks(job);
+ assertJobState(job, JobStateInternal.COMMITTING);
+
+ syncBarrier.await();
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+ assertJobState(job, JobStateInternal.REBOOT);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+
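testRebootedDuringCommit leans on WaitingOutputCommitter, which is defined elsewhere in this test class and not shown in this hunk. A plausible sketch of its shape, inferred only from the two-argument construction above; the real fields and behavior are assumptions:

    // Sketch of the helper assumed by the test above.
    private static class WaitingOutputCommitter extends StubbedOutputCommitter {
      private final CyclicBarrier syncBarrier;
      private final boolean waitForever;

      WaitingOutputCommitter(CyclicBarrier syncBarrier, boolean waitForever) {
        this.syncBarrier = syncBarrier;
        this.waitForever = waitForever;
      }

      @Override
      public void commitJob(JobContext jobContext) throws IOException {
        try {
          // Release the test thread once the commit has started...
          syncBarrier.await();
          // ...then park so the job stays in COMMITTING until the test
          // injects JOB_AM_REBOOT.
          while (waitForever && !Thread.currentThread().isInterrupted()) {
            Thread.sleep(100);
          }
        } catch (Exception e) {
          throw new IOException(e);
        }
      }
    }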
+ @Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
@@ -220,7 +288,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@@ -287,7 +355,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAIL_ABORT);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -324,7 +392,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -364,7 +432,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -375,7 +443,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -386,7 +454,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -397,7 +465,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -408,7 +476,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@@ -426,7 +494,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null, null, null);
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@@ -437,7 +505,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null, null, null);
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@@ -492,7 +560,7 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, new JobTokenSecretManager(), new Credentials(), null, null,
- mrAppMetrics, true, null, 0, null, null, null, null);
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
@@ -533,7 +601,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@@ -597,7 +665,7 @@ public class TestJobImpl {
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+ job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.RUNNING);
return job;
}
@@ -644,14 +712,18 @@ public class TestJobImpl {
private String workflowAdjacencies;
+ private String workflowTags;
+
private Boolean assertBoolean;
public JobSubmittedEventHandler(String workflowId, String workflowName,
- String workflowNodeName, String workflowAdjacencies) {
+ String workflowNodeName, String workflowAdjacencies,
+ String workflowTags) {
this.workflowId = workflowId;
this.workflowName = workflowName;
this.workflowNodeName = workflowNodeName;
this.workflowAdjacencies = workflowAdjacencies;
+ this.workflowTags = workflowTags;
assertBoolean = null;
}
@@ -673,7 +745,16 @@ public class TestJobImpl {
setAssertValue(false);
return;
}
- if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) {
+
+ // The event serializes its adjacency map in no particular order, so
+ // compare the adjacency lists as sorted token arrays.
+ String[] expectedAdjacencies = workflowAdjacencies.split(" ");
+ String[] actualAdjacencies = jsEvent.getWorkflowAdjacencies().split(" ");
+ Arrays.sort(expectedAdjacencies);
+ Arrays.sort(actualAdjacencies);
+ if (!Arrays.equals(expectedAdjacencies, actualAdjacencies)) {
+ setAssertValue(false);
+ return;
+ }
+ if (!workflowTags.equals(jsEvent.getWorkflowTags())) {
setAssertValue(false);
return;
}
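The same order-insensitive comparison as a small self-contained helper, purely illustrative (not part of this commit):

    // Illustrative helper: order-insensitive comparison of space-separated
    // token strings, mirroring the adjacency check above.
    static boolean equalsIgnoringOrder(String expected, String actual) {
      String[] a = expected.split(" ");
      String[] b = actual.split(" ");
      java.util.Arrays.sort(a);
      java.util.Arrays.sort(b);
      return java.util.Arrays.equals(a, b);
    }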
@@ -713,9 +794,9 @@ public class TestJobImpl {
boolean newApiCommitter, String user, int numSplits) {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
- new SystemClock(), null, MRAppMetrics.create(),
- newApiCommitter, user, System.currentTimeMillis(), null, null, null,
- null);
+ new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
+ MRAppMetrics.create(), null, newApiCommitter, user,
+ System.currentTimeMillis(), null, null, null, null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Fri Apr 12 23:05:28 2013
@@ -113,9 +113,9 @@ public class TestTaskAttemptContainerReq
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
ContainerLaunchContext launchCtx =
- TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+ TaskAttemptImpl.createContainerLaunchContext(acls,
jobConf, jobToken, taImpl.createRemoteTask(),
- TypeConverter.fromYarn(jobId), mock(Resource.class),
+ TypeConverter.fromYarn(jobId),
mock(WrappedJvmID.class), taListener,
credentials);
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Apr 12 23:05:28 2013
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbi
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
@@ -80,7 +78,6 @@ public class TestTaskImpl {
private Path remoteJobConfFile;
private Credentials credentials;
private Clock clock;
- private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private MRAppMetrics metrics;
private TaskImpl mockTask;
private ApplicationId appId;
@@ -104,13 +101,12 @@ public class TestTaskImpl {
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
- Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+ Credentials credentials, Clock clock, int startCount,
MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener,
jobToken, credentials, clock,
- completedTasksFromPreviousRun, startCount, metrics, appContext);
+ startCount, metrics, appContext);
this.taskType = taskType;
}
@@ -247,8 +243,7 @@ public class TestTaskImpl {
return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
- completedTasksFromPreviousRun, startCount,
- metrics, appContext, taskType);
+ startCount, metrics, appContext, taskType);
}
@After
@@ -652,9 +647,7 @@ public class TestTaskImpl {
public void testFailedTransitions() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
- credentials, clock,
- completedTasksFromPreviousRun, startCount,
- metrics, appContext, TaskType.MAP) {
+ credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
@@ -721,9 +714,7 @@ public class TestTaskImpl {
public void testCountersWithSpeculation() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
- credentials, clock,
- completedTasksFromPreviousRun, startCount,
- metrics, appContext, TaskType.MAP) {
+ credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
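The constructor slimming in TestTaskImpl follows from this commit's larger theme: recovery no longer flows into TaskImpl through completedTasksFromPreviousRun but through a T_RECOVER event on the state machine. A sketch of how the job side would hand a recovered task its history; the TaskRecoverEvent argument list is an assumption based on the imports this change adds, and the locals are illustrative:

    // Sketch, not verbatim from this commit: recovery delivered as an
    // event instead of constructor state.
    TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
    if (taskInfo != null) {
      eventHandler.handle(new TaskRecoverEvent(
          taskId, taskInfo, committer, recoverTaskOutput));
    }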
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Fri Apr 12 23:05:28 2013
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -383,7 +382,6 @@ public class TestContainerLauncher {
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
- ContainerLaunchContext container = request.getContainerLaunchContext();
StartContainerResponse response = recordFactory
.newRecordInstance(StartContainerResponse.class);
status = recordFactory.newRecordInstance(ContainerStatus.class);
@@ -395,7 +393,7 @@ public class TestContainerLauncher {
throw new UndeclaredThrowableException(e);
}
status.setState(ContainerState.RUNNING);
- status.setContainerId(container.getContainerId());
+ status.setContainerId(request.getContainer().getId());
status.setExitStatus(0);
return response;
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Fri Apr 12 23:05:28 2013
@@ -144,7 +144,10 @@ public class TestAMWebApp {
@Test public void testTaskView() {
AppContext appContext = new TestAppContext();
Map<String, String> params = getTaskParams(appContext);
- WebAppTests.testPage(TaskPage.class, AppContext.class, appContext, params);
+ App app = new App(appContext);
+ app.setJob(appContext.getAllJobs().values().iterator().next());
+ app.setTask(app.getJob().getTasks().values().iterator().next());
+ WebAppTests.testPage(TaskPage.class, App.class, app, params);
}
public static Map<String, String> getJobParams(AppContext appContext) {
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Apr 12 23:05:28 2013
@@ -32,13 +32,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -91,6 +91,9 @@ class LocalDistributedCacheManager {
Map<String, LocalResource> localResources =
new LinkedHashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
+ // Generate a unique number per resource so concurrent FSDownload
+ // destination directories never collide.
+ AtomicLong uniqueNumberGenerator =
+ new AtomicLong(System.currentTimeMillis());
// Find which resources are to be put on the local classpath
Map<String, Path> classpaths = new HashMap<String, Path>();
@@ -128,8 +131,10 @@ class LocalDistributedCacheManager {
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {
- Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
- destPath, resource, new Random());
+ Callable<Path> download =
+ new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet())),
+ resource);
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
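The switch from handing FSDownload a Random to pre-building a numbered destination directory makes collisions impossible even when many resources download in parallel. The pattern in isolation, with names assumed for illustration (UniquePaths is not a real Hadoop class; imports of org.apache.hadoop.fs.Path and java.util.concurrent.atomic.AtomicLong assumed):

    // Illustrative sketch of the unique-destination pattern adopted above.
    class UniquePaths {
      // Seeding with the current time avoids clashes with directories left
      // over from a previous run; incrementAndGet() keeps names unique
      // across threads.
      private final AtomicLong next = new AtomicLong(System.currentTimeMillis());

      Path nextDest(Path base) {
        return new Path(base, Long.toString(next.incrementAndGet()));
      }
    }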