You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/12 16:15:53 UTC

svn commit: r1408314 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...

Author: bobby
Date: Mon Nov 12 15:15:52 2012
New Revision: 1408314

URL: http://svn.apache.org/viewvc?rev=1408314&view=rev
Log:
MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1408314&r1=1408313&r2=1408314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Nov 12 15:15:52 2012
@@ -650,6 +650,8 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
     state (jlowe via bobby)
+
+    MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1408314&r1=1408313&r2=1408314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Nov 12 15:15:52 2012
@@ -712,7 +712,10 @@ public class JobImpl implements org.apac
    * The only entry point to change the Job.
    */
   public void handle(JobEvent event) {
-    LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getJobId() + " of type "
+          + event.getType());
+    }
     try {
       writeLock.lock();
       JobStateInternal oldState = getInternalState();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408314&r1=1408313&r2=1408314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Nov 12 15:15:52 2012
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -118,9 +120,18 @@ public abstract class TaskImpl implement
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
   
+  //should be set to one which comes first
+  //saying COMMIT_PENDING
+  private TaskAttemptId commitAttempt;
+
+  private TaskAttemptId successfulAttempt;
+
+  private final Set<TaskAttemptId> failedAttempts;
+  // Track the finished attempts - successful, failed and killed
+  private final Set<TaskAttemptId> finishedAttempts;
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
-  private int numberUncompletedAttempts = 0;
+  private final Set<TaskAttemptId> inProgressAttempts;
 
   private boolean historyTaskStartGenerated = false;
   
@@ -182,6 +193,14 @@ public abstract class TaskImpl implement
         EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new KillWaitAttemptSucceededTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_FAILED,
+        new KillWaitAttemptFailedTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.KILL_WAIT,
@@ -189,8 +208,6 @@ public abstract class TaskImpl implement
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-            TaskEventType.T_ATTEMPT_FAILED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
@@ -242,15 +259,6 @@ public abstract class TaskImpl implement
   private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
       new RecoverdAttemptsComparator();
 
-  //should be set to one which comes first
-  //saying COMMIT_PENDING
-  private TaskAttemptId commitAttempt;
-
-  private TaskAttemptId successfulAttempt;
-
-  private int failedAttempts;
-  private int finishedAttempts;//finish are total of success, failed and killed
-
   @Override
   public TaskState getState() {
     readLock.lock();
@@ -275,6 +283,9 @@ public abstract class TaskImpl implement
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
+    this.finishedAttempts = new HashSet<TaskAttemptId>(2);
+    this.failedAttempts = new HashSet<TaskAttemptId>(2);
+    this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
     // This overridable method call is okay in a constructor because we
     //  have a convention that none of the overrides depends on any
     //  fields that need initialization.
@@ -611,9 +622,9 @@ public abstract class TaskImpl implement
           taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
     }
 
-    ++numberUncompletedAttempts;
+    inProgressAttempts.add(attempt.getID());
     //schedule the nextAttemptNumber
-    if (failedAttempts > 0) {
+    if (failedAttempts.size() > 0) {
       eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
         TaskAttemptEventType.TA_RESCHEDULE));
     } else {
@@ -788,12 +799,14 @@ public abstract class TaskImpl implement
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
+      TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          taskAttemptId, 
           TaskAttemptCompletionEventStatus.SUCCEEDED);
-      task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
-      task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
+      task.successfulAttempt = taskAttemptId;
       task.eventHandler.handle(new JobTaskEvent(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@@ -824,11 +837,13 @@ public abstract class TaskImpl implement
       SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          taskAttemptId, 
           TaskAttemptCompletionEventStatus.KILLED);
-      task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
       }
@@ -840,15 +855,25 @@ public abstract class TaskImpl implement
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     protected TaskStateInternal finalState = TaskStateInternal.KILLED;
+    protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
+
+    public KillWaitAttemptKilledTransition() {
+      this(TaskAttemptCompletionEventStatus.KILLED);
+    }
+
+    public KillWaitAttemptKilledTransition(
+        TaskAttemptCompletionEventStatus taCompletionEventStatus) {
+      this.taCompletionEventStatus = taCompletionEventStatus;
+    }
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
-          TaskAttemptCompletionEventStatus.KILLED);
-      task.finishedAttempts++;
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
+      task.finishedAttempts.add(taskAttemptId);
       // check whether all attempts are finished
-      if (task.finishedAttempts == task.attempts.size()) {
+      if (task.finishedAttempts.size() == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
               finalState, null); // TODO JH verify failedAttempt null
@@ -867,43 +892,57 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class KillWaitAttemptSucceededTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptSucceededTransition() {
+      super(TaskAttemptCompletionEventStatus.SUCCEEDED);
+    }
+  }
+
+  private static class KillWaitAttemptFailedTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptFailedTransition() {
+      super(TaskAttemptCompletionEventStatus.FAILED);
+    }
+  }
+
   private static class AttemptFailedTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+      TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
+      task.failedAttempts.add(taskAttemptId); 
+      if (taskAttemptId.equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
-      TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+      TaskAttempt attempt = task.attempts.get(taskAttemptId);
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
         task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(), 
             attempt.getAssignedContainerMgrAddress()));
       }
       
-      task.finishedAttempts++;
-      if (task.failedAttempts < task.maxAttempts) {
+      task.finishedAttempts.add(taskAttemptId);
+      if (task.failedAttempts.size() < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+            taskAttemptId, 
             TaskAttemptCompletionEventStatus.FAILED);
         // we don't need a new event if we already have a spare
-        if (--task.numberUncompletedAttempts == 0
+        task.inProgressAttempts.remove(taskAttemptId);
+        if (task.inProgressAttempts.size() == 0
             && task.successfulAttempt == null) {
           task.addAndScheduleAttempt();
         }
       } else {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+            taskAttemptId, 
             TaskAttemptCompletionEventStatus.TIPFAILED);
-        TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
-        TaskAttemptId taId = ev.getTaskAttemptID();
         
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskStateInternal.FAILED, taId);
+            TaskStateInternal.FAILED, taskAttemptId);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             taskFailedEvent));
         } else {
@@ -927,14 +966,12 @@ public abstract class TaskImpl implement
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      if (event instanceof TaskTAttemptEvent) {
-        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
-            !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
-          // don't allow a different task attempt to override a previous
-          // succeeded state
-          return TaskStateInternal.SUCCEEDED;
-        }
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+          !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+        // don't allow a different task attempt to override a previous
+        // succeeded state
+        return TaskStateInternal.SUCCEEDED;
       }
 
       // a successful REDUCE task should not be overridden
@@ -953,7 +990,7 @@ public abstract class TaskImpl implement
       //  believe that there's no redundancy.
       unSucceed(task);
       // fake increase in Uncomplete attempts for super.transition
-      ++task.numberUncompletedAttempts;
+      task.inProgressAttempts.add(castEvent.getTaskAttemptID());
       return super.transition(task, event);
     }
 
@@ -1045,7 +1082,7 @@ public abstract class TaskImpl implement
             (attempt, "Task KILL is received. Killing attempt!");
       }
 
-      task.numberUncompletedAttempts = 0;
+      task.inProgressAttempts.clear();
     }
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1408314&r1=1408313&r2=1408314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Nov 12 15:15:52 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -243,6 +245,39 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(JobImpl job,
+      JobStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    JobStateInternal iState = job.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Job Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState);
+      Thread.sleep(500);
+      iState = job.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
+  public void waitForInternalState(TaskImpl task,
+      TaskStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskReport report = task.getReport();
+    TaskStateInternal iState = task.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Task Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState + "   progress : "
+          + report.getProgress());
+      Thread.sleep(500);
+      report = task.getReport();
+      iState = task.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
   public void waitForInternalState(TaskAttemptImpl attempt,
       TaskAttemptStateInternal finalState) throws Exception {
     int timeoutSecs = 0;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1408314&r1=1408313&r2=1408314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Mon Nov 12 15:15:52 2012
@@ -25,12 +25,15 @@ import java.util.concurrent.CountDownLat
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.junit.Test;
 
 /**
  * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
  *
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestKill {
 
   @Test
@@ -132,6 +141,80 @@ public class TestKill {
   }
 
   @Test
+  public void testKillTaskWait() throws Exception {
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      private TaskAttemptEvent cachedKillEvent;
+      @Override
+      protected void dispatch(Event event) {
+        if (event instanceof TaskAttemptEvent) {
+          TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
+          if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
+            TaskAttemptId taID = killEvent.getTaskAttemptID();
+            if (taID.getTaskId().getTaskType() == TaskType.REDUCE
+                && taID.getTaskId().getId() == 0 && taID.getId() == 0) {
+              // Task is asking the reduce TA to kill itself. 'Create' a race
+              // condition. Make the task succeed and then inform the task that
+              // TA has succeeded. Once Task gets the TA succeeded event at
+              // KILL_WAIT, then relay the actual kill signal to TA
+              super.dispatch(new TaskAttemptEvent(taID,
+                TaskAttemptEventType.TA_DONE));
+              super.dispatch(new TaskAttemptEvent(taID,
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+              super.dispatch(new TaskTAttemptEvent(taID,
+                TaskEventType.T_ATTEMPT_SUCCEEDED));
+              this.cachedKillEvent = killEvent;
+              return;
+            }
+          }
+        } else if (event instanceof TaskEvent) {
+          TaskEvent taskEvent = (TaskEvent) event;
+          if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
+              && this.cachedKillEvent != null) {
+            // When the TA comes and reports that it is done, send the
+            // cachedKillEvent
+            super.dispatch(this.cachedKillEvent);
+            return;
+          }
+
+        }
+        super.dispatch(event);
+      }
+    };
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+      .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
+  @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);
     MRApp app = new BlockingMRApp(2, 0, latch);