You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/04/09 03:43:35 UTC

git commit: TEZ-1028. Handle killed tasks and attempts when handling recovery data. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 1ee92e32c -> b8fef8417


TEZ-1028. Handle killed tasks and attempts when handling recovery data. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b8fef841
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b8fef841
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b8fef841

Branch: refs/heads/master
Commit: b8fef84177bd8b339e4d96f8ee2fd60a79830639
Parents: 1ee92e3
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Apr 8 18:42:56 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Apr 8 18:42:56 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 53 +++++++++++++-------
 1 file changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8fef841/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4490103..5169a7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -508,10 +508,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     return diagnostics;
   }
 
-  private TaskAttempt createRecoveredEvent(TaskAttemptStartedEvent
-      taskAttemptStartedEvent) {
-    TaskAttempt taskAttempt = createAttempt(
-        taskAttemptStartedEvent.getTaskAttemptID().getId());
+  private TaskAttempt createRecoveredEvent(TezTaskAttemptID tezTaskAttemptID) {
+    TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId());
     return taskAttempt;
   }
 
@@ -533,11 +531,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       }
       case TASK_FINISHED:
       {
-        if (!recoveryStartEventSeen) {
-          throw new RuntimeException("Finished Event seen but"
-              + " no Started Event was encountered earlier");
-        }
         TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+        if (!recoveryStartEventSeen
+            && !tEvent.getState().equals(TaskState.KILLED)) {
+          throw new TezUncheckedException("Finished Event seen but"
+              + " no Started Event was encountered earlier"
+              + ", taskId=" + taskId
+              + ", finishState=" + tEvent.getState());
+        }
         recoveredState = tEvent.getState();
         if (tEvent.getState() == TaskState.SUCCEEDED
             && tEvent.getSuccessfulAttemptID() != null) {
@@ -549,7 +550,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       {
         TaskAttemptStartedEvent taskAttemptStartedEvent =
             (TaskAttemptStartedEvent) historyEvent;
-        TaskAttempt recoveredAttempt = createRecoveredEvent(taskAttemptStartedEvent);
+        TaskAttempt recoveredAttempt = createRecoveredEvent(
+            taskAttemptStartedEvent.getTaskAttemptID());
         recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding restored attempt into known attempts map"
@@ -563,23 +565,36 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       }
       case TASK_ATTEMPT_FINISHED:
       {
+        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+            (TaskAttemptFinishedEvent) historyEvent;
+        TaskAttempt taskAttempt = this.attempts.get(
+            taskAttemptFinishedEvent.getTaskAttemptID());
         finishedAttempts++;
+        if (taskAttempt == null) {
+          LOG.warn("Received an attempt finished event for an attempt that "
+              + " never started or does not exist"
+              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
+              + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState());
+          TaskAttempt recoveredAttempt = createRecoveredEvent(
+              taskAttemptFinishedEvent.getTaskAttemptID());
+          this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
+              recoveredAttempt);
+          if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) {
+            throw new TezUncheckedException("Could not find task attempt"
+                + " when trying to recover"
+                + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
+                + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
+          }
+          return recoveredState;
+        }
         --numberUncompletedAttempts;
         if (numberUncompletedAttempts < 0) {
-          throw new RuntimeException("Invalid recovery event for attempt finished"
+          throw new TezUncheckedException("Invalid recovery event for attempt finished"
               + ", more completions than starts encountered"
+              + ", taskId=" + taskId
               + ", finishedAttempts=" + finishedAttempts
               + ", incompleteAttempts=" + numberUncompletedAttempts);
         }
-        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
-            (TaskAttemptFinishedEvent) historyEvent;
-        TaskAttempt taskAttempt = this.attempts.get(
-            taskAttemptFinishedEvent.getTaskAttemptID());
-        if (taskAttempt == null) {
-          throw new RuntimeException("Could not find task attempt"
-              + " when trying to recover"
-              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID());
-        }
         TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
             taskAttemptFinishedEvent);
         if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {