You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/12 22:49:41 UTC

[24/36] hadoop git commit: MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 1807c1c..79b88d8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -407,6 +408,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -464,6 +466,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -524,6 +527,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -546,7 +550,7 @@ public class TestTaskAttempt{
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_DONE));
     taImpl.handle(new TaskAttemptEvent(attemptId,
-        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
 
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
@@ -593,6 +597,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
         jobFile, 1, splits, jobConf, taListener,
@@ -641,6 +646,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -663,7 +669,7 @@ public class TestTaskAttempt{
     taImpl.handle(new TaskAttemptEvent(attemptId,
       TaskAttemptEventType.TA_DONE));
     taImpl.handle(new TaskAttemptEvent(attemptId,
-      TaskAttemptEventType.TA_CONTAINER_CLEANED));
+      TaskAttemptEventType.TA_CONTAINER_COMPLETED));
 
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
       TaskAttemptState.SUCCEEDED);
@@ -708,6 +714,7 @@ public class TestTaskAttempt{
     Resource resource = mock(Resource.class);
     when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
     when(resource.getMemory()).thenReturn(1024);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
         jobFile, 1, splits, jobConf, taListener,
@@ -753,6 +760,7 @@ public class TestTaskAttempt{
 	AppContext appCtx = mock(AppContext.class);
 	ClusterInfo clusterInfo = mock(ClusterInfo.class);
 	when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
 
 	TaskAttemptImpl taImpl =
 	  new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -774,7 +782,7 @@ public class TestTaskAttempt{
 	taImpl.handle(new TaskAttemptEvent(attemptId,
 	    TaskAttemptEventType.TA_DONE));
 	taImpl.handle(new TaskAttemptEvent(attemptId,
-	    TaskAttemptEventType.TA_CONTAINER_CLEANED));
+	    TaskAttemptEventType.TA_CONTAINER_COMPLETED));
 	    
 	assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
 		      TaskAttemptState.SUCCEEDED);
@@ -967,6 +975,255 @@ public class TestTaskAttempt{
         taImpl.getInternalState());
   }
 
+
+  @Test
+  public void testKillMapTaskWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    // Send TA_DONE first so the kill arrives while the attempt is finishing.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    // If the map task is killed when it is in SUCCESS_FINISHING_CONTAINER
+    // state, the state will move to KILL_CONTAINER_CLEANUP
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+    assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP);
+    // Once the container is cleaned, the attempt should move to task cleanup.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.KILL_TASK_CLEANUP);
+    // After task cleanup completes, the attempt should still report KILLED.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+
+    assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
+        TaskAttemptState.KILLED);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testKillMapTaskWhileFailFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    // Scenario: TA_KILL arrives after the attempt has already reported failure.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG));
+
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
+
+    // If the map task is killed when it is in FAIL_FINISHING_CONTAINER state,
+    // the state will stay in FAIL_FINISHING_CONTAINER.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
+    // A finishing timeout is then expected to start container cleanup.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_TASK_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testFailMapTaskByClient() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    // A client-requested failure is expected to skip the finishing state.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
+
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_CONTAINER_CLEANUP", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_TASK_CLEANUP);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    // Enter the finishing state before sending the diagnostics update.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    // TA_DIAGNOSTICS_UPDATE doesn't change state
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(),
+        "Task got updated"));
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testTimeoutWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    // Enter the finishing state before delivering the timeout.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
+
+    // If the task stays in SUCCESS_FINISHING_CONTAINER for too long,
+    // TaskAttemptListenerImpl will time out the attempt.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_CONTAINER_CLEANUP", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
+  public void testTimeoutWhileFailFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+    // Report failure first so the timeout arrives while finishing.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG));
+
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
+
+    // If the task stays in FAIL_FINISHING_CONTAINER for too long,
+    // TaskAttemptListenerImpl will time out the attempt.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  private void setupTaskAttemptFinishingMonitor(
+      EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
+    // Initialise a real monitor and have the mocked context hand it out.
+    TaskAttemptFinishingMonitor finishingMonitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    finishingMonitor.init(jobConf);
+    when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(finishingMonitor);
+  }
+
+  private TaskAttemptImpl createTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+    // The listener mock only needs to hand out an address.
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+    // fs.file.impl is pointed at StubbedFS with its cache disabled.
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+    // Mocked context that serves cluster info and the finishing monitor.
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            splits, jobConf, taListener,
+            mock(Token.class), new Credentials(),
+            new SystemClock(), appCtx);
+    // Mocked container for the assignment event below.
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    // Drive the attempt: schedule, assign the container, report the launch.
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    return taImpl;
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index bc31bb5..1c40632 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -235,7 +235,15 @@ public interface MRJobConfig {
   public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
 
   public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
-  
+
+  public static final String TASK_EXIT_TIMEOUT = "mapreduce.task.exit.timeout";
+
+  public static final int TASK_EXIT_TIMEOUT_DEFAULT = 60 * 1000;
+
+  public static final String TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.exit.timeout.check-interval-ms";
+
+  public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
+
   public static final String TASK_ID = "mapreduce.task.id";
 
   public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index fb761ba..a9e7618 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1678,4 +1678,24 @@
     app master.
   </description>
 </property>
+
+<property>
+  <name>mapreduce.task.exit.timeout</name>
+  <value>60000</value>
+  <description>The number of milliseconds before a task will be
+  terminated if it stays in finishing state for too long.
+  After a task attempt completes from TaskUmbilicalProtocol's point of view,
+  it will be transitioned to finishing state. That will give a chance for the
+  task to exit by itself.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.task.exit.timeout.check-interval-ms</name>
+  <value>20000</value>
+  <description>The interval in milliseconds at which the MR framework
+  checks whether task attempts have stayed in finishing state for too long.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 194b85a..41bc90a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@@ -399,4 +400,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
     // bogus - Not Required
     return null;
   }
+
+  @Override
+  public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+    return null; // NOTE(review): presumably unused here, like the getter above
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index d2edd19..5ce2761 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -102,7 +102,7 @@ public class TestSpeculativeExecutionWithMRApp {
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
             TaskAttemptEventType.TA_DONE));
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_CONTAINER_CLEANED));
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
           app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
         }
       }
@@ -170,7 +170,7 @@ public class TestSpeculativeExecutionWithMRApp {
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
             TaskAttemptEventType.TA_DONE));
           appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
-            TaskAttemptEventType.TA_CONTAINER_CLEANED));
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
           numTasksToFinish--;
           app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
         } else {
@@ -228,7 +228,7 @@ public class TestSpeculativeExecutionWithMRApp {
     appEventHandler.handle(
         new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
     appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
-        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
     return ta;
   }