You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2013/04/13 01:05:45 UTC

svn commit: r1467511 [2/4] - in /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Apr 12 23:05:28 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -152,6 +155,12 @@ public abstract class TaskImpl implement
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
         TaskEventType.T_KILL, new KillNewTransition())
+    .addTransition(TaskStateInternal.NEW,
+        EnumSet.of(TaskStateInternal.FAILED,
+                   TaskStateInternal.KILLED,
+                   TaskStateInternal.RUNNING,
+                   TaskStateInternal.SUCCEEDED),
+        TaskEventType.T_RECOVER, new RecoverTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
@@ -250,20 +259,16 @@ public abstract class TaskImpl implement
 
   // By default, the next TaskAttempt number is zero. Changes during recovery  
   protected int nextAttemptNumber = 0;
-  private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
-      new ArrayList<TaskAttemptInfo>();
 
-  private static final class RecoverdAttemptsComparator implements
-      Comparator<TaskAttemptInfo> {
-    @Override
-    public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
-      long diff = attempt1.getStartTime() - attempt2.getStartTime();
-      return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
-    }
-  }
-
-  private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
-      new RecoverdAttemptsComparator();
+  // Orders task attempts by ascending finish time; used when replaying recovered attempts
+  private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
+      new Comparator<TaskAttemptInfo>() {
+        @Override
+        public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
+          long diff = a.getFinishTime() - b.getFinishTime();
+          return diff == 0 ? 0 : (diff < 0 ? -1 : 1); // clamp to -1/0/1 instead of narrowing the long to int
+        }
+      };
 
   @Override
   public TaskState getState() {
@@ -280,8 +285,7 @@ public abstract class TaskImpl implement
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     this.conf = conf;
     this.clock = clock;
     this.jobFile = remoteJobConfFile;
@@ -307,41 +311,15 @@ public abstract class TaskImpl implement
     this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                                             MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
 
-    // See if this is from a previous generation.
-    if (completedTasksFromPreviousRun != null
-        && completedTasksFromPreviousRun.containsKey(taskId)) {
-      // This task has TaskAttempts from previous generation. We have to replay
-      // them.
-      LOG.info("Task is from previous run " + taskId);
-      TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
-      Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
-          taskInfo.getAllTaskAttempts();
-      taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
-      taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
-      Collections.sort(taskAttemptsFromPreviousGeneration,
-        RECOVERED_ATTEMPTS_COMPARATOR);
-    }
-
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      // All the previous attempts are exhausted, now start with a new
-      // generation.
-
-      // All the new TaskAttemptIDs are generated based on MR
-      // ApplicationAttemptID so that attempts from previous lives don't
-      // over-step the current one. This assumes that a task won't have more
-      // than 1000 attempts in its single generation, which is very reasonable.
-      // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
-      // and requires serious medical attention.
-      nextAttemptNumber = (startCount - 1) * 1000;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
+
+    // All the new TaskAttemptIDs are generated based on MR
+    // ApplicationAttemptID so that attempts from previous lives don't
+    // over-step the current one. This assumes that a task won't have more
+    // than 1000 attempts in its single generation, which is very reasonable.
+    nextAttemptNumber = (appAttemptId - 1) * 1000;
   }
 
   @Override
@@ -600,14 +578,28 @@ public abstract class TaskImpl implement
 
   // This is always called in the Write Lock
   private void addAndScheduleAttempt(Avataar avataar) {
-    TaskAttempt attempt = createAttempt();
-    ((TaskAttemptImpl) attempt).setAvataar(avataar);
+    TaskAttempt attempt = addAttempt(avataar); // creates the attempt and records it in the attempts map
+    inProgressAttempts.add(attempt.getID());
+    // schedule the new attempt; use TA_RESCHEDULE once any attempt of this task has failed
+    if (failedAttempts.size() > 0) {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private TaskAttemptImpl addAttempt(Avataar avataar) {
+    TaskAttemptImpl attempt = createAttempt();
+    attempt.setAvataar(avataar);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
     switch (attempts.size()) {
       case 0:
-        attempts = Collections.singletonMap(attempt.getID(), attempt);
+        attempts = Collections.singletonMap(attempt.getID(),
+            (TaskAttempt) attempt);
         break;
         
       case 1:
@@ -623,24 +615,8 @@ public abstract class TaskImpl implement
         break;
     }
 
-    // Update nextATtemptNumber
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      ++nextAttemptNumber;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-
-    inProgressAttempts.add(attempt.getID());
-    //schedule the nextAttemptNumber
-    if (failedAttempts.size() > 0) {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
-        TaskAttemptEventType.TA_RESCHEDULE));
-    } else {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
-          TaskAttemptEventType.TA_SCHEDULE));
-    }
+    ++nextAttemptNumber;
+    return attempt;
   }
 
   @Override
@@ -705,6 +681,16 @@ public abstract class TaskImpl implement
     }
   }
 
+  private void sendTaskStartedEvent() { // publishes a TaskStartedEvent for this task as a JobHistoryEvent
+    TaskStartedEvent tse = new TaskStartedEvent(
+        TypeConverter.fromYarn(taskId), getLaunchTime(),
+        TypeConverter.fromYarn(taskId.getTaskType()),
+        getSplitsAsString());
+    eventHandler
+        .handle(new JobHistoryEvent(taskId.getJobId(), tse));
+    historyTaskStartGenerated = true; // checked before a TaskFinishedEvent is written on success
+  }
+
   private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@@ -740,6 +726,16 @@ public abstract class TaskImpl implement
     task.successfulAttempt = null;
   }
 
+  private void sendTaskSucceededEvents() { // notifies the job of success, then records it in job history
+    eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+    LOG.info("Task succeeded with attempt " + successfulAttempt);
+    if (historyTaskStartGenerated) { // only write a finish record if a start record was generated
+      TaskFinishedEvent tfe = createTaskFinishedEvent(this,
+          TaskStateInternal.SUCCEEDED);
+      eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+    }
+  }
+
   /**
   * @return a String representation of the splits.
   *
@@ -751,6 +747,122 @@ public abstract class TaskImpl implement
 	  return "";
   }
 
+  /** Recover a completed task from a previous application attempt.
+   * @param taskInfo recovered info about the task
+   * @param committer output committer passed to each recovered attempt
+   * @param recoverTaskOutput whether to recover task outputs
+   * @return state of the task after recovery; RUNNING if a new attempt had to be scheduled
+   */
+  private TaskStateInternal recover(TaskInfo taskInfo,
+      OutputCommitter committer, boolean recoverTaskOutput) {
+    LOG.info("Recovering task " + taskId
+        + " from prior app attempt, status was " + taskInfo.getTaskStatus());
+
+    scheduledTime = taskInfo.getStartTime();
+    sendTaskStartedEvent();
+    Collection<TaskAttemptInfo> attemptInfos =
+        taskInfo.getAllTaskAttempts().values();
+
+    if (attemptInfos.size() > 0) {
+      metrics.launchedTask(this);
+    }
+
+    // recover the attempts for this task in the order they finished
+    // so task attempt completion events are ordered properly
+    int savedNextAttemptNumber = nextAttemptNumber; // restored after the loop, which overwrites it per attempt
+    ArrayList<TaskAttemptInfo> taInfos =
+        new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
+    Collections.sort(taInfos, TA_INFO_COMPARATOR);
+    for (TaskAttemptInfo taInfo : taInfos) {
+      nextAttemptNumber = taInfo.getAttemptId().getId(); // reuse the attempt number recorded for this attempt
+      TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
+      // handle the recovery inline so attempts complete before task does
+      attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
+          committer, recoverTaskOutput));
+      finishedAttempts.add(attempt.getID());
+      TaskAttemptCompletionEventStatus taces = null;
+      TaskAttemptState attemptState = attempt.getState();
+      switch (attemptState) {
+      case FAILED:
+        taces = TaskAttemptCompletionEventStatus.FAILED;
+        break;
+      case KILLED:
+        taces = TaskAttemptCompletionEventStatus.KILLED;
+        break;
+      case SUCCEEDED:
+        taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
+        break;
+      default:
+        throw new IllegalStateException(
+            "Unexpected attempt state during recovery: " + attemptState);
+      }
+      if (attemptState == TaskAttemptState.FAILED) {
+        failedAttempts.add(attempt.getID());
+        if (failedAttempts.size() >= maxAttempts) { // attempt budget used up: report TIPFAILED instead of FAILED
+          taces = TaskAttemptCompletionEventStatus.TIPFAILED;
+        }
+      }
+
+      // don't clobber the successful attempt completion event
+      // TODO: this shouldn't be necessary after MAPREDUCE-4330
+      if (successfulAttempt == null) {
+        handleTaskAttemptCompletion(attempt.getID(), taces);
+        if (attemptState == TaskAttemptState.SUCCEEDED) {
+          successfulAttempt = attempt.getID();
+        }
+      }
+    }
+    nextAttemptNumber = savedNextAttemptNumber;
+
+    TaskStateInternal taskState = TaskStateInternal.valueOf(
+        taskInfo.getTaskStatus());
+    switch (taskState) {
+    case SUCCEEDED:
+      if (successfulAttempt != null) {
+        sendTaskSucceededEvents();
+      } else {
+        LOG.info("Missing successful attempt for task " + taskId
+            + ", recovering as RUNNING");
+        // there must have been a fetch failure and the retry wasn't complete
+        taskState = TaskStateInternal.RUNNING;
+        metrics.runningTask(this);
+        addAndScheduleAttempt(Avataar.VIRGIN);
+      }
+      break;
+    case FAILED:
+    case KILLED:
+    {
+      if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
+        metrics.endWaitingTask(this);
+      }
+      TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
+          taskInfo.getFinishTime(), taskInfo.getTaskType(),
+          taskInfo.getError(), taskInfo.getTaskStatus(),
+          taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+      eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+      eventHandler.handle(
+          new JobTaskEvent(taskId, getExternalState(taskState)));
+      break;
+    }
+    default:
+      throw new java.lang.AssertionError("Unexpected recovered task state: "
+          + taskState);
+    }
+
+    return taskState;
+  }
+
+  private static class RecoverTransition // T_RECOVER handler: the resulting state is whatever recover() returns
+    implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+    @Override
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+      TaskRecoverEvent tre = (TaskRecoverEvent) event;
+      return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
+          tre.getRecoverTaskOutput());
+    }
+  }
+
   private static class InitialScheduleTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
 
@@ -758,13 +870,7 @@ public abstract class TaskImpl implement
     public void transition(TaskImpl task, TaskEvent event) {
       task.addAndScheduleAttempt(Avataar.VIRGIN);
       task.scheduledTime = task.clock.getTime();
-      TaskStartedEvent tse = new TaskStartedEvent(
-          TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
-          TypeConverter.fromYarn(task.taskId.getTaskType()),
-          task.getSplitsAsString());
-      task.eventHandler
-          .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
-      task.historyTaskStartGenerated = true;
+      task.sendTaskStartedEvent();
     }
   }
 
@@ -818,16 +924,7 @@ public abstract class TaskImpl implement
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       task.successfulAttempt = taskAttemptId;
-      task.eventHandler.handle(new JobTaskEvent(
-          task.taskId, TaskState.SUCCEEDED));
-      LOG.info("Task succeeded with attempt " + task.successfulAttempt);
-      // issue kill to all other attempts
-      if (task.historyTaskStartGenerated) {
-        TaskFinishedEvent tfe = createTaskFinishedEvent(task,
-            TaskStateInternal.SUCCEEDED);
-        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
-            tfe));
-      }
+      task.sendTaskSucceededEvents();
       for (TaskAttempt attempt : task.attempts.values()) {
         if (attempt.getID() != task.successfulAttempt &&
             // This is okay because it can only talk us out of sending a

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri Apr 12 23:05:28 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -150,10 +151,14 @@ public class ContainerLauncherImpl exten
         ContainerLaunchContext containerLaunchContext =
           event.getContainer();
 
+        org.apache.hadoop.yarn.api.records.Container container =
+            BuilderUtils.newContainer(containerID, null, null,
+                event.getResource(), null, containerToken);
         // Now launch the actual container
         StartContainerRequest startRequest = Records
           .newRecord(StartContainerRequest.class);
         startRequest.setContainerLaunchContext(containerLaunchContext);
+        startRequest.setContainer(container);
         StartContainerResponse response = proxy.startContainer(startRequest);
 
         ByteBuffer portInfo = response

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Fri Apr 12 23:05:28 2013
@@ -23,26 +23,34 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
 
   private final ContainerLaunchContext container;
   private final Task task;
+  private final Resource resource;
 
   public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
       ContainerId containerID, String containerMgrAddress,
       ContainerToken containerToken,
-      ContainerLaunchContext containerLaunchContext, Task remoteTask) {
+      ContainerLaunchContext containerLaunchContext, Resource resource,
+      Task remoteTask) {
     super(taskAttemptID, containerID, containerMgrAddress, containerToken,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
     this.container = containerLaunchContext;
     this.task = remoteTask;
+    this.resource = resource;
   }
 
   public ContainerLaunchContext getContainer() {
     return this.container;
   }
 
+  public Resource getResource() { // returns the Resource supplied to the constructor
+    return this.resource;
+  }
+
   public Task getRemoteTask() {
     return this.task;
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Apr 12 23:05:28 2013
@@ -123,7 +123,7 @@ public class LocalContainerAllocator ext
       // This can happen if the RM has been restarted. If it is in that state,
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                       JobEventType.INTERNAL_ERROR));
+                                       JobEventType.JOB_AM_REBOOT));
       throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
                                this.getContext().getApplicationID());
     }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Apr 12 23:05:28 2013
@@ -574,7 +574,7 @@ public class RMContainerAllocator extend
       // This can happen if the RM has been restarted. If it is in that state,
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                       JobEventType.INTERNAL_ERROR));
+                                       JobEventType.JOB_AM_REBOOT));
       throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
                                this.getContext().getApplicationID());
     }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Apr 12 23:05:28 2013
@@ -33,6 +33,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
 import org.apache.hadoop.yarn.webapp.SubView;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
 
 import com.google.inject.Inject;
 
@@ -53,7 +56,7 @@ public class TaskPage extends AppView {
           h2($(TITLE));
         return;
       }
-      html.
+      TBODY<TABLE<Hamlet>> tbody = html.
       table("#attempts").
         thead().
           tr().
@@ -65,7 +68,8 @@ public class TaskPage extends AppView {
             th(".tsh", "Started").
             th(".tsh", "Finished").
             th(".tsh", "Elapsed").
-            th(".note", "Note")._()._();
+            th(".note", "Note")._()._().
+      tbody();
       // Write all the data into a JavaScript array of arrays for JQuery
       // DataTables to display
       StringBuilder attemptsTableData = new StringBuilder("[\n");
@@ -105,6 +109,9 @@ public class TaskPage extends AppView {
       attemptsTableData.append("]");
       html.script().$type("text/javascript").
       _("var attemptsTableData=" + attemptsTableData)._();
+
+      tbody._()._();
+
     }
 
     protected boolean isValidRequest() {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Apr 12 23:05:28 2013
@@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster {
     Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
     		getDispatcher().getEventHandler(),
             getTaskAttemptListener(), getContext().getClock(),
-            isNewApiCommitter(), currentUser.getUserName(), getContext(),
+            getCommitter(), isNewApiCommitter(),
+            currentUser.getUserName(), getContext(),
             forcedState, diagnostic);
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
@@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster {
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,
-        boolean newApiCommitter, String user, AppContext appContext, 
+        OutputCommitter committer, boolean newApiCommitter,
+        String user, AppContext appContext,
         JobStateInternal forcedState, String diagnostic) {
       super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
-          getCompletedTaskFromPreviousRun(), metrics,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
           newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
           appContext, forcedState, diagnostic);
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Fri Apr 12 23:05:28 2013
@@ -18,10 +18,21 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
@@ -75,6 +116,7 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -1011,6 +1053,423 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  @Test
+  public void testRecoverySuccessAttempt() {
+    LOG.info("--- START: testRecoverySuccessAttempt ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 1L);
+  }
+
+  @Test
+  public void testRecoveryAllFailAttempts() {
+    LOG.info("--- START: testRecoveryAllFailAttempts ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsFail() {
+    LOG.info("--- START:  testRecoveryTaskSuccessAllAttemptsFail ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+    // check for one new attempt launched since successful attempt not found
+    TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
+    finalAttemptStates.put(taId3, TaskAttemptState.NEW);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+    LOG.info("--- START:  testRecoveryTaskSuccessAllAttemptsSucceed ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  @Test
+  public void testRecoveryAllAttemptsKilled() {
+    LOG.info("--- START:  testRecoveryAllAttemptsKilled ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock (OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler,atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
+    finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
+      Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
+      ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
+      long expectedMapLaunches, long expectedFailedMaps) {
+
+    assertEquals("Final State of Task", finalState, checkTask.getState());
+
+    Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
+        checkTask.getAttempts();
+    assertEquals("Expected Number of Task Attempts",
+        finalAttemptStates.size(), recoveredAttempts.size());
+    for (TaskAttemptID taID : finalAttemptStates.keySet()) {
+      assertEquals("Expected Task Attempt State",
+          finalAttemptStates.get(taID),
+          recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
+    }
+
+    Iterator<Event> ie = arg.getAllValues().iterator();
+    int eventNum = 0;
+    long totalLaunchedMaps = 0;
+    long totalFailedMaps = 0;
+    boolean jobTaskEventReceived = false;
+
+    while (ie.hasNext()) {
+      Object current = ie.next();
+      ++eventNum;
+      LOG.info(eventNum + " " + current.getClass().getName());
+      if (current instanceof JobHistoryEvent) {
+        JobHistoryEvent jhe = (JobHistoryEvent) current;
+        LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
+            jhe.getHistoryEvent().getEventType().toString() + " " +
+            jhe.getJobID());
+        assertEquals(expectedJobHistoryEvents.get(0),
+            jhe.getHistoryEvent().getEventType());
+        expectedJobHistoryEvents.remove(0);
+      }  else if (current instanceof JobCounterUpdateEvent) {
+        JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
+
+        LOG.info("JobCounterUpdateEvent "
+            + jcue.getCounterUpdates().get(0).getCounterKey()
+            + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+        if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.NUM_FAILED_MAPS) {
+          totalFailedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.TOTAL_LAUNCHED_MAPS) {
+          totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        }
+      } else if (current instanceof JobTaskEvent) {
+        JobTaskEvent jte = (JobTaskEvent) current;
+        assertEquals(finalState, jte.getState()); // (expected, actual)
+        jobTaskEventReceived = true;
+      }
+    }
+    assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
+    assertEquals("Did not process all expected JobHistoryEvents",
+        0, expectedJobHistoryEvents.size());
+    assertEquals("Expected Map Launches",
+        expectedMapLaunches, totalLaunchedMaps);
+    assertEquals("Expected Failed Maps",
+        expectedFailedMaps, totalFailedMaps);
+  }
+
+  private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+    int partitions = 2;
+
+    Path remoteJobConfFile = mock(Path.class);
+    JobConf conf = new JobConf();
+    TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
+    Token<JobTokenIdentifier> jobToken =
+        (Token<JobTokenIdentifier>) mock(Token.class);
+    Credentials credentials = null;
+    Clock clock = new SystemClock();
+    int appAttemptId = 3;
+    MRAppMetrics metrics = mock(MRAppMetrics.class);
+    Resource minContainerRequirements = mock(Resource.class);
+    when(minContainerRequirements.getMemory()).thenReturn(1000);
+
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(
+        minContainerRequirements);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+
+    TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+    MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
+        eh, remoteJobConfFile, conf,
+        taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
+        appAttemptId, metrics, appContext);
+    return mapTask;
+  }
+
+  private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
+      TaskAttemptState tas) {
+
+    ContainerId ci = mock(ContainerId.class);
+    Counters counters = mock(Counters.class);
+    TaskType tt = TaskType.MAP;
+
+    long finishTime = System.currentTimeMillis();
+
+    TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
+
+    when(mockTAinfo.getAttemptId()).thenReturn(tai);
+    when(mockTAinfo.getContainerId()).thenReturn(ci);
+    when(mockTAinfo.getCounters()).thenReturn(counters);
+    when(mockTAinfo.getError()).thenReturn("");
+    when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
+    when(mockTAinfo.getHostname()).thenReturn("localhost");
+    when(mockTAinfo.getHttpPort()).thenReturn(23);
+    when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
+    when(mockTAinfo.getPort()).thenReturn(24);
+    when(mockTAinfo.getRackname()).thenReturn("defaultRack");
+    when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
+    when(mockTAinfo.getShufflePort()).thenReturn(25);
+    when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
+    when(mockTAinfo.getStartTime()).thenReturn(finishTime -10000);
+    when(mockTAinfo.getState()).thenReturn("task in progress");
+    when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
+    when(mockTAinfo.getTaskType()).thenReturn(tt);
+    when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
+    return mockTAinfo;
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
@@ -1145,5 +1604,16 @@ public class TestRecovery {
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();
+    test.testMultipleCrashes();
+    test.testOutputRecovery();
+    test.testOutputRecoveryMapsOnly();
+    test.testRecoveryWithOldCommiter();
+    test.testSpeculative();
+    test.testRecoveryWithoutShuffleSecret();
+    test.testRecoverySuccessAttempt();
+    test.testRecoveryAllFailAttempts();
+    test.testRecoveryTaskSuccessAllAttemptsFail();
+    test.testRecoveryTaskSuccessAllAttemptsSucceed();
+    test.testRecoveryAllAttemptsKilled();
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Apr 12 23:05:28 2013
@@ -33,7 +33,9 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -86,9 +89,68 @@ import org.junit.Test;
      attemptId.setApplicationId(appId);
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
-     MRAppMaster appMaster = new TestMRApp(attemptId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     appMaster.init(conf);
+     appMaster.start();
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+     verify(fs).delete(stagingJobPath, true);
+   }
+
+   @Test (timeout = 30000)
+   public void testNoDeletionofStagingOnReboot() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(0);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.REBOOT, 4);
+     appMaster.init(conf);
+     appMaster.start();
+     //shutdown the job, not the lastRetry
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+
+   @Test (timeout = 30000)
+   public void testDeletionofStagingOnReboot() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(1);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      appMaster.init(conf);
+     appMaster.start();
+     //shutdown the job, is lastRetry
      appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
      verify(fs).delete(stagingJobPath, true);
    }
    
@@ -151,6 +213,8 @@ import org.junit.Test;
 
    private class TestMRApp extends MRAppMaster {
      ContainerAllocator allocator;
+     boolean testIsLastAMRetry = false;
+     JobStateInternal jobStateInternal;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
          ContainerAllocator allocator, int maxAppAttempts) {
@@ -160,9 +224,11 @@ import org.junit.Test;
        this.allocator = allocator;
      }
 
-     public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-       this(applicationAttemptId, null,
-           MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     public TestMRApp(ApplicationAttemptId applicationAttemptId,
+         ContainerAllocator allocator, JobStateInternal jobStateInternal,
+             int maxAppAttempts) {
+       this(applicationAttemptId, allocator, maxAppAttempts);
+       this.jobStateInternal = jobStateInternal;
      }
 
      @Override
@@ -180,6 +246,31 @@ import org.junit.Test;
      }
 
      @Override
+     protected Job createJob(Configuration conf, JobStateInternal forcedState,
+         String diagnostic) {
+       JobImpl jobImpl = mock(JobImpl.class);
+       when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal);
+       JobID jobID = JobID.forName("job_1234567890000_0001");
+       JobId jobId = TypeConverter.toYarn(jobID);
+       when(jobImpl.getID()).thenReturn(jobId);
+       ((AppContext) getContext())
+           .getAllJobs().put(jobImpl.getID(), jobImpl);
+       return jobImpl;
+     }
+
+     @Override
+     public void start() {
+       super.start();
+       DefaultMetricsSystem.shutdown();
+     }
+
+     @Override
+     public void notifyIsLastAMRetry(boolean isLastAMRetry){
+       testIsLastAMRetry = isLastAMRetry;
+       super.notifyIsLastAMRetry(isLastAMRetry);
+     }
+
+     @Override
      public RMHeartbeatHandler getRMHeartbeatHandler() {
        return getStubbedHeartbeatHandler(getContext());
      }
@@ -197,6 +288,9 @@ import org.junit.Test;
      protected void downloadTokensAndSetupUGI(Configuration conf) {
      }
 
+     public boolean getTestIsLastAMRetry(){
+       return testIsLastAMRetry;
+     }
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -222,7 +316,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getContext(),
           forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
@@ -288,7 +383,7 @@ import org.junit.Test;
     };
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testStagingCleanupOrder() throws Exception {
     MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
         this.getClass().getName(), true);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Apr 12 23:05:28 2013
@@ -25,6 +25,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
@@ -35,6 +37,7 @@ import org.apache.hadoop.mapreduce.JobAC
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
@@ -47,6 +50,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -57,6 +61,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@@ -69,7 +74,6 @@ import org.apache.hadoop.yarn.SystemCloc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -114,6 +118,7 @@ public class TestJobImpl {
     conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
     conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
     conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
+    conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2");
     
  
     AsyncDispatcher dispatcher = new AsyncDispatcher();
@@ -126,12 +131,13 @@ public class TestJobImpl {
     commitHandler.start();
 
     JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
-        "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
+        "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
+        "tag1,tag2");
     dispatcher.register(EventType.class, jseHandler);
     JobImpl job = createStubbedJob(conf, dispatcher, 0);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.SUCCEEDED);
     dispatcher.stop();
     commitHandler.stop();
@@ -193,6 +199,68 @@ public class TestJobImpl {
   }
 
   @Test(timeout=20000)
+  public void testRebootedDuringSetup() throws Exception{
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void setupJob(JobContext jobContext)
+          throws IOException {
+        while(!Thread.interrupted()){
+          try{
+            wait();
+          }catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobStartEvent(jobId));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+    assertJobState(job, JobStateInternal.REBOOT);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
+  public void testRebootedDuringCommit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    syncBarrier.await();
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+    assertJobState(job, JobStateInternal.REBOOT);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
   public void testKilledDuringSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
@@ -220,7 +288,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@@ -287,7 +355,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAIL_ABORT);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -324,7 +392,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -364,7 +432,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -375,7 +443,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -386,7 +454,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -397,7 +465,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -408,7 +476,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -426,7 +494,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -437,7 +505,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -492,7 +560,7 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, new JobTokenSecretManager(), new Credentials(), null, null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -533,7 +601,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@@ -597,7 +665,7 @@ public class TestJobImpl {
     StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.RUNNING);
     return job;
   }
@@ -644,14 +712,18 @@ public class TestJobImpl {
     
     private String workflowAdjacencies;
     
+    private String workflowTags;
+    
     private Boolean assertBoolean;
 
     public JobSubmittedEventHandler(String workflowId, String workflowName,
-        String workflowNodeName, String workflowAdjacencies) {
+        String workflowNodeName, String workflowAdjacencies,
+        String workflowTags) {
       this.workflowId = workflowId;
       this.workflowName = workflowName;
       this.workflowNodeName = workflowNodeName;
       this.workflowAdjacencies = workflowAdjacencies;
+      this.workflowTags = workflowTags;
       assertBoolean = null;
     }
 
@@ -673,7 +745,16 @@ public class TestJobImpl {
         setAssertValue(false);
         return;
       }
-      if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) {
+     
+      String[] wrkflowAdj = workflowAdjacencies.split(" ");
+      String[] jswrkflowAdj = jsEvent.getWorkflowAdjacencies().split(" ");
+      Arrays.sort(wrkflowAdj);
+      Arrays.sort(jswrkflowAdj);
+      if (!Arrays.equals(wrkflowAdj, jswrkflowAdj)) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowTags.equals(jsEvent.getWorkflowTags())) {
         setAssertValue(false);
         return;
       }
@@ -713,9 +794,9 @@ public class TestJobImpl {
         boolean newApiCommitter, String user, int numSplits) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
-          new SystemClock(), null, MRAppMetrics.create(),
-          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
-          null);
+          new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
+          MRAppMetrics.create(), null, newApiCommitter, user,
+          System.currentTimeMillis(), null, null, null, null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Fri Apr 12 23:05:28 2013
@@ -113,9 +113,9 @@ public class TestTaskAttemptContainerReq
     ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
 
     ContainerLaunchContext launchCtx =
-        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+        TaskAttemptImpl.createContainerLaunchContext(acls,
             jobConf, jobToken, taImpl.createRemoteTask(),
-            TypeConverter.fromYarn(jobId), mock(Resource.class),
+            TypeConverter.fromYarn(jobId),
             mock(WrappedJvmID.class), taListener,
             credentials);
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Apr 12 23:05:28 2013
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbi
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
@@ -80,7 +78,6 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Credentials credentials;
   private Clock clock;
-  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -104,13 +101,12 @@ public class TestTaskImpl {
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
         TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
-        Credentials credentials, Clock clock,
-        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+        Credentials credentials, Clock clock, int startCount,
         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener,
           jobToken, credentials, clock,
-          completedTasksFromPreviousRun, startCount, metrics, appContext);
+          startCount, metrics, appContext);
       this.taskType = taskType;
     }
 
@@ -247,8 +243,7 @@ public class TestTaskImpl {
     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, taskType);
+        startCount, metrics, appContext, taskType);
   }
 
   @After 
@@ -652,9 +647,7 @@ public class TestTaskImpl {
   public void testFailedTransitions() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, TaskType.MAP) {
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
           @Override
           protected int getMaxAttempts() {
             return 1;
@@ -721,9 +714,7 @@ public class TestTaskImpl {
   public void testCountersWithSpeculation() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, TaskType.MAP) {
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
           @Override
           protected int getMaxAttempts() {
             return 1;

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Fri Apr 12 23:05:28 2013
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -383,7 +382,6 @@ public class TestContainerLauncher {
     @Override
     public StartContainerResponse startContainer(StartContainerRequest request)
         throws YarnRemoteException {
-      ContainerLaunchContext container = request.getContainerLaunchContext();
       StartContainerResponse response = recordFactory
           .newRecordInstance(StartContainerResponse.class);
       status = recordFactory.newRecordInstance(ContainerStatus.class);
@@ -395,7 +393,7 @@ public class TestContainerLauncher {
         throw new UndeclaredThrowableException(e);
             }
       status.setState(ContainerState.RUNNING);
-      status.setContainerId(container.getContainerId());
+      status.setContainerId(request.getContainer().getId());
       status.setExitStatus(0);
       return response;
             }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Fri Apr 12 23:05:28 2013
@@ -144,7 +144,10 @@ public class TestAMWebApp {
   @Test public void testTaskView() {
     AppContext appContext = new TestAppContext();
     Map<String, String> params = getTaskParams(appContext);
-    WebAppTests.testPage(TaskPage.class, AppContext.class, appContext, params);
+    App app = new App(appContext);
+    app.setJob(appContext.getAllJobs().values().iterator().next());
+    app.setTask(app.getJob().getTasks().values().iterator().next());
+    WebAppTests.testPage(TaskPage.class, App.class, app, params);
   }
 
   public static Map<String, String> getJobParams(AppContext appContext) {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Apr 12 23:05:28 2013
@@ -32,13 +32,13 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,6 +91,9 @@ class LocalDistributedCacheManager {
     Map<String, LocalResource> localResources = 
       new LinkedHashMap<String, LocalResource>();
     MRApps.setupDistributedCache(conf, localResources);
+    // Generating unique numbers for FSDownload.
+    AtomicLong uniqueNumberGenerator =
+        new AtomicLong(System.currentTimeMillis());
     
     // Find which resources are to be put on the local classpath
     Map<String, Path> classpaths = new HashMap<String, Path>();
@@ -128,8 +131,10 @@ class LocalDistributedCacheManager {
       Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
       Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
       for (LocalResource resource : localResources.values()) {
-        Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
-            destPath, resource, new Random());
+        Callable<Path> download =
+            new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
+                Long.toString(uniqueNumberGenerator.incrementAndGet())),
+                resource);
         Future<Path> future = exec.submit(download);
         resourcesToPaths.put(resource, future);
       }