You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:24 UTC

[4/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 409c506..803edf7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -15,602 +15,1149 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.tez.dag.app.dag.impl;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-import java.net.URL;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.TezAbstractEvent;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.VertexStatus.State;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.TaskStateInternal;
+import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
+import org.apache.tez.dag.app.dag.impl.TestVertexImpl.EventHandlingRootInputInitializer;
+import org.apache.tez.dag.app.rm.AMSchedulerEvent;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGKillRequestEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
-import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
-@SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestDAGRecovery {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestDAGImpl.class);
+  private static Configuration conf;
+  private DrainDispatcher dispatcher;
+  private ListeningExecutorService execService;
+  private Credentials fsTokens;
+  private AppContext appContext;
+  private ACLManager aclManager;
+  private ApplicationAttemptId appAttemptId;
+  private TaskEventDispatcher taskEventDispatcher;
+  private VertexEventDispatcher vertexEventDispatcher;
+  private DagEventDispatcher dagEventDispatcher;
+  private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
+  private TaskHeartbeatHandler thh;
+  private Clock clock = new SystemClock();
+  private DAGFinishEventHandler dagFinishEventHandler;
+  private DAGPlan dagPlan;
   private DAGImpl dag;
-  private EventHandler mockEventHandler;
+  private TezDAGID dagId;
+  private UserGroupInformation ugi;
+  private MockHistoryEventHandler historyEventHandler;
+  private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
+  private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,
+      10));
+  private DAGRecoveryData dagRecoveryData = mock(DAGRecoveryData.class);
+
+  private TezVertexID v1Id;   // first vertex
+  private TezTaskID t1v1Id;     // first task of v1
+  private TezTaskAttemptID ta1t1v1Id;   // first task attempt of the first task of v1
+  private TezVertexID v2Id;
+  private TezTaskID t1v2Id;
+  private TezTaskAttemptID ta1t1v2Id;
+
+  ////////////////////////
+  private Random rand = new Random();
+  private long dagInitedTime = System.currentTimeMillis() + rand.nextInt(100);
+  private long dagStartedTime = dagInitedTime + rand.nextInt(100);
+  private long v1InitedTime = dagStartedTime + rand.nextInt(100);
+  private long v1StartedTime = v1InitedTime + rand.nextInt(100);
+  private int v1NumTask = 10;
+  private long t1StartedTime = v1StartedTime + rand.nextInt(100);
+  private long t1FinishedTime = t1StartedTime + rand.nextInt(100);
+  private long ta1LaunchTime = t1StartedTime + rand.nextInt(100);
+  private long ta1FinishedTime = ta1LaunchTime + rand.nextInt(100);
+  
+  private class DagEventDispatcher implements EventHandler<DAGEvent> {
+    @Override
+    public void handle(DAGEvent event) {
+      dag.handle(event);
+    }
+  }
 
-  private String user = "root";
-  private String dagName = "dag1";
+  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskEvent event) {
+      TaskImpl task = (TaskImpl) dag.getVertex(event.getTaskID().getVertexID())
+          .getTask(event.getTaskID());
+      task.handle(event);
+    }
+  }
 
-  private AppContext mockAppContext;
-  private ApplicationId appId = ApplicationId.newInstance(
-      System.currentTimeMillis(), 1);
-  private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-  private long initTime = 100L;
-  private long startTime = initTime + 200L;
-  private long commitStartTime = startTime + 200L;
-  private long finishTime = commitStartTime + 200L;
-  private TezCounters tezCounters = new TezCounters();
+  @SuppressWarnings("unchecked")
+  private class TaskAttemptEventDispatcher implements
+      EventHandler<TaskAttemptEvent> {
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      Vertex vertex = dag.getVertex(event.getTaskAttemptID().getTaskID()
+          .getVertexID());
+      Task task = vertex.getTask(event.getTaskAttemptID().getTaskID());
+      TaskAttempt ta = task.getAttempt(event.getTaskAttemptID());
+      ((EventHandler<TaskAttemptEvent>) ta).handle(event);
+    }
+  }
 
-  @Before
-  public void setUp() {
-    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null);
-    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
-    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
-    mockEventHandler = mock(EventHandler.class);
-    tezCounters.findCounter("grp_1", "counter_1").increment(1);
-
-    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
-    dag =
-        new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler,
-            mock(TaskCommunicatorManagerInterface.class), new Credentials(),
-            new SystemClock(), user, mock(TaskHeartbeatHandler.class),
-            mockAppContext);
-  }
-
-  private void assertNewState() {
-    assertEquals(0, dag.getVertices().size());
-    assertEquals(0, dag.edges.size());
-    assertNull(dag.dagScheduler);
-    assertFalse(dag.recoveryCommitInProgress);
-    assertEquals(0, dag.recoveredGroupCommits.size());
-  }
-
-  private void restoreFromDAGInitializedEvent() {
-    DAGState recoveredState =
-        dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user,
-            dagName, null));
-    assertEquals(DAGState.INITED, recoveredState);
-    assertEquals(initTime, dag.initTime);
-    assertEquals(6, dag.getVertices().size());
-    assertEquals(6, dag.edges.size());
-    assertNotNull(dag.dagScheduler);
-  }
-
-  private void restoreFromDAGStartedEvent() {
-    DAGState recoveredState =
-        dag.restoreFromEvent(new DAGStartedEvent(dagId, startTime, user,
-            dagName));
-    assertEquals(startTime, dag.startTime);
-    assertEquals(DAGState.RUNNING, recoveredState);
-  }
-
-  private void restoreFromDAGCommitStartedEvent() {
-    DAGState recoveredState =
-        dag.restoreFromEvent(new DAGCommitStartedEvent(dagId, commitStartTime));
-    assertTrue(dag.recoveryCommitInProgress);
-    assertEquals(DAGState.RUNNING, recoveredState);
-  }
-
-  private void restoreFromVertexGroupCommitStartedEvent() {
-    DAGState recoveredState =
-        dag.restoreFromEvent(new VertexGroupCommitStartedEvent(dagId, "g1",
-            commitStartTime));
-    assertEquals(1, dag.recoveredGroupCommits.size());
-    assertFalse(dag.recoveredGroupCommits.get("g1").booleanValue());
-    assertEquals(DAGState.RUNNING, recoveredState);
-  }
-
-  private void restoreFromVertexGroupCommitFinishedEvent() {
-    DAGState recoveredState =
-        dag.restoreFromEvent(new VertexGroupCommitFinishedEvent(dagId, "g1",
-            commitStartTime + 100L));
-    assertEquals(1, dag.recoveredGroupCommits.size());
-    assertTrue(dag.recoveredGroupCommits.get("g1").booleanValue());
-    assertEquals(DAGState.RUNNING, recoveredState);
-  }
-
-  private void restoreFromDAGFinishedEvent(DAGState finalState) {
-    DAGState recoveredState =
-        dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
-            finalState, "", tezCounters, user, dagName, null, null));
-    assertEquals(finishTime, dag.finishTime);
-    assertFalse(dag.recoveryCommitInProgress);
-    assertEquals(finalState, recoveredState);
-    assertEquals(tezCounters, dag.fullCounters);
-  }
-
-  private void restoreFromDAGKillRequestEvent() {
-    dag.restoreFromEvent(new DAGKillRequestEvent(dag.getID(), 0L, false));
+  private class VertexEventDispatcher implements EventHandler<VertexEvent> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(VertexEvent event) {
+      VertexImpl vertex = (VertexImpl) dag.getVertex(event.getVertexId());
+      vertex.handle(event);
+    }
   }
 
-  /**
-   * New -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_FromNew() {
-    assertNewState();
+  private class DAGFinishEventHandler implements
+      EventHandler<DAGAppMasterEventDAGFinished> {
+    public int dagFinishEvents = 0;
 
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    @Override
+    public void handle(DAGAppMasterEventDAGFinished event) {
+      ++dagFinishEvents;
+    }
+  }
 
-    ArgumentCaptor<DAGEvent> eventCaptor =
-        ArgumentCaptor.forClass(DAGEvent.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<DAGEvent> events = eventCaptor.getAllValues();
-    assertEquals(2, events.size());
-    assertEquals(DAGEventType.DAG_INIT, events.get(0).getType());
-    assertEquals(DAGEventType.DAG_START, events.get(1).getType());
+  private class AMSchedulerEventDispatcher implements EventHandler<AMSchedulerEvent> {
+    @Override
+    public void handle(AMSchedulerEvent event) {
+    }
   }
+  
+  private static class MockHistoryEventHandler extends HistoryEventHandler {
 
-  /**
-   * New -> restoreFromDAGKillRequested -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_FromNewToKilled() {
-    restoreFromDAGKillRequestEvent();
-    assertNewState();
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.KILLED, dag.getState());
-    assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    private List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
+
+    public MockHistoryEventHandler(AppContext context) {
+      super(context);
+    }
+
+    @Override
+    public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
+      this.historyEvents.add(event.getHistoryEvent());
+    }
+
+    public List<HistoryEvent> getHistoryEvents() {
+      return historyEvents;
+    }
+
+    public void verifyHistoryEvent(int expectedTimes, HistoryEventType eventType) {
+      int actualCount = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == eventType) {
+          actualCount ++;
+        }
+      }
+      assertEquals(expectedTimes, actualCount);
+    }
   }
 
-  /**
-   * restoreFromDAGInitializedEvent -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_FromInited() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
+  public static class MockInputInitializer extends InputInitializer {
 
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    public MockInputInitializer(InputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
 
-    assertEquals(DAGState.RUNNING, dag.getState());
-    // send recover event to 2 root vertex
-    ArgumentCaptor<VertexEvent> eventCaptor =
-        ArgumentCaptor.forClass(VertexEvent.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
-    assertEquals(2, vertexEvents.size());
-    for (VertexEvent vEvent : vertexEvents) {
-      assertTrue(vEvent instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
-      assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+    @Override
+    public List<Event> initialize() throws Exception {
+      // sleep forever, block the initialization
+      while(true) {
+        Thread.sleep(1000);
+      }
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+        throws Exception {
     }
   }
 
-  /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGKillRequested -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_FromInitedToKilled() {
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGKillRequestEvent();
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.KILLED, dag.getState());
-    assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+  @BeforeClass
+  public static void beforeClass() {
+    MockDNSToSwitchMapping.initializeMockRackResolver();
   }
 
-  /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_FromStarted() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Before
+  public void setup() {
+    conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    appAttemptId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(100, 1), 1);
+    dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
+    Assert.assertNotNull(dagId);
+    dagPlan = createDAGPlan();
+    dispatcher = new DrainDispatcher();
+    fsTokens = new Credentials();
+    appContext = mock(AppContext.class);
+    execService = mock(ListeningExecutorService.class);
+    thh = mock(TaskHeartbeatHandler.class);
+    final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+
+    Mockito.doAnswer(new Answer() {
+      public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        CallableEvent e = (CallableEvent) args[0];
+        dispatcher.getEventHandler().handle(e);
+        return mockFuture;
+      }
+    }).when(execService).submit((Callable<Void>) any());
+
+    doReturn(execService).when(appContext).getExecService();
+    historyEventHandler = new MockHistoryEventHandler(appContext);
+    aclManager = new ACLManager("amUser");
+    doReturn(conf).when(appContext).getAMConf();
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(appAttemptId.getApplicationId()).when(appContext)
+        .getApplicationID();
+    doReturn(dagId).when(appContext).getCurrentDAGID();
+    doReturn(historyEventHandler).when(appContext).getHistoryHandler();
+    doReturn(aclManager).when(appContext).getAMACLManager();
+    doReturn(dagRecoveryData).when(appContext).getDAGRecoveryData();
+    dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(),
+        taskCommunicatorManagerInterface, fsTokens, clock, "user", thh,
+        appContext);
+    dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+    doReturn(dag).when(appContext).getCurrentDAG();
+    ugi = mock(UserGroupInformation.class);
+    UserGroupInformation ugi =dag.getDagUGI();
+    doReturn(clusterInfo).when(appContext).getClusterInfo();
+    TaskSchedulerManager mockTaskScheduler = mock(TaskSchedulerManager.class);
+    doReturn(mockTaskScheduler).when(appContext).getTaskScheduler();
+    v1Id = TezVertexID.getInstance(dagId, 0);
+    t1v1Id = TezTaskID.getInstance(v1Id, 0);
+    ta1t1v1Id = TezTaskAttemptID.getInstance(t1v1Id, 0);
+    v2Id = TezVertexID.getInstance(dagId, 1);
+    t1v2Id = TezTaskID.getInstance(v2Id, 0);
+    ta1t1v2Id = TezTaskAttemptID.getInstance(t1v2Id, 0);
+
+    dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
+    taskEventDispatcher = new TaskEventDispatcher();
+    dispatcher.register(TaskEventType.class, taskEventDispatcher);
+    taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
+    vertexEventDispatcher = new VertexEventDispatcher();
+    dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+    dagEventDispatcher = new DagEventDispatcher();
+    dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    dagFinishEventHandler = new DAGFinishEventHandler();
+    dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler);
+    dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventDispatcher());
+    dispatcher.init(conf);
+    dispatcher.start();
+    doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
+    LogManager.getRootLogger().setLevel(Level.DEBUG);
+  }
 
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
 
-    assertEquals(DAGState.RUNNING, dag.getState());
-    // send recover event to 2 root vertex
-    ArgumentCaptor<VertexEvent> eventCaptor =
-        ArgumentCaptor.forClass(VertexEvent.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
-    assertEquals(2, vertexEvents.size());
-    for (VertexEvent vEvent : vertexEvents) {
-      assertTrue(vEvent instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
-      assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+  public static class RecoveryNotSupportedOutputCommitter extends OutputCommitter {
+
+    public RecoveryNotSupportedOutputCommitter(
+        OutputCommitterContext committerContext) {
+      super(committerContext);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+    }
+
+    @Override
+    public void setupOutput() throws Exception {
+    }
+
+    @Override
+    public void commitOutput() throws Exception {
+    }
+
+    @Override
+    public void abortOutput(State finalState) throws Exception {
+    }
+    
+    @Override
+    public boolean isTaskRecoverySupported() {
+      return false;
     }
   }
 
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> restoreFromDAGKillRequested
-   * RecoverTransition
+   * v1     v2 
+   *   \    / 
+   *    \  / 
+   *     v3
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_FromStartedtoKilled() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGKillRequestEvent();
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.KILLED, dag.getState());
-    assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
-    // send recover event to all the vertices with desired state of KILLED
-    ArgumentCaptor<TezAbstractEvent> eventCaptor =
-        ArgumentCaptor.forClass(TezAbstractEvent.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<TezAbstractEvent> events = eventCaptor.getAllValues();
-    assertEquals(7, events.size());
-    for (int i=0;i<6;++i) {
-      TezAbstractEvent vEvent = events.get(i);
-      assertTrue(vEvent instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
-      assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
-    }
-    assertTrue(events.get(6) instanceof DAGAppMasterEventDAGFinished);
+  private DAGPlan createDAGPlan() {
+    DAGPlan dag = DAGPlan
+        .newBuilder()
+        .setName("testverteximpl")
+        .addVertex(
+            VertexPlan
+                .newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(RootInputLeafOutputProto.newBuilder().setName("input1")
+                    .setControllerDescriptor(TezEntityDescriptorProto.newBuilder()
+                        .setClassName(MockInputInitializer.class.getName()).build()))
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder().addHost("host1")
+                        .addRack("rack1").build())
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder().setNumTasks(-1)
+                        .setVirtualCores(4).setMemoryMb(1024).setJavaOpts("")
+                        .setTaskModule("x1.y1").build())
+                .addOutputs(
+                    DAGProtos.RootInputLeafOutputProto
+                        .newBuilder()
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("output1").build())
+                        .setName("output1")
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                CountingOutputCommitter.class.getName())))
+                .addOutEdgeId("e1").build())
+        .addVertex(
+            VertexPlan
+                .newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder().addHost("host2")
+                        .addRack("rack2").build())
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder().setNumTasks(1)
+                        .setVirtualCores(4).setMemoryMb(1024).setJavaOpts("")
+                        .setTaskModule("x2.y2").build())
+                .addOutputs(
+                    DAGProtos.RootInputLeafOutputProto
+                        .newBuilder()
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("output2").build())
+                        .setName("output2")
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                RecoveryNotSupportedOutputCommitter.class.getName())))
+                .addOutEdgeId("e2").build())
+        .addVertex(
+            VertexPlan
+                .newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .setProcessorDescriptor(
+                    TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder().addHost("host3")
+                        .addRack("rack3").build())
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder().setNumTasks(1)
+                        .setVirtualCores(4).setMemoryMb(1024)
+                        .setJavaOpts("foo").setTaskModule("x3.y3").build())
+                .addOutputs(
+                    DAGProtos.RootInputLeafOutputProto
+                        .newBuilder()
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("output3").build())
+                        .setName("output3")
+                        .setControllerDescriptor(
+                            TezEntityDescriptorProto.newBuilder().setClassName(
+                                CountingOutputCommitter.class.getName())))
+                .addInEdgeId("e1").addInEdgeId("e2").build())
+        .addEdge(
+            EdgePlan
+                .newBuilder()
+                .setEdgeDestination(
+                    TezEntityDescriptorProto.newBuilder().setClassName("i2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(
+                    TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL).build())
+        .addEdge(
+            EdgePlan
+                .newBuilder()
+                .setEdgeDestination(
+                    TezEntityDescriptorProto.newBuilder().setClassName("i3"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(
+                    TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL).build())
+        .build();
+
+    return dag;
   }
 
+ 
+  @After
+  public void teardown() {
+    dispatcher.await();
+    dispatcher.stop();
+    execService.shutdownNow();
+    dagPlan = null;
+    if (dag != null) {
+      dag.entityUpdateTracker.stop();
+    }
+    dag = null;
+  }
+  
+  
+  ////////////////////////////////// DAG Recovery ///////////////////////////////////////////////////
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition
+   * RecoveryEvents: SummaryEvent_DAGFinishedEvent(SUCCEEDED)
+   * Recover dag to SUCCEEDED and all of its vertices to SUCCEEDED
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_Finished_SUCCEEDED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.SUCCEEDED, dag.getState());
-    assertEquals(tezCounters, dag.getAllCounters());
-    // recover all the vertices to SUCCEED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
-    }
+  @Test(timeout=5000)
+  public void testDAGRecoverFromDesiredSucceeded() {
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.SUCCEEDED, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+    assertEquals(DAGState.SUCCEEDED, dag.getState());
+    assertEquals(3, dag.getVertices().size());
+    assertEquals(VertexState.SUCCEEDED, dag.getVertex("vertex1").getState());
+    assertEquals(VertexState.SUCCEEDED, dag.getVertex("vertex2").getState());
+    assertEquals(VertexState.SUCCEEDED, dag.getVertex("vertex3").getState());
+    // DAG#initTime, startTime is not guaranteed to be recovered in this case
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromDAGFinishedEvent(FAILED) -> RecoverTransition
+   * RecoveryEvents: SummaryEvent_DAGFinishedEvent(FAILED)
+   * Recover dag to FAILED and all of its vertices to FAILED
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_Finished_FAILED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGFinishedEvent(DAGState.FAILED);
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.FAILED, dag.getState());
-    assertEquals(tezCounters, dag.getAllCounters());
-    // recover all the vertices to FAILED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
-    }
+  @Test(timeout=5000)
+  public void testDAGRecoverFromDesiredFailed() {
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.FAILED, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+    assertEquals(DAGState.FAILED, dag.getState());
+    assertEquals(3, dag.getVertices().size());
+    assertEquals(VertexState.FAILED, dag.getVertex("vertex1").getState());
+    assertEquals(VertexState.FAILED, dag.getVertex("vertex2").getState());
+    assertEquals(VertexState.FAILED, dag.getVertex("vertex3").getState());
+    // DAG#initTime, startTime is not guaranteed to be recovered in this case
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
-   * restoreFromDAGFinishedEvent -> RecoverTransition
+   * RecoveryEvents: SummaryEvent_DAGFinishedEvent(KILLED)
+   * Recover dag to KILLED and all of its vertices to KILLED
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_Finished_KILLED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGFinishedEvent(DAGState.KILLED);
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.KILLED, dag.getState());
-    assertEquals(tezCounters, dag.getAllCounters());
-    // recover all the vertices to KILLED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
-    }
+  @Test(timeout=5000)
+  public void testDAGRecoverFromDesiredKilled() {
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.KILLED, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState());
+    assertEquals(DAGState.KILLED, dag.getState());
+    assertEquals(3, dag.getVertices().size());
+    assertEquals(VertexState.KILLED, dag.getVertex("vertex1").getState());
+    assertEquals(VertexState.KILLED, dag.getVertex("vertex2").getState());
+    assertEquals(VertexState.KILLED, dag.getVertex("vertex3").getState());
+    // DAG#initTime, startTime is not guaranteed to be recovered in this case
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent
-   * --> restoreFromDAGKillRequestEvent -->
-   * restoreFromDAGFinishedEvent -> RecoverTransition
+   * RecoveryEvents: SummaryEvent_DAGFinishedEvent(ERROR)
+   * Recover dag to ERROR and all of its vertices to ERROR
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_Finished_KILLED_WithKillRequest() {
-    // same behavior as without DAGKillRequestEvent because DAGFinishedEvent is seen
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGKillRequestEvent();
-    restoreFromDAGFinishedEvent(DAGState.KILLED);
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.KILLED, dag.getState());
-    assertEquals(tezCounters, dag.getAllCounters());
-    // recover all the vertices to KILLED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
-    }
+  @Test(timeout=5000)
+  public void testDAGRecoverFromDesiredError() {
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.ERROR, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState());
+    assertEquals(DAGState.ERROR, dag.getState());
+    assertEquals(3, dag.getVertices().size());
+    assertEquals(VertexState.ERROR, dag.getVertex("vertex1").getState());
+    assertEquals(VertexState.ERROR, dag.getVertex("vertex2").getState());
+    assertEquals(VertexState.ERROR, dag.getVertex("vertex3").getState());
+    // DAG#initTime, startTime is not guaranteed to be recovered in this case
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
-   * restoreFromDAGFinishedEvent -> RecoverTransition
+   * RecoveryEvents: DAGSubmittedEvent
+   * Recover it as normal dag execution
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_Finished_ERROR() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGFinishedEvent(DAGState.ERROR);
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.ERROR, dag.getState());
-    assertEquals(tezCounters, dag.getAllCounters());
-    // recover all the vertices to KILLED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
-    }
+  @Test(timeout=5000)
+  public void testDAGRecoverFromNew() {
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.ERROR, dagFinishedEvent.getDAGState());
+    assertEquals(DAGState.RUNNING, dag.getState());
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromDAG_COMMIT_STARTED -> RecoverTransition
+   * RecoveryEvents: DAGSubmittedEvent, DAGInitializedEvent
+   * Recover it as normal dag execution
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_COMMIT_STARTED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromDAGCommitStartedEvent();
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.FAILED, dag.getState());
+  @Test(timeout=5000)
+  public void testDAGRecoverFromInited() {
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime, 
+        "user", "dagName", null);
+    doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // recover all the vertices to SUCCEEDED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
-    }
+    assertEquals(DAGState.RUNNING, dag.getState());
+    assertEquals(dagInitedTime, dag.initTime);
+  }
+  
+  @Test(timeout=5000)
+  public void testDAGRecoverFromStarted() {
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime, 
+        "user", "dagName", null);
+    doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+    doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
+
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+    assertEquals(DAGState.RUNNING, dag.getState());
+    assertEquals(dagInitedTime, dag.initTime);
+    assertEquals(dagStartedTime, dag.startTime);
+  }
+ 
+  /////////////////////////////// Vertex Recovery /////////////////////////////////////////
+  
+  private void initMockDAGRecoveryDataForVertex() {    
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime, 
+        "user", "dagName", null);
+    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+    doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+    doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
   }
 
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromDAG_COMMIT_STARTED -> -> restoreFromDAGFinished (SUCCEEDED)->
-   * RecoverTransition
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent 
+   *  V1:   No any event
+   * 
+   * Reinitialize V1 again. 
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_COMMIT_STARTED_Finished_SUCCEEDED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-
-    restoreFromDAGCommitStartedEvent();
-    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.SUCCEEDED, dag.getState());
-
-    // recover all the vertices to SUCCEED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
-    }
-
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+  @Test(timeout=5000)
+  public void testVertexRecoverFromNew() {
+    initMockDAGRecoveryDataForVertex();
+    
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // reinitialize v1 again
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+    assertEquals(VertexState.INITIALIZING, v1.getState());
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(VertexState.INITED, v3.getState());
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromVERTEX_GROUP_COMMIT_STARTED -> RecoverTransition
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent 
+   *  V1:   VertexInitializedEvent
+   * 
+   * Reinitialize V1 again. 
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_GROUP_COMMIT_STARTED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-    restoreFromVertexGroupCommitStartedEvent();
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.FAILED, dag.getState());
+  @Test(timeout=5000)
+  public void testVertexRecoverFromInited() {
+    initMockDAGRecoveryDataForVertex();
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
+        "vertex1", 0L, v1InitedTime, 
+        v1NumTask, "", null, inputGeneratedTezEvents);
+    VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+        null, null, null, null, false);
+    doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
 
-    // recover all the vertices to SUCCEEDED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // reinitialize v1 again because its VertexManager is not completed
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+    assertEquals(VertexState.INITIALIZING, v1.getState());
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(VertexState.INITED, v3.getState());
+  }
+  
+  /**
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent 
+   *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent
+   * 
+   * V1 skip initialization. 
+   */
+  @Test//(timeout=5000)
+  public void testVertexRecoverFromInitedAndReconfigureDone() {
+    initMockDAGRecoveryDataForVertex();
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
+        "vertex1", 0L, v1InitedTime, 
+        v1NumTask, "", null, inputGeneratedTezEvents);
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
+        0L, v1NumTask, null, null, null, true);
+    VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, null, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+    doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
-
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+    assertEquals(DAGState.RUNNING, dag.getState());
+    // v1 skip initialization
+    assertEquals(VertexState.RUNNING, v1.getState());
+    assertEquals(v1InitedTime, v1.initedTime);
+    assertEquals(v1NumTask, v1.getTotalTasks());
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(VertexState.RUNNING, v3.getState());
   }
-
+  
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED ->
-   * RecoverTransition
+   * RecoveryEvents:
+   *  DAG:  DAGInitedEvent -> DAGStartedEvent 
+   *  V1:   VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent
+   * 
+   * V1 skip initialization. 
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_GROUP_COMMIT_STARTED_FINISHED() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
-
-    restoreFromVertexGroupCommitStartedEvent();
-    restoreFromVertexGroupCommitFinishedEvent();
-
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+  @Test(timeout=5000)
+  public void testVertexRecoverFromStart() {
+    initMockDAGRecoveryDataForVertex();   
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
+        "vertex1", 0L, v1InitedTime, 
+        v1NumTask, "", null, inputGeneratedTezEvents);
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
+        0L, v1NumTask, null, null, null, true);
+    VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+    VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+    doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    
+    DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+    dag.handle(recoveryEvent);
+    dispatcher.await();
+
+    VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
     assertEquals(DAGState.RUNNING, dag.getState());
+    // v1 skip initialization
+    assertEquals(VertexState.RUNNING, v1.getState());
+    assertEquals(v1InitedTime, v1.initedTime);
+    assertEquals(v1StartedTime, v1.startedTime);
+    assertEquals(v1NumTask, v1.getTotalTasks());
+    assertEquals(VertexState.RUNNING, v2.getState());
+    assertEquals(VertexState.RUNNING, v3.getState());
+  }
+  
+  /////////////////////////////// Task ////////////////////////////////////////////////////////////
+  
+  private void initMockDAGRecoveryDataForTask() {
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
+        "vertex1", 0L, v1InitedTime, 
+        v1NumTask, "", null, inputGeneratedTezEvents);
+    Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
+        0L, v1NumTask, null, null, rootInputSpecs, true);
+    VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+    VertexRecoveryData v1RecoveryData = new VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+    
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime, 
+        "user", "dagName", null);
+    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+    doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+    doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
+  }
 
-    // send recover event to 2 root vertex
-    verify(mockEventHandler, times(2)).handle(
-        any(VertexEventRecoverVertex.class));
-    assertEquals(DAGState.RUNNING, dag.getState());
+  /**
+   * RecoveryEvent: TaskFinishedEvent(KILLED)
+   * Recover it to KILLED
+   */
+  @Test(timeout=5000)
+  public void testTaskRecoverFromKilled() {
+    initMockDAGRecoveryDataForTask();
+    TaskFinishedEvent taskFinishedEvent = new TaskFinishedEvent(t1v1Id, "v1",
+        0L, 0L, null, TaskState.KILLED, "", null, 4);
+    TaskRecoveryData taskRecoveryData = new TaskRecoveryData(null, taskFinishedEvent, null);
+    doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex(v1Id);
+    TaskImpl task = (TaskImpl)vertex1.getTask(t1v1Id);
+    assertEquals(TaskStateInternal.KILLED, task.getInternalState());
+    assertEquals(1, vertex1.getCompletedTasks());
+  }
+  
+  /**
+   * RecoveryEvent: TaskStartedEvent
+   * Recover it to Scheduled
+   */
+  @Test(timeout=5000)
+  public void testTaskRecoverFromStarted() {
+    initMockDAGRecoveryDataForTask();
+    TaskStartedEvent taskStartedEvent = new TaskStartedEvent(t1v1Id, "v1", 0L, 0L);
+    TaskRecoveryData taskRecoveryData = new TaskRecoveryData(taskStartedEvent, null, null);
+    doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+    
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex(v1Id);
+    TaskImpl task = (TaskImpl)vertex1.getTask(t1v1Id);
+    assertEquals(TaskStateInternal.SCHEDULED, task.getInternalState());
+  }
+  
+  /**
+   * RecoveryEvent: TaskStartedEvent -> TaskFinishedEvent
+   * Recover it to Scheduled
+   */
+  @Test(timeout=5000)
+  public void testTaskRecoverFromSucceeded() {
+    initMockDAGRecoveryDataForTask();
+    TaskStartedEvent taskStartedEvent = new TaskStartedEvent(t1v1Id, "v1", 0L, 0L);
+    TaskFinishedEvent taskFinishedEvent = new TaskFinishedEvent(t1v1Id, "v1",
+        0L, 0L, null, TaskState.SUCCEEDED, "", null, 4);
+    TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+        ta1t1v1Id, "v1", 0L, mock(ContainerId.class), 
+        mock(NodeId.class), "", "", "");
+    List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+    EventMetaData metadata = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        "vertex1", "vertex3", ta1t1v2Id);
+    taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata));
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v1Id, "v1", 0L, 0L, 
+        TaskAttemptState.SUCCEEDED, null, "", null, 
+        null, taGeneratedEvents, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+    Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
+        new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();
+    taRecoveryDataMap.put(ta1t1v1Id, taRecoveryData);
+    TaskRecoveryData taskRecoveryData = new TaskRecoveryData(taskStartedEvent, taskFinishedEvent, taRecoveryDataMap);
+    doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+    
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex(v1Id);
+    TaskImpl task = (TaskImpl)vertex1.getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(VertexState.RUNNING, vertex1.getState());
+    assertEquals(1, vertex1.getCompletedTasks());
+    assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
+    assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
+  }
+
+  /////////////////////////////// TaskAttempt Recovery /////////////////////////////////////////////////////
+  
+  private void initMockDAGRecoveryDataForTaskAttempt() {
+    TaskStartedEvent t1StartedEvent = new TaskStartedEvent(t1v1Id, "vertex1", 0L, t1StartedTime);
+    TaskRecoveryData taskRecoveryData = new TaskRecoveryData(t1StartedEvent, null, null);
+    Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<TezTaskID, TaskRecoveryData>();
+    taskRecoveryDataMap.put(t1v1Id, taskRecoveryData);
+    
+    List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+    VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
+        "vertex1", 0L, v1InitedTime, 
+        v1NumTask, "", null, inputGeneratedTezEvents);
+    Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
+    VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
+        0L, v1NumTask, null, null, rootInputSpecs, true);
+    VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+    VertexRecoveryData v1RecoveryData = new VertexRecoveryData(v1InitedEvent,
+        v1ReconfigureDoneEvent, v1StartedEvent, null, taskRecoveryDataMap, false);
+    
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime, 
+        "user", "dagName", null);
+    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+    doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+    doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+    doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
   }
 
   /**
-   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
-   * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED ->
-   * restoreFromDAG_Finished -> RecoverTransition
+   * RecoveryEvents: TaskAttemptFinishedEvent (FAILED)
+   * Recover it to FAILED
    */
-  @Test(timeout = 5000)
-  public void testDAGRecovery_GROUP_COMMIT_Finished() {
-    assertNewState();
-    restoreFromDAGInitializedEvent();
-    restoreFromDAGStartedEvent();
+  @Test(timeout=5000)
+  public void testTARecoverFromNewToFailed() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
+        TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, 
+        null, null, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+
+    TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(TaskAttemptStateInternal.FAILED, taskAttempt.getInternalState());
+    assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, taskAttempt.getTerminationCause());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(1, task.failedAttempts);
+    // new task attempt is scheduled
+    assertEquals(2, task.getAttempts().size());
+    assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+  }
+  
+  /**
+   * RecoveryEvents: TaskAttemptFinishedEvent (KILLED)
+   * Recover it to KILLED
+   */
+  @Test(timeout=5000)
+  public void testTARecoverFromNewToKilled() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
+        TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, 
+        null, null, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+
+    TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+    assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, taskAttempt.getTerminationCause());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(0, task.failedAttempts);
+    assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+  }
+  
+  /**
+   * RecoveryEvents: TaskAttemptStartedEvent
+   * Recover it to KILLED
+   */
+  @Test(timeout=5000)
+  public void testTARecoverFromRunning() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class), 
+        mock(NodeId.class), "", "", "");
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, null);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+
+    TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+    assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, taskAttempt.getTerminationCause());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
+    historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+  }
 
-    restoreFromVertexGroupCommitStartedEvent();
-    restoreFromVertexGroupCommitFinishedEvent();
-    restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+  /**
+   * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (SUCCEEDED)
+   * Recover it to SUCCEEDED
+   */
+  @Test(timeout=5000)
+  public void testTARecoverFromSucceeded() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class), 
+        mock(NodeId.class), "", "", "");
+    List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+    taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null));
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
+        TaskAttemptState.SUCCEEDED, null, "", null, 
+        null, taGeneratedEvents, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+    
+    TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
+    assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+    assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+  }
 
-    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
-    assertEquals(DAGState.SUCCEEDED, dag.getState());
-    assertEquals(tezCounters, dag.getAllCounters());
-    // recover all the vertices to SUCCEEDED
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    int i = 0;
-    for (; i < 6; ++i) {
-      assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
-      VertexEventRecoverVertex recoverEvent =
-          (VertexEventRecoverVertex) events.get(i);
-      assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
-    }
+  /**
+   * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (SUCCEEDED)
+   * Recovered it SUCCEEDED, but task schedule new task attempt
+   * V2's committer is not recovery supported
+   */
+  @Test//(timeout=5000)
+  public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    // set up v2 recovery data
+    // ta1t1v2: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent(SUCCEEDED)
+    // t1v2: TaskStartedEvent
+    // v2: VertexInitializedEvent -> VertexConfigurationDoneEvent -> VertexStartedEvent
+    TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+        ta1t1v2Id, "vertex2", ta1LaunchTime, mock(ContainerId.class), 
+        mock(NodeId.class), "", "", "");
+    List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+    EventMetaData metadata = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        "vertex2", "vertex3", ta1t1v2Id);
+    taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata));
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime, 
+        TaskAttemptState.SUCCEEDED, null, "", null, 
+        null, taGeneratedEvents, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id);   
+    Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
+        new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();
+    taRecoveryDataMap.put(ta1t1v2Id, taRecoveryData);
+ 
+    TaskStartedEvent t1StartedEvent = new TaskStartedEvent(t1v2Id, "vertex2", 0L, t1StartedTime);
+    TaskRecoveryData taskRecoveryData = new TaskRecoveryData(t1StartedEvent, null, taRecoveryDataMap);
+    Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<TezTaskID, TaskRecoveryData>();
+    taskRecoveryDataMap.put(t1v2Id, taskRecoveryData);
+    doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v2Id);
+
+    VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, 
+        "vertex2", 0L, v1InitedTime, 
+        v1NumTask, "", null, null);
+    VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v2Id, 
+        0L, v1NumTask, null, null, null, false);
+    VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, v1StartedTime);
+    VertexRecoveryData v2RecoveryData = new VertexRecoveryData(v2InitedEvent,
+        v2ReconfigureDoneEvent, v2StartedEvent, null, taskRecoveryDataMap, false);
+    doReturn(v2RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v2Id);
+
+
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+    
+    TaskImpl task = (TaskImpl)dag.getVertex(v2Id).getTask(t1v2Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v2Id);
+    assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
+    // new task attempt is scheduled
+    assertEquals(2, task.getAttempts().size());
+    assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+    assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+  }
 
-    // send DAGAppMasterEventDAGFinished at last
-    assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
-    DAGAppMasterEventDAGFinished dagFinishedEvent =
-        (DAGAppMasterEventDAGFinished) events.get(i);
-    assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+  /**
+   * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (FAILED)
+   * Recover it to FAILED
+   */
+  @Test(timeout=5000)
+  public void testTARecoverFromFailed() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class), 
+        mock(NodeId.class), "", "", "");
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
+        TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, 
+        null, null, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+    
+    TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(TaskAttemptStateInternal.FAILED, taskAttempt.getInternalState());
+    assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, taskAttempt.getTerminationCause());
+    assertEquals(TaskStateInternal.SCHEDULED, task.getInternalState());
+    assertEquals(2, task.getAttempts().size());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+    assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
   }
 
+  /**
+   * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (KILLED)
+   * Recover it to KILLED
+   */
+  @Test(timeout=5000)
+  public void testTARecoverFromKilled() {
+    initMockDAGRecoveryDataForTaskAttempt();
+    TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+        ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class), 
+        mock(NodeId.class), "", "", "");
+    TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+        ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime, 
+        TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, 
+        null, null, 0L, null, 0L);
+    TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+    doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+    
+    dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+    dispatcher.await();
+    
+    TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+    TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+    assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+    assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, taskAttempt.getTerminationCause());
+    historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+    assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+    assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 17295cd..6dd578f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -137,6 +139,7 @@ public class TestTaskAttempt {
     mockTask = mock(Task.class);
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+    LogManager.getRootLogger().setLevel(Level.DEBUG);
   }
 
   @Test(timeout = 5000)
@@ -1556,7 +1559,7 @@ public class TestTaskAttempt {
         Resource resource, ContainerContext containerContext, boolean leafVertex) {
       super(taskId, attemptNumber, eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          isRescheduled, resource, containerContext, leafVertex, mockTask);
+          isRescheduled, resource, containerContext, leafVertex, mockTask, null);
       when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
     }