You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:18 UTC

[3/4] TEZ-847. Support basic AM recovery. (hitesh)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7fe07af..9572f6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -25,6 +25,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -84,6 +85,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
@@ -91,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -193,6 +196,14 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptEventType.TA_KILL_REQUEST,
           new TerminateTransition(KILLED_HELPER))
 
+      .addTransition(TaskAttemptStateInternal.NEW,
+          EnumSet.of(TaskAttemptStateInternal.NEW,
+              TaskAttemptStateInternal.RUNNING,
+              TaskAttemptStateInternal.KILLED,
+              TaskAttemptStateInternal.FAILED,
+              TaskAttemptStateInternal.SUCCEEDED),
+          TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+
       .addTransition(TaskAttemptStateInternal.START_WAIT,
           TaskAttemptStateInternal.RUNNING,
           TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
@@ -462,6 +473,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
         .installTopology();
 
+  private TaskAttemptState recoveredState = TaskAttemptState.NEW;
+  private boolean recoveryStartEventSeen = false;
 
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
@@ -782,6 +795,39 @@ public class TaskAttemptImpl implements TaskAttempt,
     return isRescheduled;
   }
 
+  @Override
+  public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case TASK_ATTEMPT_STARTED:
+      {
+        TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
+        this.launchTime = tEvent.getStartTime();
+        recoveryStartEventSeen = true;
+        recoveredState = TaskAttemptState.RUNNING;
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_FINISHED:
+      {
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
+        this.finishTime = tEvent.getFinishTime();
+        this.reportedStatus.counters = tEvent.getCounters();
+        this.reportedStatus.progress = 1f;
+        this.reportedStatus.state = tEvent.getState();
+        this.diagnostics.add(tEvent.getDiagnostics());
+        this.recoveredState = tEvent.getState();
+        return recoveredState;
+      }
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     this.eventHandler.handle(event);
@@ -1366,7 +1412,46 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
 
   }
-  
+
+  protected static class RecoverTransition implements
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      TaskAttemptStateInternal endState = TaskAttemptStateInternal.FAILED;
+      switch(taskAttempt.recoveredState) {
+        case NEW:
+        case RUNNING:
+          // FIXME once running containers can be recovered, this
+          // should be handled differently
+          // TODO abort taskattempt
+          taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
+              TaskEventType.T_ATTEMPT_FAILED));
+          endState = TaskAttemptStateInternal.FAILED;
+          break;
+        case SUCCEEDED:
+          // Do not inform Task as it already knows about completed attempts
+          endState = TaskAttemptStateInternal.SUCCEEDED;
+          break;
+        case FAILED:
+          // Do not inform Task as it already knows about completed attempts
+          endState = TaskAttemptStateInternal.FAILED;
+          break;
+        case KILLED:
+          // Do not inform Task as it already knows about completed attempts
+          endState = TaskAttemptStateInternal.KILLED;
+          break;
+        default:
+          throw new RuntimeException("Failed to recover from non-handled state"
+              + ", taskAttemptId=" + taskAttempt.getID()
+              + ", state=" + taskAttempt.recoveredState);
+      }
+
+      return endState;
+    }
+
+  }
+
   protected static class TerminatedAfterSuccessTransition implements
       MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 793c12a..16c063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -24,6 +24,7 @@ import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -63,7 +64,9 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -74,11 +77,15 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -130,6 +137,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      ADD_TEZ_EVENT_TRANSITION = new AddTezEventTransition();
 
+  // Recovery related flags
+  boolean recoveryStartEventSeen = false;
+
   private static final StateMachineFactory
                <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
             stateMachineFactory
@@ -142,11 +152,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
-            TaskEventType.T_TERMINATE,
-            new KillNewTransition())
+        TaskEventType.T_TERMINATE,
+        new KillNewTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.NEW,
         TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
 
+    // Recover transition
+    .addTransition(TaskStateInternal.NEW,
+        EnumSet.of(TaskStateInternal.NEW,
+            TaskStateInternal.SCHEDULED,
+            TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
+            TaskStateInternal.FAILED, TaskStateInternal.KILLED),
+        TaskEventType.T_RECOVER, new RecoverTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
@@ -154,7 +171,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
          TaskEventType.T_TERMINATE,
-             KILL_TRANSITION)
+         KILL_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED,
@@ -190,7 +207,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         KILL_TRANSITION)
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
-
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+        TaskEventType.T_SCHEDULE)
 
     // Transitions from KILL_WAIT state
     .addTransition(TaskStateInternal.KILL_WAIT,
@@ -235,6 +253,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later
             TaskEventType.T_ATTEMPT_LAUNCHED))
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        TaskEventType.T_SCHEDULE)
 
     // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -251,6 +271,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+        TaskEventType.T_SCHEDULE)
 
     // create the topology tables
     .installTopology();
@@ -292,6 +314,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private int finishedAttempts;//finish are total of success, failed and killed
 
   private final boolean leafVertex;
+  private TaskState recoveredState = TaskState.NEW;
 
   @Override
   public TaskState getState() {
@@ -507,6 +530,92 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     return diagnostics;
   }
 
+  private TaskAttempt createRecoveredEvent(TaskAttemptStartedEvent
+      taskAttemptStartedEvent) {
+    TaskAttempt taskAttempt = createAttempt(
+        taskAttemptStartedEvent.getTaskAttemptID().getId());
+    return taskAttempt;
+  }
+
+  @Override
+  public TaskState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case TASK_STARTED:
+      {
+        TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
+        recoveryStartEventSeen = true;
+        this.scheduledTime = tEvent.getScheduledTime();
+        if (this.attempts == null
+            || this.attempts.isEmpty()) {
+          this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
+        }
+        recoveredState = TaskState.SCHEDULED;
+        finishedAttempts = 0;
+        return recoveredState;
+      }
+      case TASK_FINISHED:
+      {
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+        recoveredState = tEvent.getState();
+        if (tEvent.getState() == TaskState.SUCCEEDED
+            && tEvent.getSuccessfulAttemptID() != null) {
+          successfulAttempt = tEvent.getSuccessfulAttemptID();
+        }
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_STARTED:
+      {
+        TaskAttemptStartedEvent taskAttemptStartedEvent =
+            (TaskAttemptStartedEvent) historyEvent;
+        TaskAttempt recoveredAttempt = createRecoveredEvent(taskAttemptStartedEvent);
+        recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding restored attempt into known attempts map"
+              + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
+        }
+        this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+            recoveredAttempt);
+        ++numberUncompletedAttempts;
+        this.recoveredState = TaskState.RUNNING;
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_FINISHED:
+      {
+        finishedAttempts++;
+        --numberUncompletedAttempts;
+        if (numberUncompletedAttempts < 0) {
+          throw new RuntimeException("Invalid recovery event for attempt finished"
+              + ", more completions than starts encountered"
+              + ", finishedAttempts=" + finishedAttempts
+              + ", incompleteAttempts=" + numberUncompletedAttempts);
+        }
+        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+            (TaskAttemptFinishedEvent) historyEvent;
+        TaskAttempt taskAttempt = this.attempts.get(
+            taskAttemptFinishedEvent.getTaskAttemptID());
+        if (taskAttempt == null) {
+          throw new RuntimeException("Could not find task attempt"
+              + " when trying to recover"
+              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID());
+        }
+        TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
+            taskAttemptFinishedEvent);
+        if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) {
+          recoveredState = TaskState.SUCCEEDED;
+          successfulAttempt = taskAttempt.getID();
+        }
+        return recoveredState;
+      }
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
   @VisibleForTesting
   public TaskStateInternal getInternalState() {
     readLock.lock();
@@ -851,6 +960,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // is called from within a transition
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
         getVertex().getName(), getLaunchTime(), clock.getTime(),
+        successfulAttempt,
         TaskState.SUCCEEDED, getCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -858,7 +968,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
-        getVertex().getName(), getLaunchTime(), clock.getTime(),
+        getVertex().getName(), getLaunchTime(), clock.getTime(), null,
         finalState, getCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -992,6 +1102,120 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  private static class RecoverTransition implements
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+    @Override
+    public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
+      TaskStateInternal endState = TaskStateInternal.NEW;
+      if (task.attempts != null) {
+        for (TaskAttempt taskAttempt : task.attempts.values()) {
+          task.eventHandler.handle(new TaskAttemptEvent(
+              taskAttempt.getID(), TaskAttemptEventType.TA_RECOVER));
+        }
+      }
+      LOG.info("Trying to recover task"
+          + ", taskId=" + task.getTaskId()
+          + ", recoveredState=" + task.recoveredState);
+      switch(task.recoveredState) {
+        case NEW:
+          // Nothing to do until the vertex schedules this task
+          endState = TaskStateInternal.NEW;
+          break;
+        case SCHEDULED:
+        case RUNNING:
+        case SUCCEEDED:
+          if (task.successfulAttempt != null) {
+            //Found successful attempt
+            //Recover data
+            boolean recoveredData = true;
+            if (task.getVertex().getOutputCommitters() != null
+                && !task.getVertex().getOutputCommitters().isEmpty()) {
+              for (Entry<String, OutputCommitter> entry
+                  : task.getVertex().getOutputCommitters().entrySet()) {
+                LOG.info("Recovering data for task from previous DAG attempt"
+                    + ", taskId=" + task.getTaskId()
+                    + ", output=" + entry.getKey());
+                OutputCommitter committer = entry.getValue();
+                if (!committer.isTaskRecoverySupported()) {
+                  LOG.info("Task recovery not supported by committer"
+                      + ", failing task attempt"
+                      + ", taskId=" + task.getTaskId()
+                      + ", attemptId=" + task.successfulAttempt
+                      + ", output=" + entry.getKey());
+                  recoveredData = false;
+                  break;
+                }
+                try {
+                  committer.recoverTask(task.getTaskId().getId(),
+                      task.appContext.getApplicationAttemptId().getAttemptId()-1);
+                } catch (Exception e) {
+                  LOG.warn("Task recovery failed by committer"
+                      + ", taskId=" + task.getTaskId()
+                      + ", attemptId=" + task.successfulAttempt
+                      + ", output=" + entry.getKey(), e);
+                  recoveredData = false;
+                  break;
+                }
+              }
+            }
+            if (!recoveredData) {
+              task.successfulAttempt = null;
+            } else {
+              LOG.info("Recovered a successful attempt"
+                  + ", taskAttemptId=" + task.successfulAttempt.toString());
+              task.logJobHistoryTaskFinishedEvent();
+              task.eventHandler.handle(
+                  new VertexEventTaskCompleted(task.taskId,
+                      getExternalState(TaskStateInternal.SUCCEEDED)));
+              task.eventHandler.handle(
+                  new VertexEventTaskAttemptCompleted(
+                      task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
+              endState = TaskStateInternal.SUCCEEDED;
+              break;
+            }
+          }
+
+          if (endState != TaskStateInternal.SUCCEEDED &&
+              task.attempts.size() >= task.maxAttempts) {
+            // Exceeded max attempts
+            task.finished(TaskStateInternal.FAILED);
+            endState = TaskStateInternal.FAILED;
+            break;
+          }
+
+          // no successful attempt and all attempts completed
+          // schedule a new one
+          // If any attempt is incomplete, the running attempt will be moved to
+          // failed, and its update will trigger a new attempt if possible
+          if (task.attempts.size() == task.finishedAttempts) {
+            task.addAndScheduleAttempt();
+          }
+          endState = TaskStateInternal.RUNNING;
+          break;
+        case KILLED:
+          // Nothing to do
+          // Inform vertex
+          task.eventHandler.handle(
+              new VertexEventTaskCompleted(task.taskId,
+                  getExternalState(TaskStateInternal.KILLED)));
+          endState  = TaskStateInternal.KILLED;
+          break;
+        case FAILED:
+          // Nothing to do
+          // Inform vertex
+          task.eventHandler.handle(
+              new VertexEventTaskCompleted(task.taskId,
+                  getExternalState(TaskStateInternal.FAILED)));
+
+          endState = TaskStateInternal.FAILED;
+          break;
+      }
+
+      return endState;
+    }
+  }
+
 
   private static class KillWaitAttemptCompletedTransition implements
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d3e07cb..67f978a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -52,11 +52,9 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,13 +92,16 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
@@ -110,9 +111,12 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TezDAGID;
@@ -185,6 +189,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private int numStartedSourceVertices = 0;
   private int numInitedSourceVertices = 0;
+  private int numRecoveredSourceVertices = 0;
+
   private int distanceFromRoot = 0;
 
   private final List<String> diagnostics = new ArrayList<String>();
@@ -209,6 +215,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new SourceTaskAttemptCompletedEventTransition();
 
+  private VertexState recoveredState = VertexState.NEW;
+  private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+
   protected static final
     StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
        stateMachineFactory
@@ -222,6 +231,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.INITIALIZING, VertexState.FAILED),
               VertexEventType.V_INIT,
               new InitTransition())
+          .addTransition
+              (VertexState.NEW,
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_RECOVER,
+                  new StartRecoverTransition())
+          .addTransition
+              (VertexState.RECOVERING,
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+                  new RecoverTransition())
+          .addTransition
+              (VertexState.NEW,
+                  EnumSet.of(VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+                  new RecoverTransition())
           .addTransition(VertexState.NEW, VertexState.NEW,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -367,6 +403,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   // reruns.
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
+          .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
+              VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+              new TaskAttemptCompletedEventTransition())
 
           // Transitions from FAILED state
           .addTransition(
@@ -491,6 +530,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private VertexTerminationCause terminationCause;
   
   private String logIdentifier;
+  private boolean recoveryCommitInProgress = false;
+  private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
+
+  // Recovery related flags
+  boolean recoveryInitEventSeen = false;
+  boolean recoveryStartEventSeen = false;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
@@ -515,7 +560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     this.vertexLocationHint = vertexLocationHint;
     if (LOG.isDebugEnabled()) {
-      logLocationHints(this.vertexLocationHint);
+      logLocationHints(this.vertexName, this.vertexLocationHint);
     }
 
     this.dagUgi = appContext.getCurrentDAG().getDagUGI();
@@ -800,6 +845,104 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return this.appContext;
   }
 
+  private void handleParallelismUpdate(int newParallelism,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
+    Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
+        .iterator();
+    int i = 0;
+    while (iter.hasNext()) {
+      i++;
+      Map.Entry<TezTaskID, Task> entry = iter.next();
+      if (i <= newParallelism) {
+        continue;
+      }
+      iter.remove();
+    }
+    this.recoveredSourceEdgeManagers =
+        sourceEdgeManagers;
+  }
+
+  @Override
+  public VertexState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case VERTEX_INITIALIZED:
+        recoveryInitEventSeen = true;
+        recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
+        createTasks();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Init event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new RuntimeException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
+        startTimeRequested = startedEvent.getStartRequestedTime();
+        startedTime = startedEvent.getStartTime();
+        recoveredState = VertexState.RUNNING;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Started event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_PARALLELISM_UPDATED:
+        VertexParallelismUpdatedEvent updatedEvent =
+            (VertexParallelismUpdatedEvent) historyEvent;
+        if (updatedEvent.getVertexLocationHint() != null) {
+          vertexLocationHint = updatedEvent.getVertexLocationHint();
+        }
+        numTasks = updatedEvent.getNumTasks();
+        handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after parallelism updated event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_COMMIT_STARTED:
+        if (recoveredState != VertexState.RUNNING) {
+          throw new RuntimeException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState);
+        }
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case VERTEX_FINISHED:
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        recoveryCommitInProgress = false;
+        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+        recoveredState = finishedEvent.getState();
+        diagnostics.add(finishedEvent.getDiagnostics());
+        finishTime = finishedEvent.getFinishTime();
+        // TODO counters ??
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after finished event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        VertexDataMovementEventsGeneratedEvent vEvent =
+            (VertexDataMovementEventsGeneratedEvent) historyEvent;
+        this.recoveredEvents.addAll(vEvent.getTezEvents());
+        return recoveredState;
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+
+    }
+  }
+
   // TODO Create InputReadyVertexManager that schedules when there is something
   // to read and use that as default instead of ImmediateStart.TEZ-480
   @Override
@@ -829,8 +972,40 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Override
   public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
-    writeLock.lock();
+    return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, false);
+  }
+
+  private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers,
+      boolean recovering) {
+    if (recovering) {
+      writeLock.lock();
+      try {
+        if (sourceEdgeManagers != null) {
+          for(Map.Entry<String, EdgeManagerDescriptor> entry :
+              sourceEdgeManagers.entrySet()) {
+            LOG.info("Recovering edge manager for source:"
+                + entry.getKey() + " destination: " + getVertexId());
+            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
+            Edge edge = sourceVertices.get(sourceVertex);
+            try {
+              edge.setCustomEdgeManager(entry.getValue());
+            } catch (Exception e) {
+              LOG.warn("Failed to initialize edge manager for edge"
+                  + ", sourceVertexName=" + sourceVertex.getName()
+                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
+                  e);
+              return false;
+            }
+          }
+        }
+        return true;
+      } finally {
+        writeLock.unlock();
+      }
+    }
     setVertexLocationHint(vertexLocationHint);
+    writeLock.lock();
     try {
       if (parallelismSet == true) {
         LOG.info("Parallelism can only be set dynamically once per vertex");
@@ -928,7 +1103,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             }
           }
         }
-  
+
+        VertexParallelismUpdatedEvent parallelismUpdatedEvent =
+            new VertexParallelismUpdatedEvent(vertexId, numTasks,
+                vertexLocationHint,
+                sourceEdgeManagers);
+        appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
+            parallelismUpdatedEvent));
+
         // stop buffering events
         for (Edge edge : sourceVertices.values()) {
           edge.stopEventBuffering();
@@ -962,7 +1144,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     try {
       this.vertexLocationHint = vertexLocationHint;
       if (LOG.isDebugEnabled()) {
-        logLocationHints(this.vertexLocationHint);
+        logLocationHints(this.vertexName, this.vertexLocationHint);
       }
     } finally {
       writeLock.unlock();
@@ -1039,7 +1221,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   void logJobHistoryVertexInitializedEvent() {
     VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
         initTimeRequested, initedTime, numTasks,
-        getProcessorName());
+        getProcessorName(), getAdditionalInputs());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), initEvt));
   }
@@ -1055,13 +1237,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.setFinishTime();
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
-        startedTime, finishTime, VertexStatus.State.SUCCEEDED, "",
+        startedTime, finishTime, VertexState.SUCCEEDED, "",
         getAllCounters());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
-  void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
+  void logJobHistoryVertexFailedEvent(VertexState state) {
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
@@ -1073,7 +1255,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   static VertexState checkVertexForCompletion(final VertexImpl vertex) {
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking for vertex completion"
+      LOG.debug("Checking for vertex completion for "
+          + vertex.logIdentifier
+          + ", numTasks=" + vertex.numTasks
           + ", failedTaskCount=" + vertex.failedTaskCount
           + ", killedTaskCount=" + vertex.killedTaskCount
           + ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1084,6 +1268,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     //check for vertex failure first
     if (vertex.completedTaskCount > vertex.tasks.size()) {
       LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
+          + " for vertex " + vertex.logIdentifier
+          + ", numTasks=" + vertex.numTasks
           + ", failedTaskCount=" + vertex.failedTaskCount
           + ", killedTaskCount=" + vertex.killedTaskCount
           + ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1096,6 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
+          if (vertex.outputCommitters != null) {
+            vertex.appContext.getHistoryHandler().handle(
+                new DAGHistoryEvent(vertex.getDAGId(),
+                    new VertexCommitStartedEvent(vertex.vertexId)));
+          }
           if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
             // commit only once. Dont commit shared outputs
             LOG.info("Invoking committer commit for vertex, vertexId="
@@ -1197,20 +1388,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (finishTime == 0) setFinishTime();
 
     switch (finalState) {
-      case KILLED:
-        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-            finalState, terminationCause));
-        logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
-        break;
       case ERROR:
         eventHandler.handle(new DAGEvent(getDAGId(),
             DAGEventType.INTERNAL_ERROR));
-        logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+        logJobHistoryVertexFailedEvent(finalState);
         break;
+      case KILLED:
       case FAILED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
             finalState, terminationCause));
-        logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+        logJobHistoryVertexFailedEvent(finalState);
         break;
       case SUCCEEDED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
@@ -1227,58 +1414,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return finished(finalState, null);
   }
 
-  private VertexState initializeVertex() {
+
+  private void initializeCommitters() throws Exception {
     if (!this.additionalOutputSpecs.isEmpty()) {
-      try {
-        LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
-        for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
+      LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+      for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
           additionalOutputs.entrySet())  {
-          final String outputName = entry.getKey();
-          final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
-          if (od.getInitializerClassName() == null
+        final String outputName = entry.getKey();
+        final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
+        if (od.getInitializerClassName() == null
             || od.getInitializerClassName().isEmpty()) {
-            LOG.info("Ignoring committer as none specified for output="
-                + outputName
+          LOG.info("Ignoring committer as none specified for output="
+              + outputName
+              + ", vertexId=" + logIdentifier);
+          continue;
+        }
+        LOG.info("Instantiating committer for output=" + outputName
+            + ", vertexId=" + logIdentifier
+            + ", committerClass=" + od.getInitializerClassName());
+
+        dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+                od.getInitializerClassName());
+            OutputCommitterContext outputCommitterContext =
+                new OutputCommitterContextImpl(appContext.getApplicationID(),
+                    appContext.getApplicationAttemptId().getAttemptId(),
+                    appContext.getCurrentDAG().getName(),
+                    vertexName,
+                    outputName,
+                    od.getDescriptor().getUserPayload(),
+                    vertexId.getId());
+
+            LOG.info("Invoking committer init for output=" + outputName
                 + ", vertexId=" + logIdentifier);
-            continue;
+            outputCommitter.initialize(outputCommitterContext);
+            outputCommitters.put(outputName, outputCommitter);
+            LOG.info("Invoking committer setup for output=" + outputName
+                + ", vertexId=" + logIdentifier);
+            outputCommitter.setupOutput();
+            return null;
           }
-          LOG.info("Instantiating committer for output=" + outputName
-              + ", vertexId=" + logIdentifier
-              + ", committerClass=" + od.getInitializerClassName());
-
-          dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
-                  od.getInitializerClassName());
-              OutputCommitterContext outputCommitterContext =
-                  new OutputCommitterContextImpl(appContext.getApplicationID(),
-                      appContext.getApplicationAttemptId().getAttemptId(),
-                      appContext.getCurrentDAG().getName(),
-                      vertexName,
-                      outputName,
-                      od.getDescriptor().getUserPayload());
-
-              LOG.info("Invoking committer init for output=" + outputName
-                  + ", vertexId=" + logIdentifier);
-              outputCommitter.initialize(outputCommitterContext);
-              outputCommitters.put(outputName, outputCommitter);
-              LOG.info("Invoking committer setup for output=" + outputName
-                  + ", vertexId=" + logIdentifier);
-              outputCommitter.setupOutput();
-              return null;
-            }
-          });
-        }
-      } catch (Exception e) {
-        LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
-        addDiagnostic("Vertex init failed : "
-            + StringUtils.stringifyException(e));
-        trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
-        abortVertex(VertexStatus.State.FAILED);
-        return finished(VertexState.FAILED);
+        });
       }
     }
+  }
+
+  private VertexState initializeVertex() {
+    try {
+      initializeCommitters();
+    } catch (Exception e) {
+      LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
+      addDiagnostic("Vertex init failed : "
+          + StringUtils.stringifyException(e));
+      trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+      abortVertex(VertexStatus.State.FAILED);
+      return finished(VertexState.FAILED);
+    }
+
     // TODO: Metrics
     initedTime = clock.getTime();
 
@@ -1329,130 +1523,645 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   }
 
-  public static class InitTransition implements
-      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+  private VertexState setupVertex() { // no-event variant: delegates with a null VertexInitializedEvent
+    return setupVertex(null);
+  }
 
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      VertexState vertexState = VertexState.NEW;
-      vertex.numInitedSourceVertices++;
-      if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
-          vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
-        vertexState = handleInitEvent(vertex, event);
-        if (vertexState != VertexState.FAILED) {
-          if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
-            for (Vertex target : vertex.targetVertices.keySet()) {
-              vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
-                VertexEventType.V_INIT));
-            }
-          }
+  private VertexState setupVertex(VertexInitializedEvent event) {
+
+    if (event == null) {
+      initTimeRequested = clock.getTime();
+    } else {
+      initTimeRequested = event.getInitRequestedTime();
+      initedTime = event.getInitedTime();
+    }
+
+    // VertexManager needs to be setup before attempting to Initialize any
+    // Inputs - since events generated by them will be routed to the
+    // VertexManager for handling.
+
+    if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
+      List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
+      for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
+        if (groupInfo.edgeMergedInputs.containsKey(getName())) {
+          InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
+          groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
+              Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
         }
       }
-      return vertexState;
+      if (!groupSpecList.isEmpty()) {
+        groupInputSpecList = groupSpecList;
+      }
     }
 
-    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
-      vertex.initTimeRequested = vertex.clock.getTime();
-
-      // VertexManager needs to be setup before attempting to Initialize any
-      // Inputs - since events generated by them will be routed to the
-      // VertexManager for handling.
-
-      if (vertex.dagVertexGroups != null && !vertex.dagVertexGroups.isEmpty()) {
-        List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
-        for (VertexGroupInfo groupInfo : vertex.dagVertexGroups.values()) {
-          if (groupInfo.edgeMergedInputs.containsKey(vertex.getName())) {
-            InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(vertex.getName());
-            groupSpecList.add(new GroupInputSpec(groupInfo.groupName, 
-                Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
+    // Check if any inputs need initializers
+    if (event != null) {
+      this.additionalInputs = event.getAdditionalInputs();
+      if (additionalInputs != null) {
+      // FIXME References to descriptor kept in both objects
+        for (InputSpec inputSpec : this.additionalInputSpecs) {
+          if (additionalInputs.containsKey(inputSpec.getSourceVertexName())
+                && additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor() != null) {
+            inputSpec.setInputDescriptor(
+                additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor());
           }
         }
-        if (!groupSpecList.isEmpty()) {
-          vertex.groupInputSpecList = groupSpecList;
-        }
       }
-      
-      // Check if any inputs need initializers
-      if (vertex.additionalInputs != null) {
-        LOG.info("Root Inputs exist for Vertex: " + vertex.getName() + " : "
-            + vertex.additionalInputs);
-        for (RootInputLeafOutputDescriptor<InputDescriptor> input : vertex.additionalInputs.values()) {
+    } else {
+      if (additionalInputs != null) {
+        LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+            + additionalInputs);
+        for (RootInputLeafOutputDescriptor<InputDescriptor> input : additionalInputs.values()) {
           if (input.getInitializerClassName() != null) {
-            if (vertex.inputsWithInitializers == null) {
-              vertex.inputsWithInitializers = Sets.newHashSet();
+            if (inputsWithInitializers == null) {
+              inputsWithInitializers = Sets.newHashSet();
             }
-            vertex.inputsWithInitializers.add(input.getEntityName());
+            inputsWithInitializers.add(input.getEntityName());
             LOG.info("Starting root input initializer for input: "
                 + input.getEntityName() + ", with class: ["
                 + input.getInitializerClassName() + "]");
           }
         }
       }
+    }
+
+    boolean hasBipartite = false;
+    if (sourceVertices != null) {
+      for (Edge edge : sourceVertices.values()) {
+        if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+          hasBipartite = true;
+          break;
+        }
+      }
+    }
+
+    if (hasBipartite && inputsWithInitializers != null) {
+      LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
+      if (event != null) {
+        return VertexState.FAILED;
+      } else {
+        return finished(VertexState.FAILED);
+      }
+    }
+
+    boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
+
+    if (hasUserVertexManager) {
+      VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
+          .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan
+              .getVertexManagerPlugin());
+      LOG.info("Setting user vertex manager plugin: "
+          + pluginDesc.getClassName() + " on vertex: " + getName());
+      vertexManager = new VertexManager(pluginDesc, this, appContext);
+    } else {
+      if (hasBipartite) {
+        // setup vertex manager
+        // TODO this needs to consider data size and perhaps API.
+        // Currently implicitly BIPARTITE is the only edge type
+        LOG.info("Setting vertexManager to ShuffleVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new ShuffleVertexManager(),
+            this, appContext);
+      } else if (inputsWithInitializers != null) {
+        LOG.info("Setting vertexManager to RootInputVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(new RootInputVertexManager(),
+            this, appContext);
+      } else {
+        // schedule all tasks upon vertex start. Default behavior.
+        LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+            + logIdentifier);
+        vertexManager = new VertexManager(
+            new ImmediateStartVertexManager(), this, appContext);
+      }
+    }
+
+    vertexManager.initialize();
+
+    // Setup tasks early if possible. If the VertexManager is not being used
+    // to set parallelism, sending events to Tasks is safe (and less confusing
+    // than relying on tasks to be created after TaskEvents are generated).
+    // For VertexManagers setting parallelism, the setParallelism call needs
+    // to be inline.
+    if (event != null) {
+      numTasks = event.getNumTasks();
+    } else {
+      numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+    }
+
+    if (!(numTasks == -1 || numTasks >= 0)) { // anything other than -1 or a non-negative count is rejected
+      addDiagnostic("Invalid task count for vertex"
+          + ", numTasks=" + numTasks);
+      trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
+      if (event != null) { // NOTE(review): reverse of the hasBipartite failure check earlier in this method, which calls finished() only when event == null - confirm which is intended
+        abortVertex(VertexStatus.State.FAILED);
+        return finished(VertexState.FAILED);
+      } else {
+        return VertexState.FAILED;
+      }
+    }
+
+    checkTaskLimits();
+    return VertexState.INITED;
+  }
+
+  public static class StartRecoverTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent;
+      VertexState desiredState = recoverEvent.getDesiredState();
 
-      boolean hasBipartite = false;
-      if (vertex.sourceVertices != null) {
-        for (Edge edge : vertex.sourceVertices.values()) {
-          if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-            hasBipartite = true;
+      switch (desiredState) {
+        case RUNNING:
+          break;
+        case SUCCEEDED:
+        case KILLED:
+        case FAILED:
+        case ERROR:
+          switch (desiredState) {
+            case SUCCEEDED:
+              vertex.succeededTaskCount = vertex.numTasks;
+              vertex.completedTaskCount = vertex.numTasks;
+              break;
+            case KILLED:
+              vertex.killedTaskCount = vertex.numTasks;
+              break;
+            case FAILED:
+            case ERROR:
+              vertex.failedTaskCount = vertex.numTasks;
+              break;
+          }
+          if (vertex.tasks != null) {
+            TaskState taskState = TaskState.KILLED;
+            switch (desiredState) {
+              case SUCCEEDED:
+                taskState = TaskState.SUCCEEDED;
+                break;
+              case KILLED:
+                taskState = TaskState.KILLED;
+                break;
+              case FAILED:
+              case ERROR:
+                taskState = TaskState.FAILED;
+                break;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState));
+            }
+          }
+          LOG.info("DAG informed Vertex of its final completed state"
+              + ", vertex=" + vertex.logIdentifier
+              + ", state=" + desiredState);
+          return desiredState;
+        default:
+          LOG.info("Unhandled desired state provided by DAG"
+              + ", vertex=" + vertex.logIdentifier
+              + ", state=" + desiredState);
+          return vertex.finished(VertexState.ERROR); // stop here: must not fall into the recoveredState handling below
+      }
+
+      VertexState endState;
+      switch (vertex.recoveredState) {
+        case NEW:
+          // Trigger init and start as desired state is RUNNING
+          // Drop all root events
+          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+          while (iterator.hasNext()) {
+            if (iterator.next().getEventType().equals(
+                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+              iterator.remove();
+            }
+          }
+          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_INIT));
+          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_START));
+          endState = VertexState.NEW;
+          break;
+        case INITED:
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+
+          // Recover tasks
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+          }
+          // Update tasks with their input payloads as needed
+
+          vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_START));
+          if (vertex.getInputVertices().isEmpty()) {
+            endState = VertexState.INITED;
+          } else {
+            endState = VertexState.RECOVERING;
+          }
+          break;
+        case RUNNING:
+          vertex.tasksNotYetScheduled = false;
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
             break;
           }
+
+          // if commit in progress and desired state is not a succeeded one,
+          // move to failed
+          if (vertex.recoveryCommitInProgress) {
+            LOG.info("Recovered vertex was in the middle of a commit"
+                + ", failing Vertex=" + vertex.logIdentifier);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.COMMIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks; // tasks may be null (else branch below), so guard before dereferencing
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = VertexState.SUCCEEDED;
+            vertex.finished(endState);
+          }
+          break;
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+          vertex.tasksNotYetScheduled = false;
+          // recover tasks
+          if (vertex.tasks != null) {
+            TaskState taskState = TaskState.KILLED;
+            switch (vertex.recoveredState) {
+              case SUCCEEDED:
+                taskState = TaskState.SUCCEEDED;
+                break;
+              case KILLED:
+                taskState = TaskState.KILLED;
+                break;
+              case FAILED:
+                taskState = TaskState.FAILED;
+                break;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState));
+            }
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = vertex.recoveredState;
+            vertex.finished(endState);
+          }
+          break;
+        default:
+          LOG.warn("Invalid recoveredState found when trying to recover"
+              + " vertex, recoveredState=" + vertex.recoveredState);
+          vertex.finished(VertexState.ERROR);
+          endState = VertexState.ERROR;
+          break;
+      }
+      if (!endState.equals(VertexState.RECOVERING)) {
+        LOG.info("Recovered Vertex State"
+            + ", vertexId=" + vertex.logIdentifier
+            + ", state=" + endState
+            + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+            + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+            + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+            + ", recoveredEvents="
+            + ( vertex.recoveredEvents == null ? "null" : vertex.recoveredEvents.size())
+            + ", tasksIsNull=" + (vertex.tasks == null)
+            + ", numTasks=" + ( vertex.tasks == null ? "null" : vertex.tasks.size()));
+        for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+          vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+              entry.getKey().getVertexId(),
+              vertex.vertexId, endState, null));
         }
       }
-      
-      if (hasBipartite && vertex.inputsWithInitializers != null) {
-        LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
-        return vertex.finished(VertexState.FAILED);
+      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+          .contains(endState)) {
+        // Send events downstream
+        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+        vertex.recoveredEvents.clear();
+      } else {
+        // Ensure no recovered events
+        if (!vertex.recoveredEvents.isEmpty()) {
+          throw new RuntimeException("Invalid Vertex state"
+              + ", found non-zero recovered events in invalid state"
+              + ", recoveredState=" + endState
+              + ", recoveredEvents=" + vertex.recoveredEvents.size());
+        }
       }
-      
-      boolean hasUserVertexManager = vertex.vertexPlan.hasVertexManagerPlugin();
-      
-      if (hasUserVertexManager) {
-        VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
-            .convertVertexManagerPluginDescriptorFromDAGPlan(vertex.vertexPlan
-                .getVertexManagerPlugin());
-        LOG.info("Setting user vertex manager plugin: "
-            + pluginDesc.getClassName() + " on vertex: " + vertex.getName());
-        vertex.vertexManager = new VertexManager(pluginDesc, vertex, vertex.appContext);
+      return endState;
+    }
+
+  }
+
+  /** Re-dispatches each recovered TezEvent as its own VertexEventRouteEvent. */
+  private void routeRecoveredEvents(VertexState vertexState, List<TezEvent> tezEvents) {
+    for (TezEvent tezEvent : tezEvents) {
+      EventMetaData sourceMeta = tezEvent.getSourceInfo();
+      TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+      // Stamp the source attempt's id as the version on the event types that carry one.
+      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
+        ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+        ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+        ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+        // Root input events go back to this vertex, and only while it is INITED or RUNNING.
+        if (vertexState == VertexState.RUNNING || vertexState == VertexState.INITED) {
+          eventHandler.handle(new VertexEventRouteEvent(
+              this.getVertexId(), Collections.singletonList(tezEvent), true));
+        }
+        continue;
+      }
+      // Every other event is routed to the vertex named by the event's edge vertex name.
+      Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
+      Edge destEdge = targetVertices.get(destVertex);
+      if (destEdge == null) {
+        throw new TezUncheckedException("Bad destination vertex: " +
+            sourceMeta.getEdgeVertexName() + " for event vertex: " +
+            getVertexId());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Routing recovered event"
+            + ", eventType=" + tezEvent.getEventType()
+            + ", sourceInfo=" + sourceMeta
+            + ", destinationVertex=" + destVertex.getName());
+      }
+      eventHandler.handle(new VertexEventRouteEvent(destVertex
+          .getVertexId(), Collections.singletonList(tezEvent), true));
+    }
+  }
+
+  public static class RecoverTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+    // Tallies recovered source vertices; completes this vertex's recovery once all have reported.
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      VertexEventSourceVertexRecovered sourceRecoveredEvent =
+          (VertexEventSourceVertexRecovered) vertexEvent;
+      ++vertex.numRecoveredSourceVertices;
+
+      switch (sourceRecoveredEvent.getSourceVertexState()) {
+        case NEW:
+          // Nothing to do
+          break;
+        case INITED:
+          ++vertex.numInitedSourceVertices;
+          break;
+        case RUNNING:
+        case SUCCEEDED:
+          ++vertex.numInitedSourceVertices;
+          ++vertex.numStartedSourceVertices;
+          if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) {
+            vertex.pendingReportedSrcCompletions.addAll(
+                sourceRecoveredEvent.getCompletedTaskAttempts());
+          }
+          break;
+        case FAILED:
+        case KILLED:
+        case ERROR:
+          // Nothing to do
+          // Recover as if source vertices have not inited/started
+          break;
+        default:
+          LOG.warn("Received invalid SourceVertexRecovered event"
+              + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
+              + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState());
+          return vertex.finished(VertexState.ERROR);
+      }
+      // Remain in RECOVERING until every input vertex has reported.
+      if (vertex.numRecoveredSourceVertices !=
+          vertex.getInputVerticesCount()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Waiting for source vertices to recover"
+              + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+              + ", totalSourceVertices=" + vertex.getInputVerticesCount());
+        }
+        return VertexState.RECOVERING;
+      }
+      // NOTE(review): completedTaskAttempts below is never added to in this method, so
+      // downstream vertices always receive an empty list; confirm this is intended.
+      // Complete recovery
+      VertexState endState = VertexState.NEW;
+      List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList();
+      switch (vertex.recoveredState) {
+        case NEW:
+          // Drop all root events if not inited properly
+          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+          while (iterator.hasNext()) {
+            if (iterator.next().getEventType().equals(
+                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+              iterator.remove();
+            }
+          }
+          // Trigger init if all sources initialized
+          if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
+            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+                VertexEventType.V_INIT));
+          }
+          if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+                VertexEventType.V_START));
+          }
+          endState = VertexState.NEW;
+          break;
+        case INITED:
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          if (!vertex.setParallelism(0,
+              null, vertex.recoveredSourceEdgeManagers, true)) {
+            LOG.info("Failed to recover edge managers");
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          // Recover tasks
+          if (vertex.tasks != null) {
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+          }
+          if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+              VertexEventType.V_START));
+          }
+          endState = VertexState.INITED;
+          break;
+        case RUNNING:
+          vertex.tasksNotYetScheduled = false;
+          // if commit in progress and desired state is not a succeeded one,
+          // move to failed
+          if (vertex.recoveryCommitInProgress) {
+            LOG.info("Recovered vertex was in the middle of a commit"
+                + ", failing Vertex=" + vertex.logIdentifier);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.COMMIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          try {
+            vertex.initializeCommitters();
+          } catch (Exception e) {
+            LOG.info("Failed to initialize committers", e);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          if (!vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, true)) {
+            LOG.info("Failed to recover edge managers");
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.INIT_FAILURE);
+            endState = VertexState.FAILED;
+            break;
+          }
+          if (vertex.tasks != null) {
+            assert vertex.tasks.size() == vertex.numTasks; // checked only once tasks is known non-null
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId()));
+            }
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = VertexState.SUCCEEDED;
+            vertex.finished(endState);
+          }
+          break;
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+          vertex.tasksNotYetScheduled = false;
+          // recover tasks
+          if (vertex.tasks != null) {
+            assert vertex.tasks.size() == vertex.numTasks; // checked only once tasks is known non-null
+            TaskState taskState = TaskState.KILLED;
+            switch (vertex.recoveredState) {
+              case SUCCEEDED:
+                taskState = TaskState.SUCCEEDED;
+                break;
+              case KILLED:
+                taskState = TaskState.KILLED;
+                break;
+              case FAILED:
+                taskState = TaskState.FAILED;
+                break;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState));
+            }
+            // Wait for all tasks to recover and report back
+            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+            endState = VertexState.RUNNING;
+          } else {
+            endState = vertex.recoveredState;
+            vertex.finished(endState);
+          }
+          break;
+        default:
+          LOG.warn("Invalid recoveredState found when trying to recover"
+              + " vertex, recoveredState=" + vertex.recoveredState);
+          vertex.finished(VertexState.ERROR);
+          endState = VertexState.ERROR;
+          break;
+      }
+
+      LOG.info("Recovered Vertex State"
+          + ", vertexId=" + vertex.logIdentifier
+          + ", state=" + endState
+          + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+          + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+          + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+          + ", tasksIsNull=" + (vertex.tasks == null)
+          + ", numTasks=" + ( vertex.tasks == null ? 0 : vertex.tasks.size()));
+      // Report the recovered state to every output vertex.
+      for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+        vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+            entry.getKey().getVertexId(),
+            vertex.vertexId, endState, completedTaskAttempts));
+      }
+      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+          .contains(endState)) {
+        // Send events downstream
+        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+        vertex.recoveredEvents.clear();
       } else {
-        if (hasBipartite) {
-          // setup vertex manager
-          // TODO this needs to consider data size and perhaps API.
-          // Currently implicitly BIPARTITE is the only edge type
-          LOG.info("Setting vertexManager to ShuffleVertexManager for "
-              + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(new ShuffleVertexManager(),
-              vertex, vertex.appContext);
-        } else if (vertex.inputsWithInitializers != null) {
-          LOG.info("Setting vertexManager to RootInputVertexManager for "
-              + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(new RootInputVertexManager(),
-              vertex, vertex.appContext);
-        } else {
-          // schedule all tasks upon vertex start. Default behavior.
-          LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
-              + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(
-              new ImmediateStartVertexManager(), vertex, vertex.appContext);
+        // Ensure no recovered events
+        if (!vertex.recoveredEvents.isEmpty()) {
+          throw new RuntimeException("Invalid Vertex state"
+              + ", found non-zero recovered events in invalid state"
+              + ", recoveredState=" + endState
+              + ", recoveredEvents=" + vertex.recoveredEvents.size());
         }
       }
-      
-      vertex.vertexManager.initialize();
-
-      // Setup tasks early if possible. If the VertexManager is not being used
-      // to set parallelism, sending events to Tasks is safe (and less confusing
-      // then relying on tasks to be created after TaskEvents are generated).
-      // For VertexManagers setting parallelism, the setParallelism call needs
-      // to be inline.
-      vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
-      if (!(vertex.numTasks == -1 || vertex.numTasks >= 0)) {
-        vertex.addDiagnostic("Invalid task count for vertex"
-          + ", numTasks=" + vertex.numTasks);
-        vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
-        vertex.abortVertex(VertexStatus.State.FAILED);
-        return vertex.finished(VertexState.FAILED);
+      return endState;
+    }
+
+  }
+
+  public static class InitTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+    // With no sources, or once all are counted, runs handleInitEvent and sends V_INIT to targets.
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexState vertexState = VertexState.NEW; // returned as-is while sources are still pending
+      vertex.numInitedSourceVertices++; // incremented on every invocation
+      if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
+          vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+        vertexState = handleInitEvent(vertex, event);
+        if (vertexState != VertexState.FAILED) { // init did not fail: notify each target vertex
+          if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
+            for (Vertex target : vertex.targetVertices.keySet()) {
+              vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
+                VertexEventType.V_INIT));
+            }
+          }
+        }
       }
+      return vertexState;
+    }
 
-      vertex.checkTaskLimits();
+    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
+      VertexState state = vertex.setupVertex();
+      if (state.equals(VertexState.FAILED)) {
+        return state;
+      }
 
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
@@ -1862,7 +2571,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.numSuccessSourceAttemptCompletions++;
         if (vertex.getState() == VertexState.RUNNING) {
           vertex.vertexManager.onSourceTaskCompleted(completionEvent
-              .getTaskAttemptId());
+              .getTaskAttemptId().getTaskID());
         } else {
           vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
         }
@@ -2034,9 +2743,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
+      boolean recovered = rEvent.isRecovered();
       List<TezEvent> tezEvents = rEvent.getEvents();
 
       if (vertex.getAppContext().isRecoveryEnabled()
+          && !recovered
           && !tezEvents.isEmpty()) {
         List<TezEvent> dataMovementEvents =
             Lists.newArrayList();
@@ -2061,7 +2772,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       for(TezEvent tezEvent : tezEvents) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Vertex: " + vertex.getName() + " routing event: "
-              + tezEvent.getEventType());
+              + tezEvent.getEventType()
+              + " Recovered:" + recovered);
         }
         EventMetaData sourceMeta = tezEvent.getSourceInfo();
         switch(tezEvent.getEventType()) {
@@ -2074,7 +2786,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
               if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                 ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
-              } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) { 
+              } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                 ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
               } else {
                 ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
@@ -2396,7 +3108,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return this.vertexManager;
   }
 
-  private static void logLocationHints(VertexLocationHint locationHint) {
+  private static void logLocationHints(String vertexName,
+      VertexLocationHint locationHint) {
+    if (locationHint == null) {
+      LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName);
+      return;
+    }
     Multiset<String> hosts = HashMultiset.create();
     Multiset<String> racks = HashMultiset.create();
     int counter = 0;
@@ -2421,18 +3138,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           sb.append(rack).append(", ");
         }
       }
-      LOG.debug("Location: " + counter + " : " + sb.toString());
+      LOG.debug("Vertex: " + vertexName + ", Location: "
+          + counter + " : " + sb.toString());
       counter++;
     }
 
-    LOG.debug("Host Counts");
+    LOG.debug("Vertex: " + vertexName + ", Host Counts");
     for (Multiset.Entry<String> host : hosts.entrySet()) {
-      LOG.debug("host: " + host.toString());
+      LOG.debug("Vertex: " + vertexName + ", host: " + host.toString());
     }
 
-    LOG.debug("Rack Counts");
+    LOG.debug("Vertex: " + vertexName + ", Rack Counts");
     for (Multiset.Entry<String> rack : racks.entrySet()) {
-      LOG.debug("rack: " + rack.toString());
+      LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index e08e7ed..6f75481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -147,6 +147,11 @@ public class VertexManager {
       managedVertex.setVertexLocationHint(locationHint);
     }
 
+    @Override
+    public int getDAGAttemptNumber() {
+      return appContext.getApplicationAttemptId().getAttemptId(); // attempt id read from appContext
+    }
+
     private void verifyIsRootInput(String inputName) {
       Preconditions.checkState(managedVertex.getAdditionalInputs().get(inputName) != null,
           "Cannot add events for non-root inputs");
@@ -222,23 +227,24 @@ public class VertexManager {
 
   public void onVertexStarted(List<TezTaskAttemptID> completions) {
     Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
-    for (TezTaskAttemptID attemptId : completions) {
-      TezTaskID tezTaskId = attemptId.getTaskID();
-      Integer taskId = new Integer(tezTaskId.getId());
-      String vertexName = 
-          appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
-      List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
-      if (taskIdList == null) {
-        taskIdList = Lists.newArrayList();
-        pluginCompletionsMap.put(vertexName, taskIdList);
+    if (completions != null && !completions.isEmpty()) { // null or empty: plugin gets an empty map
+      for (TezTaskAttemptID tezTaskAttemptID : completions) {
+        Integer taskId = Integer.valueOf(tezTaskAttemptID.getTaskID().getId());
+        String vertexName =
+            appContext.getCurrentDAG().getVertex(
+                tezTaskAttemptID.getTaskID().getVertexID()).getName();
+        List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
+        if (taskIdList == null) {
+          taskIdList = Lists.newArrayList();
+          pluginCompletionsMap.put(vertexName, taskIdList);
+        }
+        taskIdList.add(taskId); // task ids are grouped under their vertex's name
       }
-      taskIdList.add(taskId);
     }
     plugin.onVertexStarted(pluginCompletionsMap);
   }
 
-  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
-    TezTaskID tezTaskId = attemptId.getTaskID();
+  public void onSourceTaskCompleted(TezTaskID tezTaskId) {
     Integer taskId = new Integer(tezTaskId.getId());
     String vertexName = 
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index a71686b..7b2087a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -27,11 +27,14 @@ public enum HistoryEventType {
   DAG_FINISHED,
   VERTEX_INITIALIZED,
   VERTEX_STARTED,
+  VERTEX_PARALLELISM_UPDATED,
   VERTEX_FINISHED,
   TASK_STARTED,
   TASK_FINISHED,
   TASK_ATTEMPT_STARTED,
   TASK_ATTEMPT_FINISHED,
   CONTAINER_LAUNCHED,
-  VERTEX_DATA_MOVEMENT_EVENTS_GENERATED
+  VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
+  DAG_COMMIT_STARTED,
+  VERTEX_COMMIT_STARTED
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
index 4ec0632..690e850 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
@@ -22,16 +22,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.recovery.RecoveryService;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 4794a7b..54bc658 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -98,7 +98,7 @@ public class AMLaunchedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return true;
+    return false; // AM-launched events are not flagged as recovery events
   }
 
   @Override
@@ -139,4 +139,16 @@ public class AMLaunchedEvent implements HistoryEvent {
     fromProto(proto);
   }
 
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+  /** Returns the launchTime field of this event. */
+  public long getLaunchTime() {
+    return launchTime;
+  }
+  /** Returns the appSubmitTime field of this event. */
+  public long getAppSubmitTime() {
+    return appSubmitTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index b3cbb5c..e66141b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -129,4 +129,13 @@ public class AMStartedEvent implements HistoryEvent {
     fromProto(proto);
   }
 
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+  /** Returns the startTime field of this event. */
+  public long getStartTime() {
+    return startTime;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 066f315..471ddd1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -138,4 +138,16 @@ public class ContainerLaunchedEvent implements HistoryEvent {
         + ", launchTime=" + launchTime;
   }
 
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+  /** Returns the launchTime field of this event. */
+  public long getLaunchTime() {
+    return launchTime;
+  }
+  /** Returns the applicationAttemptId field of this event. */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
 }