You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/05 02:14:02 UTC

git commit: TEZ-660. Successful TaskAttempts belonging to LeafVertices should not be marked as KILLED in case of NodeFailure. (sseth)

Updated Branches:
  refs/heads/master af170946e -> 47e897922


TEZ-660. Successful TaskAttempts belonging to LeafVertices should not be
marked as KILLED in case of NodeFailure. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/47e89792
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/47e89792
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/47e89792

Branch: refs/heads/master
Commit: 47e897922c63046fb56088420755bd1b857e5451
Parents: af17094
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Dec 4 17:12:35 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Dec 4 17:12:35 2013 -0800

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  30 ++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   6 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 198 ++++++++++++++++++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |   2 +-
 4 files changed, 217 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index d6e9d0b..28b8809 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -144,6 +144,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected final boolean isRescheduled;
   private final Resource taskResource;
   private final ContainerContext containerContext;
+  private final boolean leafVertex;
 
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
@@ -154,6 +155,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
       DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION =
           new DiagnosticInformationUpdater();
+  
+  private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+      TERMINATED_AFTER_SUCCESS_HELPER = new TerminatedAfterSuccessHelper(KILLED_HELPER);
 
   private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
       STATUS_UPDATER = new StatusUpdaterTransition();
@@ -224,8 +228,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         // How will duplicate history events be handled ?
         // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition(KILLED_HELPER))
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition())
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition())
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
 
@@ -239,7 +243,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptListener tal, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       TaskLocationHint locationHint, boolean isRescheduled,
-      Resource resource, ContainerContext containerContext) {
+      Resource resource, ContainerContext containerContext, boolean leafVertex) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -258,6 +262,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.isRescheduled = isRescheduled;
     this.taskResource = resource;
     this.containerContext = containerContext;
+    this.leafVertex = leafVertex;
   }
 
 
@@ -1128,7 +1133,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   }
 
-  protected static class TerminatedAfterSuccessTransition extends
+  protected static class TerminatedAfterSuccessHelper extends
       TerminatedBeforeRunningTransition {
 
     @Override
@@ -1138,7 +1143,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       return false;
     }
     
-    public TerminatedAfterSuccessTransition(TerminatedTransitionHelper helper) {
+    public TerminatedAfterSuccessHelper(TerminatedTransitionHelper helper) {
       super(helper);
     }
 
@@ -1150,6 +1155,18 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   }
   
+  protected static class TerminatedAfterSuccessTransition implements
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) {
+      if (attempt.leafVertex) {
+        return TaskAttemptStateInternal.SUCCEEDED;
+      }
+      TaskAttemptImpl.TERMINATED_AFTER_SUCCESS_HELPER.transition(attempt, event);
+      return TaskAttemptStateInternal.KILLED;
+    }
+  }
+  
   protected static class OutputReportedFailedTransition implements
   MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
@@ -1185,8 +1202,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       String message = attempt.getID() + " being failed for too many output errors";
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
+      // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
       if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
-        (new TerminatedAfterSuccessTransition(FAILED_HELPER)).transition(
+        (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(
             attempt, event);
         return TaskAttemptStateInternal.FAILED;
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 30e6e2a..416f9f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -114,7 +114,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
       new ArrayList(0);
 
-
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
   private int numberUncompletedAttempts = 0;
@@ -677,7 +676,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
-        locationHint, (failedAttempts > 0), taskResource, containerContext);
+        locationHint, (failedAttempts > 0), taskResource, containerContext, leafVertex);
   }
 
   protected TaskAttempt getSuccessfulAttempt() {
@@ -1083,7 +1082,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       if (task.leafVertex) {
-        LOG.error("Unexpected event for task of leaf vertex " + event.getType());
+        LOG.error("Unexpected event for task of leaf vertex " + event.getType() + ", taskId: "
+            + task.getTaskId());
         task.internalError(event.getType());
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index dcf9aef..97aee2b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -71,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -89,6 +90,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mortbay.log.Log;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
@@ -134,7 +136,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext());
+        locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
     TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
 
@@ -178,7 +180,7 @@ public class TestTaskAttempt {
         mock(TaskAttemptListener.class), new Configuration(),
         new SystemClock(), mock(TaskHeartbeatHandler.class),
         mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
-            1), createFakeContainerContext());
+            1), createFakeContainerContext(), false);
 
     TaskAttemptImpl spyTa = spy(taImpl);
     when(spyTa.resolveHosts(hosts)).thenReturn(
@@ -324,7 +326,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false,
-        resource, createFakeContainerContext());
+        resource, createFakeContainerContext(), false);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -387,7 +389,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
-        resource, createFakeContainerContext());
+        resource, createFakeContainerContext(), false);
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
@@ -478,7 +480,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
-        resource, createFakeContainerContext());
+        resource, createFakeContainerContext(), false);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
     // At state STARTING.
@@ -540,7 +542,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
-        resource, createFakeContainerContext());
+        resource, createFakeContainerContext(), false);
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
@@ -583,6 +585,184 @@ public class TestTaskAttempt {
     assertEquals(0, taImpl.getDiagnostics().size());
   }
 
+  @Test
+  // Ensure node failure on Successful Non-Leaf tasks cause them to be marked as KILLED
+  public void testNodeFailedNonLeafVertex() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext(), false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING,
+        taImpl.getState());
+
+    int expectedEventsAtRunning = 3;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED,
+        taImpl.getState());
+
+    assertEquals(0, taImpl.getDiagnostics().size());
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+
+    // Send out a Node Failure.
+    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+
+    // Verify one event to the Task informing it about FAILURE. No events to scheduler. Counter event.
+    int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 2;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEvenstAfterTerminating,
+            expectedEventsNodeFailure), TaskEventTAUpdate.class, 1);
+
+    // Verify still in KILLED state
+    assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED,
+        taImpl.getState());
+  }
+  
+  @Test
+  // Ensure node failure on Successful Leaf tasks do not cause them to be marked as KILLED
+  public void testNodeFailedLeafVertex() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext(), true);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING,
+        taImpl.getState());
+
+    int expectedEventsAtRunning = 3;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED,
+        taImpl.getState());
+
+    assertEquals(0, taImpl.getDiagnostics().size());
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+
+    // Send out a Node Failure.
+    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+
+    // Verify no additional events
+    int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture());
+
+    // Verify still in SUCCEEDED state
+    assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED,
+        taImpl.getState());
+  }
 
   @Test
   // Verifies that multiple TooManyFetchFailures are handled correctly by the
@@ -629,7 +809,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
-        resource, createFakeContainerContext());
+        resource, createFakeContainerContext(), false);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
     // At state STARTING.
@@ -721,10 +901,10 @@ public class TestTaskAttempt {
         Configuration conf, Clock clock,
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
         TaskLocationHint locationHint,  boolean isRescheduled,
-        Resource resource, ContainerContext containerContext) {
+        Resource resource, ContainerContext containerContext, boolean leafVertex) {
       super(taskId, attemptNumber, eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          locationHint, isRescheduled, resource, containerContext);
+          locationHint, isRescheduled, resource, containerContext, leafVertex);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 4cdc2fd..7dd6df0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -539,7 +539,7 @@ public class TestTaskImpl {
         TaskLocationHint locationHing, boolean isRescheduled,
         Resource resource, ContainerContext containerContext) {
       super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, locationHing, isRescheduled, resource, containerContext);
+          appContext, locationHing, isRescheduled, resource, containerContext, false);
     }
 
     @Override