You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/12 22:49:41 UTC
[24/36] hadoop git commit: MAPREDUCE-5465. Tasks are often killed
before they exit on their own. Contributed by Ming Ma
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 1807c1c..79b88d8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -407,6 +408,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -464,6 +466,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -524,6 +527,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -546,7 +550,7 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
@@ -593,6 +597,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener,
@@ -641,6 +646,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -663,7 +669,7 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
@@ -708,6 +714,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener,
@@ -753,6 +760,7 @@ public class TestTaskAttempt{
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@@ -774,7 +782,7 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
@@ -967,6 +975,255 @@ public class TestTaskAttempt{
taImpl.getInternalState());
}
+
+  /**
+   * TA_KILL in SUCCESS_FINISHING_CONTAINER must take the kill cleanup path.
+   */
+  @Test
+  public void testKillMapTaskWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    // If the map task is killed when it is in SUCCESS_FINISHING_CONTAINER
+    // state, the state will move to KILL_CONTAINER_CLEANUP
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in KILLED state",
+        TaskAttemptState.KILLED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP",
+        TaskAttemptStateInternal.KILL_TASK_CLEANUP, taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+    assertEquals("Task attempt is not in KILLED state",
+        TaskAttemptState.KILLED, taImpl.getState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  /**
+   * TA_KILL must not move an attempt out of FAIL_FINISHING_CONTAINER.
+   */
+  @Test
+  public void testKillMapTaskWhileFailFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    // If the map task is killed when it is in FAIL_FINISHING_CONTAINER state,
+    // the state will stay in FAIL_FINISHING_CONTAINER.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
+        TaskAttemptStateInternal.FAIL_TASK_CLEANUP, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED, taImpl.getState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  /**
+   * TA_FAILMSG_BY_CLIENT must go straight to FAIL_CONTAINER_CLEANUP.
+   */
+  @Test
+  public void testFailMapTaskByClient() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
+        TaskAttemptStateInternal.FAIL_TASK_CLEANUP, taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CLEANUP_DONE));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED, taImpl.getState());
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  /** A diagnostics update must not leave SUCCESS_FINISHING_CONTAINER. */
+  @Test
+  public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+    // TA_DIAGNOSTICS_UPDATE doesn't change state
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(),
+        "Task got updated"));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  /** TA_TIMED_OUT must lead to SUCCESS_CONTAINER_CLEANUP. */
+  @Test
+  public void testTimeoutWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+    // If the task stays in SUCCESS_FINISHING_CONTAINER for too long,
+    // TaskAttemptListenerImpl will time out the attempt.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "SUCCESS_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  /** TA_TIMED_OUT must lead to FAIL_CONTAINER_CLEANUP. */
+  @Test
+  public void testTimeoutWhileFailFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_FAILMSG));
+    assertEquals("Task attempt is not in FAILED state",
+        TaskAttemptState.FAILED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+        "FAIL_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+    // If the task stays in FAIL_FINISHING_CONTAINER for too long,
+    // TaskAttemptListenerImpl will time out the attempt.
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_TIMED_OUT));
+    assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  /** Stubs appCtx to return a finishing monitor initialized with jobConf. */
+  private void setupTaskAttemptFinishingMonitor(
+      EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
+    TaskAttemptFinishingMonitor monitor =
+        new TaskAttemptFinishingMonitor(eventHandler);
+    monitor.init(jobConf);
+    when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(monitor);
+  }
+
+  /**
+   * Creates a mock-backed map attempt fed with schedule/assign/launch events.
+   */
+  private TaskAttemptImpl createTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    // Application, job, task, attempt and container identifiers.
+    ApplicationId applicationId = ApplicationId.newInstance(1, 2);
+    JobId job = MRBuilderUtils.newJobId(applicationId, 1);
+    TaskId task = MRBuilderUtils.newTaskId(job, 1, TaskType.MAP);
+    TaskAttemptId attempt = MRBuilderUtils.newTaskAttemptId(task, 0);
+    ContainerId containerId = ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(applicationId, 0), 3);
+
+    JobConf conf = new JobConf();
+    conf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    conf.setBoolean("fs.file.impl.disable.cache", true);
+    conf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    conf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    // Mocked collaborators handed to the attempt's constructor.
+    Path jobFilePath = mock(Path.class);
+    TaskAttemptListener listener = mock(TaskAttemptListener.class);
+    InetSocketAddress listenerAddress = new InetSocketAddress("localhost", 0);
+    when(listener.getAddress()).thenReturn(listenerAddress);
+    TaskSplitMetaInfo splitInfo = mock(TaskSplitMetaInfo.class);
+    when(splitInfo.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+    AppContext context = mock(AppContext.class);
+    ClusterInfo cluster = mock(ClusterInfo.class);
+    when(context.getClusterInfo()).thenReturn(cluster);
+    setupTaskAttemptFinishingMonitor(eventHandler, conf, context);
+
+    TaskAttemptImpl result = new MapTaskAttemptImpl(task, 1, eventHandler,
+        jobFilePath, 1, splitInfo, conf, listener, mock(Token.class),
+        new Credentials(), new SystemClock(), context);
+
+    Container assigned = mock(Container.class);
+    when(assigned.getId()).thenReturn(containerId);
+    when(assigned.getNodeId()).thenReturn(NodeId.newInstance("127.0.0.1", 0));
+    when(assigned.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    result.handle(
+        new TaskAttemptEvent(attempt, TaskAttemptEventType.TA_SCHEDULE));
+    result.handle(new TaskAttemptContainerAssignedEvent(attempt,
+        assigned, mock(Map.class)));
+    result.handle(new TaskAttemptContainerLaunchedEvent(attempt, 0));
+    return result;
+  }
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index bc31bb5..1c40632 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -235,7 +235,15 @@ public interface MRJobConfig {
public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
-
+
+ public static final String TASK_EXIT_TIMEOUT = "mapreduce.task.exit.timeout";
+ // 60 * 1000 = 60000, the value mapred-default.xml lists (in milliseconds) for the key above.
+ public static final int TASK_EXIT_TIMEOUT_DEFAULT = 60 * 1000;
+ // Key for the interval, in milliseconds, of the finishing-state timeout check.
+ public static final String TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.exit.timeout.check-interval-ms";
+ // 20 * 1000 = 20000, the value mapred-default.xml lists for the check-interval key.
+ public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
+
public static final String TASK_ID = "mapreduce.task.id";
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index fb761ba..a9e7618 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1678,4 +1678,24 @@
app master.
</description>
</property>
+
+<property>
+ <name>mapreduce.task.exit.timeout</name>
+ <value>60000</value>
+ <description>The number of milliseconds before a task will be
+ terminated if it stays in finishing state for too long.
+ After a task attempt completes from TaskUmbilicalProtocol's point of view,
+ it will be transitioned to finishing state. That will give a chance for the
+ task to exit by itself.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.task.exit.timeout.check-interval-ms</name>
+ <value>20000</value>
+ <description>The interval in milliseconds at which the MR framework
+ checks whether task attempts have stayed in finishing state for too long.
+ </description>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 194b85a..41bc90a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@@ -399,4 +400,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
// bogus - Not Required
return null;
}
+
+ @Override
+ public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+ return null; // this implementation supplies no finishing monitor; callers always get null
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index d2edd19..5ce2761 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -102,7 +102,7 @@ public class TestSpeculativeExecutionWithMRApp {
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
}
}
@@ -170,7 +170,7 @@ public class TestSpeculativeExecutionWithMRApp {
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
numTasksToFinish--;
app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
} else {
@@ -228,7 +228,7 @@ public class TestSpeculativeExecutionWithMRApp {
appEventHandler.handle(
new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
return ta;
}