You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 05:40:51 UTC
[17/50] [abbrv] hadoop git commit: MAPREDUCE-5982. Task attempts that
fail from the ASSIGNED state can disappear. Contributed by Chang Li
MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee4ee6af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee4ee6af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee4ee6af
Branch: refs/heads/HDFS-7285
Commit: ee4ee6af6a5a6299d27462adb6944206039bbbae
Parents: 9eee975
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 17 21:37:39 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 17 21:37:39 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/job/impl/TaskAttemptImpl.java | 92 +++++------
.../apache/hadoop/mapreduce/v2/app/MRApp.java | 11 +-
.../v2/app/job/impl/TestTaskAttempt.java | 154 +++++++++++++++++++
4 files changed, 213 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 6cf7abb..cd84a34 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -573,6 +573,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
attempt (Chang Li via jlowe)
+ MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
+ disappear (Chang Li via jlowe)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 77a7555..a7becdb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1484,6 +1484,19 @@ public abstract class TaskAttemptImpl implements
return tauce;
}
+  /**
+   * Fills in launch details for an attempt and then calls
+   * {@code sendLaunchedEvents()} on it. Callers in this class invoke it only
+   * when {@code getLaunchTime() == 0}, just before emitting the attempt's
+   * unsuccessful-completion history event, so that event is preceded by a
+   * start event.
+   *
+   * <p>Sets {@code launchTime} from the attempt's clock and derives
+   * {@code trackerName} and {@code httpPort} from the assigned container's
+   * node HTTP address. NOTE(review): assumes {@code taskAttempt.container} is
+   * non-null, i.e. a container has been assigned — confirm for every caller.
+   *
+   * @param taskAttempt the attempt whose start is being recorded
+   */
+  private static void
+      sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
+    taskAttempt.launchTime = taskAttempt.clock.getTime();
+
+    InetSocketAddress nodeHttpInetAddr =
+        NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
+    taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+    taskAttempt.httpPort = nodeHttpInetAddr.getPort();
+    taskAttempt.sendLaunchedEvents();
+  }
+
+
@SuppressWarnings("unchecked")
private void sendLaunchedEvents() {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@@ -1681,6 +1694,9 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
+ if (taskAttempt.getLaunchTime() == 0) {
+ sendJHStartEventForAssignedFailTask(taskAttempt);
+ }
//set the finish time
taskAttempt.setFinishTime();
@@ -1715,23 +1731,19 @@ public abstract class TaskAttemptImpl implements
default:
LOG.error("Task final state is not FAILED or KILLED: " + finalState);
}
- if (taskAttempt.getLaunchTime() != 0) {
- TaskAttemptUnsuccessfulCompletionEvent tauce =
- createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- finalState);
- if(finalState == TaskAttemptStateInternal.FAILED) {
- taskAttempt.eventHandler
- .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
- } else if(finalState == TaskAttemptStateInternal.KILLED) {
- taskAttempt.eventHandler
- .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
- }
- taskAttempt.eventHandler.handle(new JobHistoryEvent(
- taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- } else {
- LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
+
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ finalState);
+ if(finalState == TaskAttemptStateInternal.FAILED) {
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+ } else if(finalState == TaskAttemptStateInternal.KILLED) {
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
}
+ taskAttempt.eventHandler.handle(new JobHistoryEvent(
+ taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}
}
@@ -2023,27 +2035,25 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
+ if (taskAttempt.getLaunchTime() == 0) {
+ sendJHStartEventForAssignedFailTask(taskAttempt);
+ }
//set the finish time
taskAttempt.setFinishTime();
- if (taskAttempt.getLaunchTime() != 0) {
- taskAttempt.eventHandler
- .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
- TaskAttemptUnsuccessfulCompletionEvent tauce =
- createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- TaskAttemptStateInternal.KILLED);
- taskAttempt.eventHandler.handle(new JobHistoryEvent(
- taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- }else {
- LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
- }
+
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ TaskAttemptStateInternal.KILLED);
+ taskAttempt.eventHandler.handle(new JobHistoryEvent(
+ taskAttempt.attemptId.getTaskId().getJobId(), tauce));
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
-// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
@@ -2178,23 +2188,19 @@ public abstract class TaskAttemptImpl implements
@SuppressWarnings("unchecked")
private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+ if (taskAttempt.getLaunchTime() == 0) {
+ sendJHStartEventForAssignedFailTask(taskAttempt);
+ }
// set the finish time
taskAttempt.setFinishTime();
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ TaskAttemptStateInternal.FAILED);
+ taskAttempt.eventHandler.handle(new JobHistoryEvent(
+ taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- if (taskAttempt.getLaunchTime() != 0) {
- taskAttempt.eventHandler
- .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
- TaskAttemptUnsuccessfulCompletionEvent tauce =
- createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- TaskAttemptStateInternal.FAILED);
- taskAttempt.eventHandler.handle(new JobHistoryEvent(
- taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
- // handling failed map/reduce events.
- }else {
- LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
- }
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index b51adf2..f0c10d3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -544,10 +544,7 @@ public class MRApp extends MRAppMaster {
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
- getContext().getEventHandler().handle(
- new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
- shufflePort));
-
+ containerLaunched(event.getTaskAttemptID(), shufflePort);
attemptLaunched(event.getTaskAttemptID());
break;
case CONTAINER_REMOTE_CLEANUP:
@@ -561,6 +558,12 @@ public class MRApp extends MRAppMaster {
}
}
+  /**
+   * Tells the application's event handler that the container for
+   * {@code attemptID} has been launched. Test subclasses may override this
+   * hook to suppress or alter the notification.
+   *
+   * @param attemptID the attempt whose container was launched
+   * @param shufflePort the shuffle port carried by the launched event
+   */
+  protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
+    TaskAttemptContainerLaunchedEvent launched =
+        new TaskAttemptContainerLaunchedEvent(attemptID, shufflePort);
+    getContext().getEventHandler().handle(launched);
+  }
+
protected void attemptLaunched(TaskAttemptId attemptID) {
if (autoComplete) {
// send the done event
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index a88a935..6b4656a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -115,6 +115,69 @@ public class TestTaskAttempt{
}
@Test
+  public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
+    // Events injected into an attempt that was never reported as launched;
+    // each one is run against a map-only job and then a reduce-only job.
+    TaskAttemptEventType[] failEvents = {
+        TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        TaskAttemptEventType.TA_FAILMSG,
+        TaskAttemptEventType.TA_FAILMSG_BY_CLIENT };
+    for (TaskAttemptEventType failEvent : failEvents) {
+      // map
+      testTaskAttemptAssignedFailHistory(
+          new FailingAttemptsDuringAssignedMRApp(1, 0, failEvent));
+      // reduce
+      testTaskAttemptAssignedFailHistory(
+          new FailingAttemptsDuringAssignedMRApp(0, 1, failEvent));
+    }
+
+    // TA_KILL ends the attempt as KILLED rather than failing the job, so it
+    // is verified with the kill-specific helper, again for map then reduce.
+    testTaskAttemptAssignedKilledHistory(
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_KILL));
+    testTaskAttemptAssignedKilledHistory(
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_KILL));
+  }
+
+ @Test
public void testSingleRackRequest() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
new TaskAttemptImpl.RequestContainerTransition(false);
@@ -301,6 +364,31 @@ public class TestTaskAttempt{
report.getTaskAttemptState());
}
+  /**
+   * Submits {@code app}, waits for the job to reach FAILED, and asserts that
+   * the app's history handler saw both a task-attempt started event and a
+   * task-attempt failed event.
+   */
+  private void testTaskAttemptAssignedFailHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
+  }
+
+  /**
+   * Submits {@code app}, waits for the first attempt of the first task to
+   * reach KILLED, and asserts that the app's history handler saw both a
+   * task-attempt started event and a task-attempt killed event.
+   */
+  private void testTaskAttemptAssignedKilledHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.KILLED);
+    // The started/killed JobHistoryEvents are handed to an EventHandler
+    // rather than recorded synchronously with the attempt's state change
+    // (presumably via the app's async dispatcher — confirm), so seeing
+    // KILLED does not guarantee the history handler has run yet. Poll for a
+    // bounded time before asserting, to avoid a flaky failure.
+    long deadline = System.currentTimeMillis() + 10000L;
+    while (!(app.getTaStartJHEvent() && app.getTaKilledJHEvent())
+        && System.currentTimeMillis() < deadline) {
+      Thread.sleep(50);
+    }
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
+  }
+
static class FailingAttemptsMRApp extends MRApp {
FailingAttemptsMRApp(int maps, int reduces) {
super(maps, reduces, true, "FailingAttemptsMRApp", true);
@@ -331,6 +419,72 @@ public class TestTaskAttempt{
}
}
+  /**
+   * MRApp whose attempts are never reported as launched:
+   * {@link #containerLaunched} is a no-op, and {@link #attemptLaunched}
+   * instead sends {@code sendFailEvent} to the attempt. A custom job-history
+   * handler records whether task-attempt started, failed and killed events
+   * (for maps or reduces) were observed.
+   */
+  static class FailingAttemptsDuringAssignedMRApp extends MRApp {
+    // Event injected into every attempt from attemptLaunched().
+    TaskAttemptEventType sendFailEvent;
+
+    // Set by the history handler and read by the test through the getters.
+    // The handler may run on a different thread than the test (presumably
+    // the app's dispatcher), so the flags are volatile for visibility.
+    private volatile boolean receiveTaStartJHEvent = false;
+    private volatile boolean receiveTaFailedJHEvent = false;
+    private volatile boolean receiveTaKilledJHEvent = false;
+
+    FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
+        TaskAttemptEventType event) {
+      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+      sendFailEvent = event;
+    }
+
+    @Override
+    protected void containerLaunched(TaskAttemptId attemptID,
+        int shufflePort) {
+      //do nothing, not send TA_CONTAINER_LAUNCHED event
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID, sendFailEvent));
+    }
+
+    public boolean getTaStartJHEvent(){
+      return receiveTaStartJHEvent;
+    }
+
+    public boolean getTaFailedJHEvent(){
+      return receiveTaFailedJHEvent;
+    }
+
+    public boolean getTaKilledJHEvent(){
+      return receiveTaKilledJHEvent;
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new EventHandler<JobHistoryEvent>() {
+        @Override
+        public void handle(JobHistoryEvent event) {
+          // Map and reduce variants of each attempt event set the same flag.
+          switch (event.getType()) {
+          case MAP_ATTEMPT_STARTED:
+          case REDUCE_ATTEMPT_STARTED:
+            receiveTaStartJHEvent = true;
+            break;
+          case MAP_ATTEMPT_FAILED:
+          case REDUCE_ATTEMPT_FAILED:
+            receiveTaFailedJHEvent = true;
+            break;
+          case MAP_ATTEMPT_KILLED:
+          case REDUCE_ATTEMPT_KILLED:
+            receiveTaKilledJHEvent = true;
+            break;
+          default:
+            // Other history events are irrelevant to these tests.
+            break;
+          }
+        }
+      };
+    }
+  }
+
@Test
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);