You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/04/09 03:43:35 UTC
git commit: TEZ-1028. Handle killed tasks and attempts when handling recovery data. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 1ee92e32c -> b8fef8417
TEZ-1028. Handle killed tasks and attempts when handling recovery data. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b8fef841
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b8fef841
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b8fef841
Branch: refs/heads/master
Commit: b8fef84177bd8b339e4d96f8ee2fd60a79830639
Parents: 1ee92e3
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Apr 8 18:42:56 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Apr 8 18:42:56 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 53 +++++++++++++-------
1 file changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8fef841/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4490103..5169a7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -508,10 +508,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return diagnostics;
}
- private TaskAttempt createRecoveredEvent(TaskAttemptStartedEvent
- taskAttemptStartedEvent) {
- TaskAttempt taskAttempt = createAttempt(
- taskAttemptStartedEvent.getTaskAttemptID().getId());
+ private TaskAttempt createRecoveredEvent(TezTaskAttemptID tezTaskAttemptID) {
+ TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId());
return taskAttempt;
}
@@ -533,11 +531,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
case TASK_FINISHED:
{
- if (!recoveryStartEventSeen) {
- throw new RuntimeException("Finished Event seen but"
- + " no Started Event was encountered earlier");
- }
TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+ if (!recoveryStartEventSeen
+ && !tEvent.getState().equals(TaskState.KILLED)) {
+ throw new TezUncheckedException("Finished Event seen but"
+ + " no Started Event was encountered earlier"
+ + ", taskId=" + taskId
+ + ", finishState=" + tEvent.getState());
+ }
recoveredState = tEvent.getState();
if (tEvent.getState() == TaskState.SUCCEEDED
&& tEvent.getSuccessfulAttemptID() != null) {
@@ -549,7 +550,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
{
TaskAttemptStartedEvent taskAttemptStartedEvent =
(TaskAttemptStartedEvent) historyEvent;
- TaskAttempt recoveredAttempt = createRecoveredEvent(taskAttemptStartedEvent);
+ TaskAttempt recoveredAttempt = createRecoveredEvent(
+ taskAttemptStartedEvent.getTaskAttemptID());
recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding restored attempt into known attempts map"
@@ -563,23 +565,36 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
case TASK_ATTEMPT_FINISHED:
{
+ TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+ (TaskAttemptFinishedEvent) historyEvent;
+ TaskAttempt taskAttempt = this.attempts.get(
+ taskAttemptFinishedEvent.getTaskAttemptID());
finishedAttempts++;
+ if (taskAttempt == null) {
+ LOG.warn("Received an attempt finished event for an attempt that "
+ + " never started or does not exist"
+ + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
+ + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
+ TaskAttempt recoveredAttempt = createRecoveredEvent(
+ taskAttemptFinishedEvent.getTaskAttemptID());
+ this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
+ recoveredAttempt);
+ if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) {
+ throw new TezUncheckedException("Could not find task attempt"
+ + " when trying to recover"
+ + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
+ + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
+ }
+ return recoveredState;
+ }
--numberUncompletedAttempts;
if (numberUncompletedAttempts < 0) {
- throw new RuntimeException("Invalid recovery event for attempt finished"
+ throw new TezUncheckedException("Invalid recovery event for attempt finished"
+ ", more completions than starts encountered"
+ + ", taskId=" + taskId
+ ", finishedAttempts=" + finishedAttempts
+ ", incompleteAttempts=" + numberUncompletedAttempts);
}
- TaskAttemptFinishedEvent taskAttemptFinishedEvent =
- (TaskAttemptFinishedEvent) historyEvent;
- TaskAttempt taskAttempt = this.attempts.get(
- taskAttemptFinishedEvent.getTaskAttemptID());
- if (taskAttempt == null) {
- throw new RuntimeException("Could not find task attempt"
- + " when trying to recover"
- + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID());
- }
TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
taskAttemptFinishedEvent);
if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {