You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/12 07:32:19 UTC
git commit: TEZ-796. AM Hangs & does not kill containers when map-task fails (bikas)
Updated Branches:
refs/heads/master 505898934 -> fea8ffa8d
TEZ-796. AM Hangs & does not kill containers when map-task fails (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/fea8ffa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/fea8ffa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/fea8ffa8
Branch: refs/heads/master
Commit: fea8ffa8d8223c2b03da30b2f21bc95fea824fa9
Parents: 5058989
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Feb 11 22:32:09 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Feb 11 22:32:09 2014 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 25 ++++++-----
.../tez/dag/app/dag/impl/TestTaskImpl.java | 45 ++++++++++++++++++++
2 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fea8ffa8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 463d23b..81f5c5a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -193,7 +193,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
- new KillWaitAttemptKilledTransition())
+ new KillWaitAttemptCompletedTransition())
+ .addTransition(TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new KillWaitAttemptCompletedTransition())
+ .addTransition(TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ new KillWaitAttemptCompletedTransition())
.addTransition(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILL_WAIT,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
// Ignore-able transitions.
@@ -204,8 +212,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_TERMINATE,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
- TaskEventType.T_ATTEMPT_FAILED,
- TaskEventType.T_ATTEMPT_SUCCEEDED,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
@@ -987,7 +993,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
castEvent.getTaskAttemptID(),
TaskAttemptStateInternal.KILLED);
task.finishedAttempts++;
- // we don't need a new event if we already have a spare
+ // we don't need a new event if we already have a spare
if (--task.numberUncompletedAttempts == 0
&& task.successfulAttempt == null) {
task.addAndScheduleAttempt();
@@ -996,11 +1002,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
- private static class KillWaitAttemptKilledTransition implements
+ private static class KillWaitAttemptCompletedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
- protected TaskStateInternal finalState = TaskStateInternal.KILLED;
-
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.handleTaskAttemptCompletion(
@@ -1010,15 +1014,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// check whether all attempts are finished
if (task.finishedAttempts == task.attempts.size()) {
if (task.historyTaskStartGenerated) {
- task.logJobHistoryTaskFailedEvent(getExternalState(finalState));
+ task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
} else {
LOG.debug("Not generating HistoryFinish event since start event not" +
" generated for task: " + task.getTaskId());
}
task.eventHandler.handle(
- new VertexEventTaskCompleted(task.taskId, getExternalState(finalState)));
- return finalState;
+ new VertexEventTaskCompleted(
+ task.taskId, getExternalState(TaskStateInternal.KILLED)));
+ return TaskStateInternal.KILLED;
}
return task.getInternalState();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fea8ffa8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index b156c11..451ba07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -254,6 +254,51 @@ public class TestTaskImpl {
scheduleTaskAttempt(taskId);
killTask(taskId);
}
+
+ /**
+ * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
+ */
+ @Test
+ public void testKillRunningTask() {
+ LOG.info("--- START: testKillRunningTask ---");
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ killTask(taskId);
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+ }
+
+ /**
+ * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
+ */
+ @Test
+ public void testKillRunningTaskButAttemptSucceeds() {
+ LOG.info("--- START: testKillRunningTaskButAttemptSucceeds ---");
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ killTask(taskId);
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+ }
+
+ /**
+ * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
+ */
+ @Test
+ public void testKillRunningTaskButAttemptFails() {
+ LOG.info("--- START: testKillRunningTaskButAttemptFails ---");
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ killTask(taskId);
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+ }
@Test
/**