You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:17 UTC
[15/50] [abbrv] tez git commit: TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh)
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ba6d7e0e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ba6d7e0e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ba6d7e0e
Branch: refs/heads/TEZ-2003
Commit: ba6d7e0e9106ad26c48bda8bb8caa7e4a890c2b1
Parents: 5218f48
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 4 12:20:46 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 4 12:20:46 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 21 ++++++--
.../tez/dag/app/dag/impl/TestTaskImpl.java | 53 +++++++++++++++++++-
3 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c19770..0027e98 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -335,6 +335,8 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
+ Invalid event: T_ATTEMPT_KILLED at KILLED.
TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy.
TEZ-2221. VertexGroup name should be unqiue
TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log
http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 8b63734..15382a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -117,7 +117,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
protected final EventHandler eventHandler;
private final TezTaskID taskId;
private Map<TezTaskAttemptID, TaskAttempt> attempts;
- private final int maxFailedAttempts;
+ protected final int maxFailedAttempts;
protected final Clock clock;
private final Vertex vertex;
private final Lock readLock;
@@ -256,14 +256,29 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_SCHEDULE,
- TaskEventType.T_ADD_SPEC_ATTEMPT))
+ TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_ATTEMPT_KILLED))
// Transitions from KILLED state
+ // Ignorable event: T_ATTEMPT_KILLED
+ // Refer to TEZ-2379
+ // T_ATTEMPT_KILLED can show up in KILLED state as
+ // a SUCCEEDED attempt can still transition to KILLED after receiving
+ // a KILL event.
+ // This could happen when there is a race where the task receives a
+ // kill event, it tries to kill all running attempts and a potential
+ // running attempt succeeds before it receives the kill event.
+ // The task will then receive both a SUCCEEDED and KILLED
+ // event from the same attempt.
+ // Duplicate events from a single attempt in KILL_WAIT are handled
+ // properly. However, the subsequent T_ATTEMPT_KILLED event might
+ // be received after the task reaches its terminal KILLED state.
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_SCHEDULE,
- TaskEventType.T_ADD_SPEC_ATTEMPT))
+ TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_ATTEMPT_KILLED))
// create the topology tables
.installTopology();
http://git-wip-us.apache.org/repos/asf/tez/blob/ba6d7e0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 66e6724..9da3fab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -177,6 +177,12 @@ public class TestTaskImpl {
assertTaskKillWaitState();
}
+ private void failTask(TezTaskID taskId) {
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
+ assertTaskKillWaitState();
+ }
+
+
private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
mockTask.handle(new TaskEventTAUpdate(attemptId,
TaskEventType.T_ATTEMPT_KILLED));
@@ -289,7 +295,6 @@ public class TestTaskImpl {
killTask(taskId);
mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
TaskEventType.T_ATTEMPT_KILLED));
-
assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
}
@@ -377,6 +382,52 @@ public class TestTaskImpl {
killRunningTaskAttempt(mockTask.getLastAttempt().getID());
}
+ /**
+ * {@link TaskState#KILLED}->{@link TaskState#KILLED}
+ */
+ @Test(timeout = 5000)
+ public void testKilledAttemptAtTaskKilled() {
+ LOG.info("--- START: testKilledAttemptAtTaskKilled ---");
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ killTask(taskId);
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+
+ // Send duplicate kill for same attempt
+ // This will not happen in practice but this is to simulate handling
+ // of killed attempts in killed state.
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
+
+ }
+
+ /**
+ * {@link TaskState#FAILED}->{@link TaskState#FAILED}
+ */
+ @Test(timeout = 5000)
+ public void testKilledAttemptAtTaskFailed() {
+ LOG.info("--- START: testKilledAttemptAtTaskFailed ---");
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ for (int i = 0; i < mockTask.maxFailedAttempts; ++i) {
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ }
+ assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+
+ // Send kill for an attempt
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+
+ }
+
+
+
@Test(timeout = 5000)
public void testFetchedEventsModifyUnderlyingList() {
// Tests to ensure that adding an event to a task, does not affect the