You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/12 07:32:19 UTC

git commit: TEZ-796. AM Hangs & does not kill containers when map-task fails (bikas)

Updated Branches:
  refs/heads/master 505898934 -> fea8ffa8d


TEZ-796. AM Hangs & does not kill containers when map-task fails (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/fea8ffa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/fea8ffa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/fea8ffa8

Branch: refs/heads/master
Commit: fea8ffa8d8223c2b03da30b2f21bc95fea824fa9
Parents: 5058989
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Feb 11 22:32:09 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Feb 11 22:32:09 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 25 ++++++-----
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 45 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fea8ffa8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 463d23b..81f5c5a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -193,7 +193,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.KILL_WAIT,
         EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
-        new KillWaitAttemptKilledTransition())
+        new KillWaitAttemptCompletedTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_FAILED,
+        new KillWaitAttemptCompletedTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new KillWaitAttemptCompletedTransition())
     .addTransition(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILL_WAIT,
         TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
     // Ignore-able transitions.
@@ -204,8 +212,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
-            TaskEventType.T_ATTEMPT_FAILED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
@@ -987,7 +993,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           castEvent.getTaskAttemptID(),
           TaskAttemptStateInternal.KILLED);
       task.finishedAttempts++;
-      // we don't need a new event if we already have a spare
+      // we don't need a new event if we already have a spare
       if (--task.numberUncompletedAttempts == 0
           && task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
@@ -996,11 +1002,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
 
-  private static class KillWaitAttemptKilledTransition implements
+  private static class KillWaitAttemptCompletedTransition implements
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
-    protected TaskStateInternal finalState = TaskStateInternal.KILLED;
-
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
@@ -1010,15 +1014,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       // check whether all attempts are finished
       if (task.finishedAttempts == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
-          task.logJobHistoryTaskFailedEvent(getExternalState(finalState));
+          task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
         } else {
           LOG.debug("Not generating HistoryFinish event since start event not" +
           		" generated for task: " + task.getTaskId());
         }
 
         task.eventHandler.handle(
-            new VertexEventTaskCompleted(task.taskId, getExternalState(finalState)));
-        return finalState;
+            new VertexEventTaskCompleted(
+                task.taskId, getExternalState(TaskStateInternal.KILLED)));
+        return TaskStateInternal.KILLED;
       }
       return task.getInternalState();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fea8ffa8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index b156c11..451ba07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -254,6 +254,51 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     killTask(taskId);
   }
+  
+  /**
+   * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
+   */
+  @Test
+  public void testKillRunningTask() {
+    LOG.info("--- START: testKillRunningTask ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killTask(taskId);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+  }
+  
+  /**
+   * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
+   */
+  @Test
+  public void testKillRunningTaskButAttemptSucceeds() {
+    LOG.info("--- START: testKillRunningTaskButAttemptSucceeds ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killTask(taskId);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+  }
+  
+  /**
+   * {@link TaskState#RUNNING}->{@link TaskState#KILLED}
+   */
+  @Test
+  public void testKillRunningTaskButAttemptFails() {
+    LOG.info("--- START: testKillRunningTaskButAttemptFails ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killTask(taskId);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+  }
 
   @Test
   /**