You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:23 UTC

[3/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
deleted file mode 100644
index 1aba5fa..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
-import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEventHandler;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import com.google.common.collect.Lists;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class TestTaskAttemptRecovery {
-
-  private TaskAttemptImpl ta;
-  private EventHandler mockEventHandler;
-  private long creationTime = System.currentTimeMillis();
-  private long allocationTime = creationTime + 5000;
-  private long startTime = allocationTime + 5000;
-  private long finishTime = startTime + 5000;
-
-  private TezTaskAttemptID taId;
-  private String vertexName = "v1";
-
-  private AppContext mockAppContext;
-  private MockHistoryEventHandler mockHistoryEventHandler;
-  private Task mockTask;
-  private Vertex mockVertex;
-
-  public static class MockHistoryEventHandler extends HistoryEventHandler {
-
-    private List<DAGHistoryEvent> events;
-
-    public MockHistoryEventHandler(AppContext context) {
-      super(context);
-      events = new ArrayList<DAGHistoryEvent>();
-    }
-
-    @Override
-    public void handle(DAGHistoryEvent event) {
-      events.add(event);
-    }
-
-    @Override
-    public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
-      events.add(event);
-    }
-
-    void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState, int expectedTimes) {
-      int actualTimes = 0;
-      for (DAGHistoryEvent event : events) {
-        if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
-          TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent();
-          if (tfEvent.getTaskAttemptID().equals(taId) &&
-              tfEvent.getState().equals(finalState)) {
-            actualTimes ++;
-          }
-        }
-      }
-      assertEquals(expectedTimes, actualTimes);
-    }
-
-    void verifyTaskFinishedEvent(TezTaskID taskId, TaskState finalState, int expectedTimes) {
-      int actualTimes = 0;
-      for (DAGHistoryEvent event : events) {
-        if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_FINISHED) {
-          TaskFinishedEvent tfEvent = (TaskFinishedEvent)event.getHistoryEvent();
-          if (tfEvent.getTaskID().equals(taskId) && tfEvent.getState().equals(finalState)) {
-            actualTimes ++;
-          }
-        }
-      }
-      assertEquals(expectedTimes, actualTimes);
-    }
-  }
-
-  @Before
-  public void setUp() {
-    mockTask = mock(Task.class);
-    mockVertex = mock(Vertex.class);
-    when(mockTask.getVertex()).thenReturn(mockVertex);
-    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))
-      .getTask(any(TezTaskID.class)))
-      .thenReturn(mockTask);
-    mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
-    when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
-    mockEventHandler = mock(EventHandler.class);
-    TezTaskID taskId =
-        TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
-    ta =
-        new TaskAttemptImpl(taskId, 0, mockEventHandler,
-            mock(TaskCommunicatorManagerInterface.class), new Configuration(),
-            new SystemClock(), mock(TaskHeartbeatHandler.class),
-            mockAppContext, false, Resource.newInstance(1, 1),
-            mock(ContainerContext.class), false, mockTask);
-    taId = ta.getID();
-  }
-
-  private void restoreFromTAStartEvent() {
-    TaskAttemptState recoveredState =
-        ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-    assertEquals(startTime, ta.getLaunchTime());
-    assertEquals(TaskAttemptState.RUNNING, recoveredState);
-  }
-
-  private void restoreFromTAFinishedEvent(TaskAttemptState state) {
-    String diag = "test_diag";
-    TezCounters counters = mock(TezCounters.class);
-    TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1);
-    
-    TaskAttemptTerminationCause errorEnum = null;
-    if (state != TaskAttemptState.SUCCEEDED) {
-      errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
-    }
-
-    long lastDataEventTime = 1024;
-    TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
-    List<DataEventDependencyInfo> events = Lists.newLinkedList();
-    events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
-    events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
-    TaskAttemptState recoveredState =
-        ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, state, errorEnum, diag, counters, events, creationTime,
-            causalId, allocationTime));
-    assertEquals(causalId, ta.getCreationCausalAttempt());
-    assertEquals(creationTime, ta.getCreationTime());
-    assertEquals(allocationTime, ta.getAllocationTime());
-    assertEquals(startTime, ta.getLaunchTime());
-    assertEquals(finishTime, ta.getFinishTime());
-    assertEquals(counters, ta.reportedStatus.counters);
-    assertEquals(1.0f, ta.reportedStatus.progress, 1e-6);
-    assertEquals(state, ta.reportedStatus.state);
-    assertEquals(1, ta.getDiagnostics().size());
-    assertEquals(diag, ta.getDiagnostics().get(0));
-    assertEquals(state, recoveredState);
-    assertEquals(events.size(), ta.lastDataEvents.size());
-    assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp());
-    assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId());
-    if (state != TaskAttemptState.SUCCEEDED) {
-      assertEquals(errorEnum, ta.getTerminationCause());
-    } else {
-      assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, ta.getTerminationCause());
-    }
-  }
-
-  private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,
-      int expectedTimes) {
-    int actualTimes = 0;
-    for (Event event : events) {
-      if (eventClass.isInstance(event)) {
-        actualTimes ++;
-      }
-    }
-    assertEquals(expectedTimes, actualTimes);
-  }
-
-  /**
-   * No event to restore -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testTARecovery_NEW() {
-    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
-    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    assertEquals(2, events.size());
-    verifyEvents(events, TaskEventTAUpdate.class, 1);
-    // one for task killed
-    verifyEvents(events, DAGEventCounterUpdate.class, 1);
-  }
-
-  /**
-   * restoreFromTAStartEvent -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testTARecovery_START() {
-    restoreFromTAStartEvent();
-
-    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
-    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(3)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    assertEquals(3, events.size());
-    verifyEvents(events, TaskEventTAUpdate.class, 1);
-    // one for task launch, one for task killed
-    verifyEvents(events, DAGEventCounterUpdate.class, 2);
-
-    mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED, 1);
-  }
-
-  /**
-   * restoreFromTAStartEvent -> restoreFromTAFinished (SUCCEED)
-   * -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testTARecovery_SUCCEED() {
-    restoreFromTAStartEvent();
-    restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
-
-    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
-    assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
-
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    assertEquals(2, events.size());
-    // one for task launch, one for task succeeded
-    verifyEvents(events, DAGEventCounterUpdate.class, 2);
-  }
-
-  /**
-   * restoreFromTAStartEvent -> restoreFromTAFinished (KILLED)
-   * -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testTARecovery_KIILED() {
-    restoreFromTAStartEvent();
-    restoreFromTAFinishedEvent(TaskAttemptState.KILLED);
-
-    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
-    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    assertEquals(2, events.size());
-    // one for task launch, one for task killed
-    verifyEvents(events, DAGEventCounterUpdate.class, 2);
-  }
-
-  /**
-   * restoreFromTAStartEvent -> restoreFromTAFinished (FAILED)
-   * -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testTARecovery_FAILED() {
-    restoreFromTAStartEvent();
-    restoreFromTAFinishedEvent(TaskAttemptState.FAILED);
-
-    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
-    assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
-
-    ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
-    List<Event> events = eventCaptor.getAllValues();
-    assertEquals(2, events.size());
-    // one for task launch, one for task failed
-    verifyEvents(events, DAGEventCounterUpdate.class, 2);
-  }
-
-  /**
-   * restoreFromTAFinishedEvent ( killed before started)
-   */
-  @Test(timeout = 5000)
-  public void testRecover_FINISH_BUT_NO_START() {
-    TaskAttemptState recoveredState =
-        ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, TaskAttemptState.KILLED,
-            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskAttemptState.KILLED, recoveredState);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 24c9664..0414c99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -163,7 +163,7 @@ public class TestTaskImpl {
   }
 
   private void scheduleTaskAttempt(TezTaskID taskId) {
-    mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint));
+    mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false));
     assertTaskScheduledState();
     assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
     assertEquals(locationHint, mockTask.getTaskLocationHint());
@@ -762,8 +762,7 @@ public class TestTaskImpl {
         boolean isRescheduled,
         Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
       super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
-          schedCausalTA);
+          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
deleted file mode 100644
index bea423a..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ /dev/null
@@ -1,873 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.client.VertexStatus.State;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.StateChangeNotifier;
-import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
-import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.impl.TestTaskAttemptRecovery.MockHistoryEventHandler;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.OutputCommitter;
-import org.apache.tez.runtime.api.OutputCommitterContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestTaskRecovery {
-
-  private TaskImpl task;
-  private DrainDispatcher dispatcher;
-
-  private int taskAttemptCounter = 0;
-
-  private Configuration conf = new Configuration();
-  private AppContext mockAppContext;
-  private MockHistoryEventHandler  mockHistoryEventHandler;
-  private ApplicationId appId = ApplicationId.newInstance(
-      System.currentTimeMillis(), 1);
-  private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-  private TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
-  private Vertex vertex;
-  private String vertexName = "v1";
-  private long taskScheduledTime = 100L;
-  private long taskStartTime = taskScheduledTime + 100L;
-  private long taskFinishTime = taskStartTime + 100L;
-  private TaskAttemptEventHandler taEventHandler =
-      new TaskAttemptEventHandler();
-
-  private class TaskEventHandler implements EventHandler<TaskEvent> {
-    @Override
-    public void handle(TaskEvent event) {
-      task.handle(event);
-    }
-  }
-
-  private class TaskAttemptEventHandler implements
-      EventHandler<TaskAttemptEvent> {
-
-    private List<TaskAttemptEvent> events = Lists.newArrayList();
-
-    @Override
-    public void handle(TaskAttemptEvent event) {
-      events.add(event);
-      ((TaskAttemptImpl) task.getAttempt(event.getTaskAttemptID()))
-          .handle(event);
-    }
-
-    public List<TaskAttemptEvent> getEvents() {
-      return events;
-    }
-  }
-
-  private class TestOutputCommitter extends OutputCommitter {
-
-    boolean recoverySupported = false;
-    boolean throwExceptionWhenRecovery = false;
-
-    public TestOutputCommitter(OutputCommitterContext committerContext,
-        boolean recoverySupported, boolean throwExceptionWhenRecovery) {
-      super(committerContext);
-      this.recoverySupported = recoverySupported;
-      this.throwExceptionWhenRecovery = throwExceptionWhenRecovery;
-    }
-
-    @Override
-    public void recoverTask(int taskIndex, int previousDAGAttempt)
-        throws Exception {
-      if (throwExceptionWhenRecovery) {
-        throw new Exception("fail recovery Task");
-      }
-    }
-
-    @Override
-    public boolean isTaskRecoverySupported() {
-      return recoverySupported;
-    }
-
-    @Override
-    public void initialize() throws Exception {
-
-    }
-
-    @Override
-    public void setupOutput() throws Exception {
-
-    }
-
-    @Override
-    public void commitOutput() throws Exception {
-
-    }
-
-    @Override
-    public void abortOutput(State finalState) throws Exception {
-
-    }
-
-  }
-
-  @Before
-  public void setUp() {
-    dispatcher = new DrainDispatcher();
-    dispatcher.register(DAGEventType.class, mock(EventHandler.class));
-    dispatcher.register(VertexEventType.class, mock(EventHandler.class));
-    dispatcher.register(TaskEventType.class, new TaskEventHandler());
-    dispatcher.register(TaskAttemptEventType.class, taEventHandler);
-    dispatcher.init(new Configuration());
-    dispatcher.start();
-
-    vertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
-    when(vertex.getProcessorDescriptor().getClassName()).thenReturn("");
-
-    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class)))
-        .thenReturn(vertex);
-    mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
-    when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
-    task =
-        new TaskImpl(vertexId, 0, dispatcher.getEventHandler(),
-            new Configuration(), mock(TaskCommunicatorManagerInterface.class),
-            new SystemClock(), mock(TaskHeartbeatHandler.class),
-            mockAppContext, false, Resource.newInstance(1, 1),
-            mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);
-
-    Map<String, OutputCommitter> committers =
-        new HashMap<String, OutputCommitter>();
-    committers.put("out1", new TestOutputCommitter(
-        mock(OutputCommitterContext.class), true, false));
-    when(task.getVertex().getOutputCommitters()).thenReturn(committers);
-  }
-
-  private void restoreFromTaskStartEvent() {
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskStartedEvent(task.getTaskId(),
-            vertexName, taskScheduledTime, taskStartTime));
-    assertEquals(TaskState.SCHEDULED, recoveredState);
-    assertEquals(0, task.getFinishedAttemptsCount());
-    assertEquals(taskScheduledTime, task.scheduledTime);
-    assertEquals(0, task.getAttempts().size());
-  }
-
-  private void restoreFromFirstTaskAttemptStartEvent(TezTaskAttemptID taId) {
-    long taStartTime = taskStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(0, task.getFinishedAttemptsCount());
-    assertEquals(taskScheduledTime, task.scheduledTime);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(TaskAttemptStateInternal.NEW,
-        ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
-    assertEquals(1, task.getUncompletedAttemptsCount());
-  }
-
-  /**
-   * New -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_New() {
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.NEW, task.getInternalState());
-  }
-
-  /**
-   * -> restoreFromTaskFinishEvent ( no TaskStartEvent )
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_NoStartEvent() {
-    try {
-      task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
-          taskStartTime, taskFinishTime, null, TaskState.SUCCEEDED, "",
-          new TezCounters(), 0));
-      fail("Should fail due to no TaskStartEvent before TaskFinishEvent");
-    } catch (Throwable e) {
-      assertTrue(e.getMessage().contains(
-          "Finished Event seen but"
-              + " no Started Event was encountered earlier"));
-    }
-  }
-
-  /**
-   * -> restoreFromTaskFinishEvent (KILLED, no TaskStartEvent)
-   */
-  @Test(timeout = 5000)
-  public void testRecoveryNewToKilled_NoStartEvent() {
-    task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
-        taskStartTime, taskFinishTime, null, TaskState.KILLED, "",
-        new TezCounters(), 0));
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Started() {
-    restoreFromTaskStartEvent();
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // new task attempt is scheduled
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(0, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (KILLED) ->
-   * RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OnlyTAFinishedEvent_KILLED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null, 0, null, 0));
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // wait for the second task attempt to be scheduled
-    dispatcher.await();
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (FAILED) ->
-   * RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OnlyTAFinishedEvent_FAILED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null, 0, null, 0));
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // wait for the second task attempt to be scheduled
-    dispatcher.await();
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // taskAttempt_1 is recovered to FAILED, and new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(1, task.failedAttempts);
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
-   * RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OnlyTAFinishedEvent_SUCCEEDED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    try {
-      task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null, 0, null, 0));
-      fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
-    } catch (TezUncheckedException e) {
-      assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
-    }
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // wait for the second task attempt to be scheduled
-    dispatcher.await();
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_SUCCEEDED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-    mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (FAILED) -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_FAILED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(1, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(1, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_KILLED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
-   * restoreFromTaskFinishedEvent -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_SUCCEEDED_Finished() {
-
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-
-    recoveredState =
-        task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(),
-            vertexName, taskStartTime, taskFinishTime, taId,
-            TaskState.SUCCEEDED, "", new TezCounters(), 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(taId, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-    mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
-   * restoreFromTaskAttemptFinishedEvent (Failed due to output_failure)
-   * restoreFromTaskFinishedEvent -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_SUCCEEDED_FAILED() {
-
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-
-    // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure.
-    recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(1, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(1, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
-   * restoreFromTaskAttemptFinishedEvent (KILLED due to node failed )
-   * restoreFromTaskFinishedEvent -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_SUCCEEDED_KILLED() {
-
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-
-    // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure.
-    recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Commit_Failed_Recovery_Not_Supported() {
-    Map<String, OutputCommitter> committers =
-        new HashMap<String, OutputCommitter>();
-    committers.put("out1", new TestOutputCommitter(
-        mock(OutputCommitterContext.class), false, false));
-    when(task.getVertex().getOutputCommitters()).thenReturn(committers);
-
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    // restoreFromTaskAttemptFinishedEvent (SUCCEEDED)
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_Commit_Failed_recover_fail() {
-    Map<String, OutputCommitter> committers =
-        new HashMap<String, OutputCommitter>();
-    committers.put("out1", new TestOutputCommitter(
-        mock(OutputCommitterContext.class), true, true));
-    when(task.getVertex().getOutputCommitters()).thenReturn(committers);
-
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-
-    // restoreFromTaskAttemptFinishedEvent (SUCCEEDED)
-    long taStartTime = taskStartTime + 100L;
-    long taFinishTime = taStartTime + 100L;
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.SUCCEEDED, recoveredState);
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(taId, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  @Test(timeout = 5000)
-  public void testRecovery_WithDesired_SUCCEEDED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-    task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.SUCCEEDED,
-        false));
-    assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
-    // no TA_Recovery event sent
-    assertEquals(0, taEventHandler.getEvents().size());
-  }
-
-  @Test(timeout = 5000)
-  public void testRecovery_WithDesired_FAILED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-    task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.FAILED,
-        false));
-    assertEquals(TaskStateInternal.FAILED, task.getInternalState());
-    // no TA_Recovery event sent
-    assertEquals(0, taEventHandler.getEvents().size());
-  }
-
-  @Test(timeout = 5000)
-  public void testRecovery_WithDesired_KILLED() {
-    restoreFromTaskStartEvent();
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    restoreFromFirstTaskAttemptStartEvent(taId);
-    task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.KILLED,
-        false));
-    assertEquals(TaskStateInternal.KILLED, task.getInternalState());
-    // no TA_Recovery event sent
-    assertEquals(0, taEventHandler.getEvents().size());
-
-  }
-
-  /**
-   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
-   * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition
-   */
-  @Test(timeout = 5000)
-  public void testRecovery_OneTAStarted_Killed() {
-    restoreFromTaskStartEvent();
-
-    long taStartTime = taskStartTime + 100L;
-    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(TaskAttemptStateInternal.NEW,
-        ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(0, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-
-    long taFinishTime = taStartTime + 100L;
-    recoveredState =
-        task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters(), null, 0, null, 0));
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(TaskAttemptStateInternal.NEW,
-        ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
-    assertEquals(1, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(0, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // wait for Task send TA_RECOVER to TA and TA complete the RecoverTransition
-    dispatcher.await();
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    assertEquals(TaskAttemptStateInternal.KILLED,
-        ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
-    // new task attempt is scheduled
-    assertEquals(2, task.getAttempts().size());
-    assertEquals(1, task.getFinishedAttemptsCount());
-    assertEquals(0, task.failedAttempts);
-    assertEquals(1, task.getUncompletedAttemptsCount());
-    assertEquals(null, task.successfulAttempt);
-  }
-
-  /**
-   * n = maxFailedAttempts, in the previous AM attempt, n task attempts are
-   * killed. When recovering, it should continue to be in running state and
-   * schedule a new task attempt.
-   */
-  @Test(timeout = 5000)
-  public void testTaskRecovery_MultipleAttempts1() {
-    int maxFailedAttempts =
-        conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
-            TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
-    restoreFromTaskStartEvent();
-
-    for (int i = 0; i < maxFailedAttempts; ++i) {
-      TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-      task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-      task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.KILLED, null, "", null, null, 0, null, 0));
-    }
-    assertEquals(maxFailedAttempts, task.getAttempts().size());
-    assertEquals(0, task.failedAttempts);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // if the previous task attempt is killed, it should not been take into
-    // account when checking whether exceed the max attempts
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    // schedule a new task attempt
-    assertEquals(maxFailedAttempts + 1, task.getAttempts().size());
-  }
-
-  /**
-   * n = maxFailedAttempts, in the previous AM attempt, n task attempts are
-   * failed. When recovering, it should transit to failed because # of
-   * failed_attempt is exceeded.
-   */
-  @Test(timeout = 5000)
-  public void testTaskRecovery_MultipleAttempts2() {
-    int maxFailedAttempts =
-        conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
-            TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
-    restoreFromTaskStartEvent();
-
-    for (int i = 0; i < maxFailedAttempts; ++i) {
-      TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-      task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-      task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null, null, 0, null, 0));
-    }
-    assertEquals(maxFailedAttempts, task.getAttempts().size());
-    assertEquals(maxFailedAttempts, task.failedAttempts);
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // it should transit to failed because of the failed task attempt in the
-    // last application attempt.
-    assertEquals(TaskStateInternal.FAILED, task.getInternalState());
-    assertEquals(maxFailedAttempts, task.getAttempts().size());
-  }
-
-  /**
-   * n = maxFailedAttempts, in the previous AM attempt, n-1 task attempts are
-   * killed. And last task attempt is still in running state. When recovering,
-   * the last attempt should transit to killed and task is still in running
-   * state and new task attempt is scheduled.
-   */
-  @Test(timeout = 5000)
-  public void testTaskRecovery_MultipleAttempts3() throws InterruptedException {
-    int maxFailedAttempts =
-        conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
-            TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
-    restoreFromTaskStartEvent();
-
-    for (int i = 0; i < maxFailedAttempts - 1; ++i) {
-      TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
-      task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-      task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null, null, 0, null, 0));
-    }
-    assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
-    assertEquals(maxFailedAttempts - 1, task.failedAttempts);
-
-    TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
-    TaskState recoveredState =
-        task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
-            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-
-    assertEquals(TaskState.RUNNING, recoveredState);
-    assertEquals(TaskAttemptStateInternal.NEW,
-        ((TaskAttemptImpl) task.getAttempt(newTaskAttemptId))
-            .getInternalState());
-    assertEquals(maxFailedAttempts, task.getAttempts().size());
-
-    task.handle(new TaskEventRecoverTask(task.getTaskId()));
-    // wait until task attempt receive the Recover event from task
-    dispatcher.await();
-
-    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
-    assertEquals(TaskAttemptStateInternal.KILLED,
-        ((TaskAttemptImpl) (task.getAttempt(newTaskAttemptId)))
-            .getInternalState());
-    assertEquals(maxFailedAttempts - 1, task.failedAttempts);
-
-    // new task attempt is added
-    assertEquals(maxFailedAttempts + 1, task.getAttempts().size());
-  }
-
-  private TezTaskAttemptID getNewTaskAttemptID(TezTaskID taskId) {
-    return TezTaskAttemptID.getInstance(taskId, taskAttemptCounter++);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 035de32..11c2bf1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -137,8 +137,10 @@ import org.apache.tez.dag.app.dag.event.CallableEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
@@ -3459,11 +3461,8 @@ public class TestVertexImpl {
 
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
-
-    dispatcher.getEventHandler().handle(
-        new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
-            new TaskAttemptFailedEvent("Failed"), new EventMetaData(
-                EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID())))));
+    ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+        "diag", TaskAttemptTerminationCause.APPLICATION_ERROR));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause());
@@ -3496,10 +3495,8 @@ public class TestVertexImpl {
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
 
-    dispatcher.getEventHandler().handle(
-        new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
-            new TaskAttemptFailedEvent("Failed"), new EventMetaData(
-                EventProducerConsumerType.INPUT, v.getName(), null, ta.getID())))));
+    ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+        "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause());
@@ -3533,10 +3530,8 @@ public class TestVertexImpl {
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
 
-    dispatcher.getEventHandler().handle(
-        new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
-            new TaskAttemptFailedEvent("Failed"), new EventMetaData(
-                EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID())))));
+    ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+        "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause());
@@ -6355,17 +6350,17 @@ public class TestVertexImpl {
     v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent1)));
     dispatcher.await();
     assertTrue(v3.pendingTaskEvents.size() != 0);
-    ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
-    verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
-    verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+//    ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+//    verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+//    verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
 
     v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
     dispatcher.await();
     assertTrue(v3.pendingTaskEvents.size() == 0);
     // recovery events is not only handled one time
-    argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
-    verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
-    verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+//    argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+//    verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+//    verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
   }
 
   private void verifyHistoryEvents(List<DAGHistoryEvent> events, HistoryEventType eventType, int expectedTimes) {