You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:17 UTC

[15/50] [abbrv] tez git commit: TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh)

TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ba6d7e0e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ba6d7e0e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ba6d7e0e

Branch: refs/heads/TEZ-2003
Commit: ba6d7e0e9106ad26c48bda8bb8caa7e4a890c2b1
Parents: 5218f48
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 4 12:20:46 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 4 12:20:46 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 21 ++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 53 +++++++++++++++++++-
 3 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c19770..0027e98 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -335,6 +335,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
+    Invalid event: T_ATTEMPT_KILLED at KILLED.
   TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy.
   TEZ-2221. VertexGroup name should be unqiue
   TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log

http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 8b63734..15382a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -117,7 +117,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected final EventHandler eventHandler;
   private final TezTaskID taskId;
   private Map<TezTaskAttemptID, TaskAttempt> attempts;
-  private final int maxFailedAttempts;
+  protected final int maxFailedAttempts;
   protected final Clock clock;
   private final Vertex vertex;
   private final Lock readLock;
@@ -256,14 +256,29 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_SCHEDULE,
-            TaskEventType.T_ADD_SPEC_ATTEMPT))
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_KILLED))
 
     // Transitions from KILLED state
+    // Ignorable event: T_ATTEMPT_KILLED
+    // Refer to TEZ-2379
+    // T_ATTEMPT_KILLED can show up in KILLED state because
+    // a SUCCEEDED attempt can still transition to KILLED after receiving
+    // a KILL event.
+    // This could happen when there is a race where the task receives a
+    // kill event, it tries to kill all running attempts and a potential
+    // running attempt succeeds before it receives the kill event.
+    // The task will then receive both a SUCCEEDED and KILLED
+    // event from the same attempt.
+    // Duplicate events from a single attempt in KILL_WAIT are handled
+    // properly. However, the subsequent T_ATTEMPT_KILLED event might
+    // be received after the task reaches its terminal KILLED state.
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_SCHEDULE,
-            TaskEventType.T_ADD_SPEC_ATTEMPT))
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_KILLED))
 
     // create the topology tables
     .installTopology();

http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 66e6724..9da3fab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -177,6 +177,12 @@ public class TestTaskImpl {
     assertTaskKillWaitState();
   }
 
+  private void failTask(TezTaskID taskId) {
+    mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
+    assertTaskKillWaitState();
+  }
+
+
   private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
     mockTask.handle(new TaskEventTAUpdate(attemptId,
         TaskEventType.T_ATTEMPT_KILLED));
@@ -289,7 +295,6 @@ public class TestTaskImpl {
     killTask(taskId);
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
         TaskEventType.T_ATTEMPT_KILLED));
-
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
     verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
   }
@@ -377,6 +382,52 @@ public class TestTaskImpl {
     killRunningTaskAttempt(mockTask.getLastAttempt().getID());
   }
 
+  /**
+   * {@link TaskState#KILLED}->{@link TaskState#KILLED}
+   */
+  @Test(timeout = 5000)
+  public void testKilledAttemptAtTaskKilled() {
+    LOG.info("--- START: testKilledAttemptAtTaskKilled ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killTask(taskId);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+
+    // Send a duplicate kill for the same attempt.
+    // An exact duplicate will not happen in practice; it is used here to
+    // simulate a T_ATTEMPT_KILLED event arriving after the task is KILLED.
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+
+  }
+
+  /**
+   * {@link TaskState#FAILED}->{@link TaskState#FAILED}
+   */
+  @Test(timeout = 5000)
+  public void testKilledAttemptAtTaskFailed() {
+    LOG.info("--- START: testKilledAttemptAtTaskFailed ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    for (int i = 0; i < mockTask.maxFailedAttempts; ++i) {
+      mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+          TaskEventType.T_ATTEMPT_FAILED));
+    }
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+
+    // Send kill for an attempt
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+
+  }
+
+
+
   @Test(timeout = 5000)
   public void testFetchedEventsModifyUnderlyingList() {
     // Tests to ensure that adding an event to a task, does not affect the