You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:08 UTC

[02/50] [abbrv] git commit: TEZ-850. Recovery unit tests. (Jeff Zhang via hitesh)

TEZ-850. Recovery unit tests. (Jeff Zhang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f65e65ae
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f65e65ae
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f65e65ae

Branch: refs/heads/branch-0.5
Commit: f65e65aea8cbdb44dd65c6590fbe38dd84413a5a
Parents: d6589d3
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 11 13:22:21 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 11 13:22:21 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  45 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   3 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  12 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 240 +++---
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   6 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   | 514 +++++++++++
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 178 ++++
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 629 +++++++++++---
 .../dag/app/dag/impl/TestVertexRecovery.java    | 860 +++++++++++++++++++
 10 files changed, 2243 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 032438f..4fc7e83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
 ALL CHANGES:
   TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0.
   TEZ-1559. Add system tests for AM recovery.
+  TEZ-850. Recovery unit tests.
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 24d2e6b..daaa81b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -139,7 +139,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final TaskAttemptListener taskAttemptListener;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
-  
+
   private volatile boolean committedOrAborted = false;
   private volatile boolean allOutputsCommitted = false;
   boolean commitAllOutputsOnSuccess = true;
@@ -157,7 +157,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final StateChangeNotifier entityUpdateTracker;
 
   volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
-  private Map<String, Edge> edges = new HashMap<String, Edge>();
+  @VisibleForTesting
+  Map<String, Edge> edges = new HashMap<String, Edge>();
   private TezCounters dagCounters = new TezCounters();
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
@@ -359,14 +360,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private DAGTerminationCause terminationCause;
   private Credentials credentials;
 
-  private long initTime;
-  private long startTime;
-  private long finishTime;
-  
+  @VisibleForTesting
+  long initTime;
+  @VisibleForTesting
+  long startTime;
+  @VisibleForTesting
+  long finishTime;
+
   Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
   private DAGState recoveredState = DAGState.NEW;
-  private boolean recoveryCommitInProgress = false;
+  @VisibleForTesting
+  boolean recoveryCommitInProgress = false;
   Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
 
   static class VertexGroupInfo {
@@ -381,7 +386,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList());
       edgeMergedInputs = Maps.newHashMapWithExpectedSize(groupInfo.getEdgeMergedInputsCount());
       for (PlanGroupInputEdgeInfo edgInfo : groupInfo.getEdgeMergedInputsList()) {
-        edgeMergedInputs.put(edgInfo.getDestVertexName(), 
+        edgeMergedInputs.put(edgInfo.getDestVertexName(),
             DagTypeConverters.convertInputDescriptorFromDAGPlan(edgInfo.getMergedInput()));
       }
       outputs = Sets.newHashSet(groupInfo.getOutputsList());
@@ -706,7 +711,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
     }
   }
-  
+
   private boolean commitOutput(String outputName, OutputCommitter outputCommitter) {
     final OutputCommitter committer = outputCommitter;
     try {
@@ -723,7 +728,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
     return false;
   }
-  
+
   private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
     if (this.committedOrAborted) {
       LOG.info("Ignoring multiple output commit/abort");
@@ -731,7 +736,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
     LOG.info("Calling DAG commit/abort for dag: " + getID());
     this.committedOrAborted = true;
-    
+
     boolean successfulOutputsAlreadyCommitted = !commitAllOutputsOnSuccess;
     boolean failedWhileCommitting = false;
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
@@ -772,7 +777,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           LOG.info("No output committers for vertex: " + vertex.getName());
           continue;
         }
-        Map<String, OutputCommitter> outputCommitters = 
+        Map<String, OutputCommitter> outputCommitters =
             new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
         Set<String> sharedOutputs = vertex.getSharedOutputs();
         // remove shared outputs
@@ -793,7 +798,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           LOG.info("Committing output: " + entry.getKey() + " for vertex: "
               + vertex.getVertexId());
           if (vertex.getState() != VertexState.SUCCEEDED) {
-            throw new TezUncheckedException("Vertex: " + vertex.getName() + 
+            throw new TezUncheckedException("Vertex: " + vertex.getName() +
                 " not in SUCCEEDED state. State= " + vertex.getState());
           }
           if (!commitOutput(entry.getKey(), entry.getValue())) {
@@ -803,11 +808,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         }
       }
     }
-    
+
     if (failedWhileCommitting) {
       LOG.info("DAG: " + getID() + " failed while committing");
     }
-        
+
     if (!dagSucceeded || failedWhileCommitting) {
       // come here because dag failed or
       // dag succeeded and all or none semantics were on and a commit failed
@@ -1026,9 +1031,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     if (finishTime == 0) {
       setFinishTime();
     }
-    
+
     boolean allOutputsCommitted = commitOrAbortOutputs(finalState == DAGState.SUCCEEDED);
-    
+
     if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) {
       finalState = DAGState.FAILED;
       trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
@@ -1057,7 +1062,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     LOG.info("DAG: " + getID() + " finished with state: " + finalState);
     return finalState;
   }
-  
+
   private DAGStatus.State getDAGStatusFromState(DAGState finalState) {
     switch (finalState) {
       case NEW:
@@ -1631,7 +1636,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numFailedVertices=" + job.numFailedVertices
           + ", numKilledVertices=" + job.numKilledVertices
           + ", numVertices=" + job.numVertices);
-      
+
       if (failed) {
         return DAGState.TERMINATING;
       }
@@ -1724,7 +1729,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     addDiagnostic("Vertex re-running"
       + ", vertexName=" + vertex.getName()
       + ", vertexId=" + vertex.getVertexId());
-    
+
     if (!commitAllOutputsOnSuccess) {
       // partial output may already have been committed. fail if so
       List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertex.getName());

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e63dbf5..7ba90b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -137,7 +137,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   private String nodeHttpAddress;
   private String nodeRackName;
 
-  private TaskAttemptStatus reportedStatus;
+  @VisibleForTesting
+  TaskAttemptStatus reportedStatus;
   private DAGCounter localityCounter;
 
   // Used to store locality information when

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ecd2bcc..1dd711b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -116,7 +116,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected final AppContext appContext;
   private final Resource taskResource;
   private final ContainerContext containerContext;
-  private long scheduledTime;
+  @VisibleForTesting
+  long scheduledTime;
 
   private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
   private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
@@ -124,7 +125,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
-  private int numberUncompletedAttempts = 0;
+  @VisibleForTesting
+  int numberUncompletedAttempts = 0;
 
   private boolean historyTaskStartGenerated = false;
 
@@ -290,11 +292,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   //saying COMMIT_PENDING
   private TezTaskAttemptID commitAttempt;
 
-  private TezTaskAttemptID successfulAttempt;
+  @VisibleForTesting
+  TezTaskAttemptID successfulAttempt;
 
   @VisibleForTesting
   int failedAttempts;
-  private int finishedAttempts;//finish are total of success, failed and killed
+  @VisibleForTesting
+  int finishedAttempts;//finish are total of success, failed and killed
 
   private final boolean leafVertex;
   private TaskState recoveredState = TaskState.NEW;

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ff556ba..31240cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -205,9 +205,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   //fields initialized in init
 
-  private int numStartedSourceVertices = 0;
-  private int numInitedSourceVertices = 0;
-  private int numRecoveredSourceVertices = 0;
+  @VisibleForTesting
+  int numStartedSourceVertices = 0;
+  @VisibleForTesting
+  int numInitedSourceVertices = 0;
+  @VisibleForTesting
+  int numRecoveredSourceVertices = 0;
 
   private int distanceFromRoot = 0;
 
@@ -238,7 +241,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       new VertexStateChangedCallback();
 
   private VertexState recoveredState = VertexState.NEW;
-  private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+  @VisibleForTesting
+  List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
   private boolean vertexAlreadyInitialized = false;
 
   protected static final
@@ -254,7 +258,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                       VertexState.INITIALIZING, VertexState.FAILED),
                   VertexEventType.V_INIT,
                   new InitTransition())
-          .addTransition(VertexState.NEW, 
+          .addTransition(VertexState.NEW,
                 EnumSet.of(VertexState.NEW),
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   new NullEdgeInitializedTransition())
@@ -308,7 +312,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
           // Transitions from INITIALIZING state
           .addTransition(VertexState.INITIALIZING,
-              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, 
+              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
                   VertexState.FAILED),
               VertexEventType.V_ROOT_INPUT_INITIALIZED,
               new RootInputInitializedTransition())
@@ -341,14 +345,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITIALIZING, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          .addTransition(VertexState.INITIALIZING, 
+          .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
                   VertexState.FAILED),
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   new NullEdgeInitializedTransition())
 
           // Transitions from INITED state
-          // SOURCE_VERTEX_STARTED - for sources which determine parallelism, 
+          // SOURCE_VERTEX_STARTED - for sources which determine parallelism,
           // they must complete before this vertex can start.
           .addTransition
               (VertexState.INITED,
@@ -358,14 +362,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED, VertexState.INITED,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
-          .addTransition(VertexState.INITED, 
+          .addTransition(VertexState.INITED,
               EnumSet.of(VertexState.INITED),
               VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
               new OneToOneSourceSplitTransition())
           .addTransition(VertexState.INITED,  VertexState.INITED,
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
-          .addTransition(VertexState.INITED, 
+          .addTransition(VertexState.INITED,
               EnumSet.of(VertexState.RUNNING, VertexState.INITED),
               VertexEventType.V_START,
               new StartTransition())
@@ -393,7 +397,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
-          .addTransition(VertexState.RUNNING, 
+          .addTransition(VertexState.RUNNING,
               EnumSet.of(VertexState.RUNNING),
               VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
               new OneToOneSourceSplitTransition())
@@ -455,7 +459,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
           .addTransition(
-              VertexState.SUCCEEDED, 
+              VertexState.SUCCEEDED,
               EnumSet.of(VertexState.FAILED, VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedAfterVertexSuccessTransition())
@@ -551,17 +555,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl> stateMachine;
 
   //changing fields while the vertex is running
-  private int numTasks;
-  private int completedTaskCount = 0;
-  private int succeededTaskCount = 0;
-  private int failedTaskCount = 0;
-  private int killedTaskCount = 0;
-
-  private long initTimeRequested; // Time at which INIT request was received.
-  private long initedTime; // Time when entering state INITED
-  private long startTimeRequested; // Time at which START request was received.
-  private long startedTime; // Time when entering state STARTED
-  private long finishTime;
+  @VisibleForTesting
+  int numTasks;
+  @VisibleForTesting
+  int completedTaskCount = 0;
+  @VisibleForTesting
+  int succeededTaskCount = 0;
+  @VisibleForTesting
+  int failedTaskCount = 0;
+  @VisibleForTesting
+  int killedTaskCount = 0;
+
+  @VisibleForTesting
+  long initTimeRequested; // Time at which INIT request was received.
+  @VisibleForTesting
+  long initedTime; // Time when entering state INITED
+  @VisibleForTesting
+  long startTimeRequested; // Time at which START request was received.
+  @VisibleForTesting
+  long startedTime; // Time when entering state STARTED
+  @VisibleForTesting
+  long finishTime;
   private float progress;
 
   private final TezVertexID vertexId;  //runtime assigned id.
@@ -576,14 +590,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<Vertex, Edge> targetVertices;
   Set<Edge> uninitializedEdges = Sets.newHashSet();
 
-  private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+  private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
     rootInputDescriptors;
-  private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> 
+  private Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
     additionalOutputs;
   private Map<String, OutputCommitter> outputCommitters;
   private Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
   private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate
-      .getDefaultSinglePhysicalInputSpecUpdate(); 
+      .getDefaultSinglePhysicalInputSpecUpdate();
   private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
   private Set<String> inputsWithInitializers;
   private int numInitializedInputs;
@@ -598,7 +612,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private RootInputInitializerManager rootInputInitializerManager;
 
   VertexManager vertexManager;
-  
+
   private final UserGroupInformation dagUgi;
 
   private boolean parallelismSet = false;
@@ -607,20 +621,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private AtomicBoolean committed = new AtomicBoolean(false);
   private AtomicBoolean aborted = new AtomicBoolean(false);
   private boolean commitVertexOutputs = false;
-  
+
   private Map<String, VertexGroupInfo> dagVertexGroups;
-  
+
   private TaskLocationHint taskLocationHints[];
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
   private final String javaOpts;
   private final ContainerContext containerContext;
   private VertexTerminationCause terminationCause;
-  
+
   private String logIdentifier;
-  private boolean recoveryCommitInProgress = false;
+  @VisibleForTesting
+  boolean recoveryCommitInProgress = false;
   private boolean summaryCompleteSeen = false;
-  private boolean hasCommitter = false;
+  @VisibleForTesting
+  boolean hasCommitter = false;
   private boolean vertexCompleteSeen = false;
   private Map<String,EdgeManagerPluginDescriptor> recoveredSourceEdgeManagers = null;
   private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null;
@@ -658,7 +674,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       logLocationHints(this.vertexName, vertexLocationHint);
     }
     setTaskLocationHints(vertexLocationHint);
-    
+
     this.dagUgi = appContext.getCurrentDAG().getDagUGI();
 
     this.taskResource = DagTypeConverters
@@ -897,12 +913,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       this.readLock.unlock();
     }
   }
-  
+
   @Override
   public TaskLocationHint getTaskLocationHint(TezTaskID taskId) {
     this.readLock.lock();
     try {
-      if (taskLocationHints == null || 
+      if (taskLocationHints == null ||
           taskLocationHints.length <= taskId.getId()) {
         return null;
       }
@@ -1081,8 +1097,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private void setTaskLocationHints(VertexLocationHint vertexLocationHint) {
-    if (vertexLocationHint != null && 
-        vertexLocationHint.getTaskLocationHints() != null && 
+    if (vertexLocationHint != null &&
+        vertexLocationHint.getTaskLocationHints() != null &&
         !vertexLocationHint.getTaskLocationHints().isEmpty()) {
       List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints();
       taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]);
@@ -1154,7 +1170,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             }
           }
         }
-        
+
         // Restore any rootInputSpecUpdates which may have been registered during a parallelism
         // update.
         if (rootInputSpecUpdates != null) {
@@ -1166,7 +1182,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         writeLock.unlock();
       }
     }
-    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " 
+    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: "
     + parallelism + " for vertex: " + logIdentifier);
     setVertexLocationHint(vertexLocationHint);
     writeLock.lock();
@@ -1175,7 +1191,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier);
         return false;
       }
-      
+
       parallelismSet = true;
 
       // Input initializer/Vertex Manager/1-1 split expected to set parallelism.
@@ -1185,7 +1201,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               "Vertex state is not Initializing. Value: " + getState()
                   + " for vertex: " + logIdentifier);
         }
-        
+
         if(sourceEdgeManagers != null) {
           for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
             LOG.info("Replacing edge manager for source:"
@@ -1223,7 +1239,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         stateChangeNotifier.stateChanged(vertexId,
             new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         this.createTasks();
-        LOG.info("Vertex " + getVertexId() + 
+        LOG.info("Vertex " + getVertexId() +
             " parallelism set to " + parallelism);
         if (canInitVertex()) {
           getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT));
@@ -1244,7 +1260,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           return false;
         }
         if (parallelism == numTasks) {
-          LOG.info("setParallelism same as current value: " + parallelism + 
+          LOG.info("setParallelism same as current value: " + parallelism +
               " for vertex: " + logIdentifier);
           Preconditions.checkArgument(sourceEdgeManagers != null,
               "Source edge managers or RootInputSpecs must be set when not changing parallelism");
@@ -1258,7 +1274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         for (Edge edge : sourceVertices.values()) {
           edge.startEventBuffering();
         }
-  
+
         // assign to local variable of LinkedHashMap to make sure that changing
         // type of task causes compile error. We depend on LinkedHashMap for order
         LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
@@ -1281,14 +1297,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           LOG.info("Removing task: " + entry.getKey());
           iter.remove();
         }
-        LOG.info("Vertex " + logIdentifier + 
+        LOG.info("Vertex " + logIdentifier +
             " parallelism set to " + parallelism + " from " + numTasks);
         int oldNumTasks = numTasks;
         this.numTasks = parallelism;
         stateChangeNotifier.stateChanged(vertexId,
             new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         assert tasks.size() == numTasks;
-  
+
         // set new edge managers
         if(sourceEdgeManagers != null) {
           for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
@@ -1320,17 +1336,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           edge.stopEventBuffering();
         }
       }
-      
+
       for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
         Edge edge = entry.getValue();
-        if (edge.getEdgeProperty().getDataMovementType() 
+        if (edge.getEdgeProperty().getDataMovementType()
             == DataMovementType.ONE_TO_ONE) {
           // inform these target vertices that we have changed parallelism
-          VertexEventOneToOneSourceSplit event = 
+          VertexEventOneToOneSourceSplit event =
               new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
                   getVertexId(),
-                  ((originalOneToOneSplitSource!=null) ? 
-                      originalOneToOneSplitSource : getVertexId()), 
+                  ((originalOneToOneSplitSource!=null) ?
+                      originalOneToOneSplitSource : getVertexId()),
                   numTasks);
           getEventHandler().handle(event);
         }
@@ -1339,7 +1355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     } finally {
       writeLock.unlock();
     }
-    
+
     return true;
   }
 
@@ -1733,7 +1749,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // no code, for now
   }
 
-  private ContainerContext getContainerContext(int taskIdx) {
+  @VisibleForTesting
+  ContainerContext getContainerContext(int taskIdx) {
     if (taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(vertexName, taskIdx)) {
       String jvmOpts = taskSpecificLaunchCmdOpts.getTaskSpecificOption(javaOpts, vertexName, taskIdx);
       ContainerContext context = new ContainerContext(this.localResources,
@@ -1806,9 +1823,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (rootInputDescriptors != null) {
         LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
             + rootInputDescriptors);
-        for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input 
+        for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input
             : rootInputDescriptors.values()) {
-          if (input.getControllerDescriptor() != null && 
+          if (input.getControllerDescriptor() != null &&
               input.getControllerDescriptor().getClassName() != null) {
             if (inputsWithInitializers == null) {
               inputsWithInitializers = Sets.newHashSet();
@@ -1875,7 +1892,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     checkTaskLimits();
     return VertexState.INITED;
   }
-  
+
   private void assignVertexManager() {
     boolean hasBipartite = false;
     boolean hasOneToOne = false;
@@ -1895,12 +1912,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           hasCustom = true;
           break;
         default:
-          throw new TezUncheckedException("Unknown data movement type: " + 
+          throw new TezUncheckedException("Unknown data movement type: " +
               edge.getEdgeProperty().getDataMovementType());
         }
       }
     }
-    
+
     boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
 
     if (hasUserVertexManager) {
@@ -2008,6 +2025,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           vertex.finished(VertexState.ERROR);
       }
 
+      // recover from recover log, should recover to running
+      // desiredState must be RUNNING based on above code
       VertexState endState;
       switch (vertex.recoveredState) {
         case NEW:
@@ -2086,6 +2105,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
             endState = VertexState.RUNNING;
           } else {
+            // why succeeded here
             endState = VertexState.SUCCEEDED;
             vertex.finished(endState);
           }
@@ -2249,7 +2269,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           (vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) ||
           vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)),
           "Not connected to vertex " + otherVertex.getName() + " from vertex: " + vertex.logIdentifier);
-      LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() + 
+      LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() +
           " at vertex : " + vertex.logIdentifier);
       vertex.uninitializedEdges.remove(edge);
       if(vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) {
@@ -2596,7 +2616,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (state.equals(VertexState.FAILED)) {
         return state;
       }
-      // TODO move before to handle NEW state 
+      // TODO move before to handle NEW state
       if (vertex.targetVertices != null) {
         for (Edge e : vertex.targetVertices.values()) {
           if (e.getEdgeManager() == null) {
@@ -2619,7 +2639,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
         }
       }
-      
+
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
         LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
@@ -2633,7 +2653,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               vertex.appContext.getTaskScheduler().getNumClusterNodes(),
               vertex.getTaskResource(),
               vertex.appContext.getTaskScheduler().getTotalResources());
-          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
               inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
           for (String inputName : vertex.inputsWithInitializers) {
             inputList.add(vertex.rootInputDescriptors.get(inputName));
@@ -2646,7 +2666,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         } else {
           boolean hasOneToOneUninitedSource = false;
           for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
-            if (entry.getValue().getEdgeProperty().getDataMovementType() == 
+            if (entry.getValue().getEdgeProperty().getDataMovementType() ==
                 DataMovementType.ONE_TO_ONE) {
               if (entry.getKey().getTotalTasks() == -1) {
                 hasOneToOneUninitedSource = true;
@@ -2662,7 +2682,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
             return VertexState.INITIALIZING;
           }
-          throw new TezUncheckedException(vertex.getVertexId() + 
+          throw new TezUncheckedException(vertex.getVertexId() +
           " has -1 tasks but does not have input initializers, " +
           "1-1 uninited sources or custom vertex manager to set it at runtime");
         }
@@ -2676,14 +2696,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               vertex.appContext.getTaskScheduler().getNumClusterNodes(),
               vertex.getTaskResource(),
               vertex.appContext.getTaskScheduler().getTotalResources());
-          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+          List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
           inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
           for (String inputName : vertex.inputsWithInitializers) {
             inputList.add(vertex.rootInputDescriptors.get(inputName));
           }
           LOG.info("Starting root input initializers: "
               + vertex.inputsWithInitializers.size());
-          // special case when numTasks>0 and still we want to stay in initializing 
+          // special case when numTasks>0 and still we want to stay in initializing
           // state. This is handled in RootInputInitializedTransition specially.
           vertex.initWaitsForRootInitializers = true;
           vertex.rootInputInitializerManager.runInputInitializers(inputList);
@@ -2711,7 +2731,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Resource vertexTaskResource, Resource totalResource) {
     return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier);
   }
-  
+
   private boolean initializeVertexInInitializingState() {
     boolean isInitialized = initializeVertex();
     if (!isInitialized) {
@@ -2721,7 +2741,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     return true;
   }
-  
+
   void startIfPossible() {
     if (startSignalPending) {
       // Trigger a start event to ensure route events are seen before
@@ -2735,7 +2755,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   public static class VertexInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-    
+
     static VertexState doTransition(VertexImpl vertex) {
       Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier);
       boolean isInitialized = vertex.initializeVertexInInitializingState();
@@ -2744,15 +2764,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
 
       vertex.startIfPossible();
-      return VertexState.INITED;      
+      return VertexState.INITED;
     }
-    
+
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       return doTransition(vertex);
     }
   }
-  
+
   // present in most transitions so that the initializer thread can be shutdown properly
   public static class RootInputInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -2773,15 +2793,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // All inputs initialized, shutdown the initializer.
         vertex.rootInputInitializerManager.shutdown();
       }
-      
+
       // done. check if we need to do the initialization
-      if (vertex.getState() == VertexState.INITIALIZING && 
+      if (vertex.getState() == VertexState.INITIALIZING &&
           vertex.initWaitsForRootInitializers) {
         // set the wait flag to false
         vertex.initWaitsForRootInitializers = false;
         // initialize vertex if possible and needed
         if (vertex.canInitVertex()) {
-          Preconditions.checkState(vertex.numTasks >= 0, 
+          Preconditions.checkState(vertex.numTasks >= 0,
               "Parallelism should have been set by now for vertex: " + vertex.logIdentifier);
           return VertexInitializedTransition.doTransition(vertex);
         }
@@ -2795,10 +2815,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      VertexEventOneToOneSourceSplit splitEvent = 
+      VertexEventOneToOneSourceSplit splitEvent =
           (VertexEventOneToOneSourceSplit)event;
       TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
-      
+
       if (vertex.originalOneToOneSplitSource != null) {
         VertexState state = vertex.getState();
         Preconditions
@@ -2813,25 +2833,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
           // ignore another split event that may have come from a different
           // path in the DAG. We have already split because of that source
-          LOG.info("Ignoring split of vertex " + vertex.getVertexId() + 
-              " because of split in vertex " + originalSplitSource + 
+          LOG.info("Ignoring split of vertex " + vertex.getVertexId() +
+              " because of split in vertex " + originalSplitSource +
               " sent by vertex " + splitEvent.getSenderVertex() +
               " numTasks " + splitEvent.getNumTasks());
           return state;
         }
         // cannot split from multiple sources
-        throw new TezUncheckedException("Vertex: " + vertex.getVertexId() + 
-            " asked to split by: " + originalSplitSource + 
+        throw new TezUncheckedException("Vertex: " + vertex.getVertexId() +
+            " asked to split by: " + originalSplitSource +
             " but was already split by:" + vertex.originalOneToOneSplitSource);
       }
-      
-      LOG.info("Splitting vertex " + vertex.getVertexId() + 
-          " because of split in vertex " + originalSplitSource + 
+
+      LOG.info("Splitting vertex " + vertex.getVertexId() +
+          " because of split in vertex " + originalSplitSource +
           " sent by vertex " + splitEvent.getSenderVertex() +
           " numTasks " + splitEvent.getNumTasks());
       vertex.originalOneToOneSplitSource = originalSplitSource;
       vertex.setParallelism(splitEvent.getNumTasks(), null, null, null);
-      if (vertex.getState() == VertexState.RUNNING || 
+      if (vertex.getState() == VertexState.RUNNING ||
           vertex.getState() == VertexState.INITED) {
         return vertex.getState();
       } else {
@@ -2861,19 +2881,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
       vertex.numStartedSourceVertices++;
       LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
-          " for vertex: " + vertex.getVertexId() + " numStartedSources: " + 
+          " for vertex: " + vertex.getVertexId() + " numStartedSources: " +
           vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
-      
+
       if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) {
         LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: "
             + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
         return;
       }
-      
-      // vertex meets external start dependency conditions. Save this signal in 
+
+      // vertex meets external start dependency conditions. Save this signal in
       // case we are not ready to start now and need to start later
       vertex.startSignalPending = true;
-      
+
       if (vertex.getState() != VertexState.INITED) {
         // vertex itself is not ready to start. External dependencies have already
         // notified us.
@@ -2883,14 +2903,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + vertex.uninitializedEdges.size());
         return;
       }
-      
+
       // vertex is inited and all dependencies are ready. Inited vertex means
       // parallelism must be set already and edges defined
       Preconditions.checkState(
           (vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty()),
           "Cannot start vertex that is not completely defined. Vertex: "
               + vertex.logIdentifier + " numTasks: " + vertex.numTasks);
-      
+
       vertex.startIfPossible();
     }
   }
@@ -2906,14 +2926,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         + " initWaitsForRootInitializers: " + initWaitsForRootInitializers);
     return false;
   }
-  
-  public static class StartWhileInitializingTransition implements 
+
+  public static class StartWhileInitializingTransition implements
     SingleArcTransition<VertexImpl, VertexEvent> {
 
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       // vertex state machine does not start itself in the initializing state
-      // this start event can only come directly from the DAG. That means this 
+      // this start event can only come directly from the DAG. That means this
       // is a top level vertex of the dag
       Preconditions.checkState(
           (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()),
@@ -2926,10 +2946,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   public static class StartTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-  
+
   @Override
   public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      Preconditions.checkState(vertex.getState() == VertexState.INITED, 
+      Preconditions.checkState(vertex.getState() == VertexState.INITED,
           "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier);
       vertex.startTimeRequested = vertex.clock.getTime();
       return vertex.startVertex();
@@ -2937,7 +2957,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private VertexState startVertex() {
-    Preconditions.checkState(getState() == VertexState.INITED, 
+    Preconditions.checkState(getState() == VertexState.INITED,
         "Vertex must be inited " + logIdentifier);
 
     startedTime = clock.getTime();
@@ -3095,7 +3115,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
         case ROOT_INPUT_INIT_FAILURE:
         case COMMIT_FAILURE:
-        case INVALID_NUM_OF_TASKS: 
+        case INVALID_NUM_OF_TASKS:
         case INIT_FAILURE:
         case INTERNAL_ERROR:
         case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
@@ -3172,7 +3192,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
         taskSucceeded(vertex, task);
       } else if (taskEvent.getState() == TaskState.FAILED) {
-        LOG.info("Failing vertex: " + vertex.logIdentifier + 
+        LOG.info("Failing vertex: " + vertex.logIdentifier +
             " because task failed: " + taskEvent.getTaskID());
         vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
         forceTransitionToKillWait = true;
@@ -3220,7 +3240,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.succeededTaskCount--;
     }
   }
-  
+
   private static class VertexNoTasksCompletedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -3229,7 +3249,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return VertexImpl.checkVertexForCompletion(vertex);
     }
   }
-  
+
   private static class TaskCompletedAfterVertexSuccessTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
     @Override
@@ -3241,12 +3261,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (vEvent.getState() == TaskState.FAILED) {
         finalState = VertexState.FAILED;
         finalStatus = VertexStatus.State.FAILED;
-        diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() + 
+        diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() +
           " failed after vertex succeeded.";
       } else {
         finalState = VertexState.ERROR;
         finalStatus = VertexStatus.State.ERROR;
-        diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + 
+        diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() +
             " completed with state " + vEvent.getState() + " after vertex succeeded.";
       }
       LOG.info(diagnosticMsg);
@@ -3599,14 +3619,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Nullable
   @Override
-  public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+  public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
     getAdditionalInputs() {
     return this.rootInputDescriptors;
   }
 
   @Nullable
   @Override
-  public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> 
+  public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>
     getAdditionalOutputs() {
     return this.additionalOutputs;
   }
@@ -3696,7 +3716,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
         + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
     if (rootInputDescriptors != null) {
-      for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> 
+      for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
            rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
         inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
             rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
@@ -3729,18 +3749,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     return outputSpecList;
   }
-  
+
   //TODO Eventually remove synchronization.
   @Override
   public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
     return groupInputSpecList;
   }
-  
+
   @Override
   public synchronized void addSharedOutputs(Set<String> outputs) {
     this.sharedOutputs.addAll(outputs);
   }
-  
+
   @Override
   public synchronized Set<String> getSharedOutputs() {
     return this.sharedOutputs;

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ec05815..aba4fd9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -111,7 +111,7 @@ public class TestDAGImpl {
   private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
   private DAGPlan dagPlan;
   private TezDAGID dagId;
-  private Configuration conf;
+  private static Configuration conf;
   private DrainDispatcher dispatcher;
   private Credentials fsTokens;
   private AppContext appContext;
@@ -344,7 +344,7 @@ public class TestDAGImpl {
   }
 
   // Create a plan with 3 vertices: A, B, C. Group(A,B)->C
-  private DAGPlan createGroupDAGPlan() {
+  static DAGPlan createGroupDAGPlan() {
     LOG.info("Setting up group dag plan");
     int dummyTaskCount = 1;
     Resource dummyTaskResource = Resource.newInstance(1, 1);
@@ -381,7 +381,7 @@ public class TestDAGImpl {
     return dag.createDag(conf);
   }
 
-  private DAGPlan createTestDAGPlan() {
+  public static DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testverteximpl")

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
new file mode 100644
index 0000000..da0186e
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestDAGRecovery {
+
+  private DAGImpl dag;
+  private EventHandler mockEventHandler;
+
+  private String user = "root";
+  private String dagName = "dag1";
+
+  private AppContext mockAppContext;
+  private ApplicationId appId = ApplicationId.newInstance(
+      System.currentTimeMillis(), 1);
+  private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+  private long initTime = 100L;
+  private long startTime = initTime + 200L;
+  private long commitStartTime = startTime + 200L;
+  private long finishTime = commitStartTime + 200L;
+
+  @Before
+  public void setUp() {
+
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null);
+    mockEventHandler = mock(EventHandler.class);
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    dag =
+        new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler,
+            mock(TaskAttemptListener.class), new Credentials(),
+            new SystemClock(), user, mock(TaskHeartbeatHandler.class),
+            mockAppContext);
+  }
+
+  private void assertNewState() {
+    assertEquals(0, dag.getVertices().size());
+    assertEquals(0, dag.edges.size());
+    assertNull(dag.dagScheduler);
+    assertFalse(dag.recoveryCommitInProgress);
+    assertEquals(0, dag.recoveredGroupCommits.size());
+  }
+
+  private void restoreFromDAGInitializedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user,
+            dagName));
+    assertEquals(DAGState.INITED, recoveredState);
+    assertEquals(initTime, dag.initTime);
+    assertEquals(6, dag.getVertices().size());
+    assertEquals(6, dag.edges.size());
+    assertNotNull(dag.dagScheduler);
+  }
+
+  private void restoreFromDAGStartedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGStartedEvent(dagId, startTime, user,
+            dagName));
+    assertEquals(startTime, dag.startTime);
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromDAGCommitStartedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGCommitStartedEvent(dagId, commitStartTime));
+    assertTrue(dag.recoveryCommitInProgress);
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromVertexGroupCommitStartedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new VertexGroupCommitStartedEvent(dagId, "g1",
+            commitStartTime));
+    assertEquals(1, dag.recoveredGroupCommits.size());
+    assertFalse(dag.recoveredGroupCommits.get("g1").booleanValue());
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromVertexGroupCommitFinishedEvent() {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new VertexGroupCommitFinishedEvent(dagId, "g1",
+            commitStartTime + 100L));
+    assertEquals(1, dag.recoveredGroupCommits.size());
+    assertTrue(dag.recoveredGroupCommits.get("g1").booleanValue());
+    assertEquals(DAGState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromDAGFinishedEvent(DAGState finalState) {
+    DAGState recoveredState =
+        dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
+            finalState, "", new TezCounters(), user, dagName));
+    assertEquals(finishTime, dag.finishTime);
+    assertFalse(dag.recoveryCommitInProgress);
+    assertEquals(finalState, recoveredState);
+  }
+
+  /**
+   * New -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_FromNew() {
+    assertNewState();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    ArgumentCaptor<DAGEvent> eventCaptor =
+        ArgumentCaptor.forClass(DAGEvent.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<DAGEvent> events = eventCaptor.getAllValues();
+    assertEquals(2, events.size());
+    assertEquals(DAGEventType.DAG_INIT, events.get(0).getType());
+    assertEquals(DAGEventType.DAG_START, events.get(1).getType());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_FromInited() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // send recover event to 2 root vertex
+    ArgumentCaptor<VertexEvent> eventCaptor =
+        ArgumentCaptor.forClass(VertexEvent.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
+    assertEquals(2, vertexEvents.size());
+    for (VertexEvent vEvent : vertexEvents) {
+      assertTrue(vEvent instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
+      assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+    }
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_FromStarted() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // send recover event to 2 root vertex
+    ArgumentCaptor<VertexEvent> eventCaptor =
+        ArgumentCaptor.forClass(VertexEvent.class);
+    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
+    List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
+    assertEquals(2, vertexEvents.size());
+    for (VertexEvent vEvent : vertexEvents) {
+      assertTrue(vEvent instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
+      assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+    }
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_SUCCEEDED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.SUCCEEDED, dag.getState());
+
+    // recover all the vertices to SUCCEED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAGFinishedEvent(FAILED) -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_FAILED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.FAILED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.FAILED, dag.getState());
+
+    // recover all the vertices to FAILED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
+   * restoreFromDAGFinishedEvent -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_KILLED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.KILLED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.KILLED, dag.getState());
+
+    // recover all the vertices to KILLED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
+   * restoreFromDAGFinishedEvent -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_Finished_ERROR() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.ERROR);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.ERROR, dag.getState());
+
+    // recover all the vertices to KILLED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.ERROR, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAG_COMMIT_STARTED -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_COMMIT_STARTED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGCommitStartedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.FAILED, dag.getState());
+
+    // recover all the vertices to SUCCEEDED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromDAG_COMMIT_STARTED -> -> restoreFromDAGFinished (SUCCEEDED)->
+   * RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_COMMIT_STARTED_Finished_SUCCEEDED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    restoreFromDAGCommitStartedEvent();
+    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.SUCCEEDED, dag.getState());
+
+    // recover all the vertices to SUCCEED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromVERTEX_GROUP_COMMIT_STARTED -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_GROUP_COMMIT_STARTED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromVertexGroupCommitStartedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.FAILED, dag.getState());
+
+    // recover all the vertices to SUCCEEDED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED ->
+   * RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_GROUP_COMMIT_STARTED_FINISHED() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    restoreFromVertexGroupCommitStartedEvent();
+    restoreFromVertexGroupCommitFinishedEvent();
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.RUNNING, dag.getState());
+
+    // send recover event to 2 root vertex
+    verify(mockEventHandler, times(2)).handle(
+        any(VertexEventRecoverVertex.class));
+    assertEquals(DAGState.RUNNING, dag.getState());
+  }
+
+  /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
+   * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED ->
+   * restoreFromDAG_Finished -> RecoverTransition
+   */
+  @Test
+  public void testDAGRecovery_GROUP_COMMIT_Finished() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+
+    restoreFromVertexGroupCommitStartedEvent();
+    restoreFromVertexGroupCommitFinishedEvent();
+    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+
+    // recover all the vertices to SUCCEEDED
+    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<Event> events = eventCaptor.getAllValues();
+    int i = 0;
+    for (; i < 6; ++i) {
+      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent =
+          (VertexEventRecoverVertex) events.get(i);
+      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    }
+
+    // send DAGAppMasterEventDAGFinished at last
+    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
+    DAGAppMasterEventDAGFinished dagFinishedEvent =
+        (DAGAppMasterEventDAGFinished) events.get(i);
+    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
new file mode 100644
index 0000000..3b04cf6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestTaskAttemptRecovery {
+
+  private TaskAttemptImpl ta;
+  private EventHandler mockEventHandler;
+  private long startTime = System.currentTimeMillis();
+  private long finishTime = startTime + 5000;
+
+  private TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+  private String vertexName = "v1";
+
+  @Before
+  public void setUp() {
+    mockEventHandler = mock(EventHandler.class);
+    TezTaskID taskId =
+        TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
+    ta =
+        new TaskAttemptImpl(taskId, 0, mockEventHandler,
+            mock(TaskAttemptListener.class), new Configuration(),
+            new SystemClock(), mock(TaskHeartbeatHandler.class),
+            mock(AppContext.class), false, Resource.newInstance(1, 1),
+            mock(ContainerContext.class), false);
+  }
+
+  private void restoreFromTAStartEvent() {
+    TaskAttemptState recoveredState =
+        ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
+            startTime, mock(ContainerId.class), mock(NodeId.class), "", ""));
+    assertEquals(startTime, ta.getLaunchTime());
+    assertEquals(TaskAttemptState.RUNNING, recoveredState);
+  }
+
+  private void restoreFromTAFinishedEvent(TaskAttemptState state) {
+    String diag = "test_diag";
+    TezCounters counters = mock(TezCounters.class);
+
+    TaskAttemptState recoveredState =
+        ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+            startTime, finishTime, state, diag, counters));
+    assertEquals(startTime, ta.getLaunchTime());
+    assertEquals(finishTime, ta.getFinishTime());
+    assertEquals(counters, ta.reportedStatus.counters);
+    assertEquals(1.0f, ta.reportedStatus.progress, 1e-6);
+    assertEquals(state, ta.reportedStatus.state);
+    assertEquals(1, ta.getDiagnostics().size());
+    assertEquals(diag, ta.getDiagnostics().get(0));
+    assertEquals(state, recoveredState);
+  }
+
+  /**
+   * No any event to restore -> RecoverTransition
+   */
+  @Test
+  public void testTARecovery_NEW() {
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+    verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> RecoverTransition
+   */
+  @Test
+  public void testTARecovery_START() {
+    restoreFromTAStartEvent();
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+    verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> restoreFromTAFinished (SUCCEED)
+   * -> RecoverTransition
+   */
+  @Test
+  public void testTARecovery_SUCCEED() {
+    restoreFromTAStartEvent();
+    restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
+    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> restoreFromTAFinished (KILLED)
+   * -> RecoverTransition
+   */
+  @Test
+  public void testTARecovery_KIILED() {
+    restoreFromTAStartEvent();
+    restoreFromTAFinishedEvent(TaskAttemptState.KILLED);
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> restoreFromTAFinished (FAILED)
+   * -> RecoverTransition
+   */
+  @Test
+  public void testTARecovery_FAILED() {
+    restoreFromTAStartEvent();
+    restoreFromTAFinishedEvent(TaskAttemptState.FAILED);
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
+    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAFinishedEvent ( no TAStartEvent before TAFinishedEvent )
+   */
+  @Test
+  public void testRecover_FINISH_BUT_NO_START() {
+    try {
+      restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
+      fail("Should fail due to no TaskAttemptStartEvent before TaskAttemptFinishedEvent");
+    } catch (Throwable e) {
+      assertEquals("Finished Event seen but"
+          + " no Started Event was encountered earlier", e.getMessage());
+    }
+  }
+}