You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:06 UTC

[25/50] [abbrv] tez git commit: TEZ-3644. Cleanup container list stored in AMNode. (sseth)

TEZ-3644. Cleanup container list stored in AMNode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ce6ea6e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ce6ea6e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ce6ea6e

Branch: refs/heads/TEZ-1190
Commit: 4ce6ea6ed867a67600dbc36a2f56c37bbec3d708
Parents: 1f2a935
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 2 16:02:16 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 2 16:02:16 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/app/rm/container/AMContainerImpl.java   |  62 +++++++--
 .../org/apache/tez/dag/app/rm/node/AMNode.java  |   3 +
 .../rm/node/AMNodeEventContainerCompleted.java  |  37 ++++++
 .../tez/dag/app/rm/node/AMNodeEventType.java    |   5 +-
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |  67 ++++++++--
 .../tez/dag/app/rm/node/AMNodeTracker.java      |   5 +-
 .../dag/app/rm/node/PerSourceNodeTracker.java   |  11 +-
 .../dag/app/rm/container/TestAMContainer.java   | 128 ++++++++++++-------
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  |  73 +++++++++++
 10 files changed, 322 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b8465de..07841bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3644. Cleanup container list stored in AMNode.
   TEZ-3646. IFile.Writer has an extra output stream flush call
   TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances.
   TEZ-3637. TezMerger logs too much at INFO level

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index ac429c7..18e72a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -32,9 +32,12 @@ import org.apache.tez.Utils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerCompleted;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.Credentials;
@@ -48,7 +51,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.AppContext;
@@ -118,6 +120,8 @@ public class AMContainerImpl implements AMContainer {
 
   private Credentials credentials;
   private boolean credentialsChanged = false;
+
+  private boolean completedMessageSent = false;
   
   // TODO Consider registering with the TAL, instead of the TAL pulling.
   // Possibly after splitting TAL and ContainerListener.
@@ -127,8 +131,11 @@ public class AMContainerImpl implements AMContainer {
 
   // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
 
+  private final NonRunningStateEnteredCallback NON_RUNNING_STATE_ENTERED_CALLBACK = new NonRunningStateEnteredCallback();
+
+  private final StateMachineTez<AMContainerState, AMContainerEventType, AMContainerEvent, AMContainerImpl>
+      stateMachine;
 
-  private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
   private static final StateMachineFactory
       <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>
       stateMachineFactory =
@@ -328,7 +335,19 @@ public class AMContainerImpl implements AMContainer {
     this.schedulerId = schedulerId;
     this.launcherId = launcherId;
     this.taskCommId = taskCommId;
-    this.stateMachine = stateMachineFactory.make(this);
+    this.stateMachine = new StateMachineTez<>(stateMachineFactory.make(this), this);
+    augmentStateMachine();
+  }
+
+
+  private void augmentStateMachine() {
+    stateMachine
+        .registerStateEnteredCallback(AMContainerState.STOP_REQUESTED,
+            NON_RUNNING_STATE_ENTERED_CALLBACK)
+        .registerStateEnteredCallback(AMContainerState.STOPPING,
+            NON_RUNNING_STATE_ENTERED_CALLBACK)
+        .registerStateEnteredCallback(AMContainerState.COMPLETED,
+            NON_RUNNING_STATE_ENTERED_CALLBACK);
   }
 
   @Override
@@ -422,7 +441,7 @@ public class AMContainerImpl implements AMContainer {
         LOG.error("Can't handle event " + event.getType()
             + " at current state " + oldState + " for ContainerId "
             + this.containerId, e);
-        inError = true;
+        setError();
         // TODO Can't set state to COMPLETED. Add a default error state.
       }
       if (oldState != getState()) {
@@ -482,7 +501,7 @@ public class AMContainerImpl implements AMContainer {
                 msg, e));
         // We have not registered with any of the listeners etc yet. Send out a deallocateContainer
         // message and return. The AM will shutdown shortly.
-        container.inError = true;
+        container.setError();
         container.deAllocate();
         return;
       }
@@ -515,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
-      container.inError = true;
+      container.setError();
       container.registerFailedAttempt(event.getTaskAttemptId());
       container.maybeSendNodeFailureForFailedAssignment(event
           .getTaskAttemptId());
@@ -961,7 +980,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
-      container.inError = true;
+      container.setError();
       String errorMessage = "AttemptId: " + event.getTaskAttemptId() +
           " cannot be allocated to container: " + container.getContainerId() +
           " in " + container.getState() + " state";
@@ -1032,7 +1051,7 @@ public class AMContainerImpl implements AMContainer {
 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.inError = true;
+      container.setError();
     }
   }
 
@@ -1046,7 +1065,7 @@ public class AMContainerImpl implements AMContainer {
       // think the container is still around and assign a task to it. The task
       // ends up getting a CONTAINER_KILLED message. Task could handle this by
       // asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it.
-      container.inError = true;
+      container.setError();
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
       String errorMessage = "AttemptId: " + event.getTaskAttemptId()
           + " cannot be allocated to container: " + container.getContainerId()
@@ -1058,9 +1077,19 @@ public class AMContainerImpl implements AMContainer {
     }
   }
 
+  private static class NonRunningStateEnteredCallback
+      implements OnStateChangedCallback<AMContainerState, AMContainerImpl> {
+
+    @Override
+    public void onStateChanged(AMContainerImpl amContainer,
+                               AMContainerState amContainerState) {
+      amContainer.handleNonRunningStateEntered();
+    }
+  }
+
   private void handleExtraTAAssign(
       AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
-    this.inError = true;
+    setError();
     String errorMessage = "AMScheduler Error: Multiple simultaneous " +
         "taskAttempt allocations to: " + this.getContainerId() +
         ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
@@ -1078,6 +1107,19 @@ public class AMContainerImpl implements AMContainer {
     this.unregisterFromContainerListener();
   }
 
+  private void setError() {
+    this.inError = true;
+    handleNonRunningStateEntered();
+  }
+
+  private void handleNonRunningStateEntered() {
+    if (!completedMessageSent) {
+      completedMessageSent = true;
+      sendEvent(new AMNodeEventContainerCompleted(getContainer().getNodeId(),
+          schedulerId, containerId));
+    }
+  }
+
   protected void registerFailedAttempt(TezTaskAttemptID taId) {
     failedAssignments.add(taId);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
index 1c34816..bc01e04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
 
 public interface AMNode extends EventHandler<AMNodeEvent> {
   
@@ -33,4 +34,6 @@ public interface AMNode extends EventHandler<AMNodeEvent> {
   public boolean isUnhealthy();
   public boolean isBlacklisted();
   public boolean isUsable();
+
+  void dagComplete(DAG dag);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java
new file mode 100644
index 0000000..f999c3a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class AMNodeEventContainerCompleted extends AMNodeEvent {
+
+  private final ContainerId containerId;
+
+  public AMNodeEventContainerCompleted(
+      NodeId nodeId,
+      int schedulerId, ContainerId containerId) {
+    super(nodeId, schedulerId, AMNodeEventType.N_CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
index 86087d0..a141124 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
@@ -21,7 +21,10 @@ package org.apache.tez.dag.app.rm.node;
 public enum AMNodeEventType {
   //Producer: Scheduler
   N_CONTAINER_ALLOCATED,
-  
+
+  //Producer: Container
+  N_CONTAINER_COMPLETED,
+
   //Producer: TaskSchedulerEventHandler
   N_TA_SUCCEEDED,
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index bcc38c6..f4ad032 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -19,6 +19,8 @@
 package org.apache.tez.dag.app.rm.node;
 
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -26,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -60,20 +63,19 @@ public class AMNodeImpl implements AMNode {
   private boolean blacklistingEnabled;
   private boolean ignoreBlacklisting = false;
   private boolean nodeUpdatesRescheduleEnabled;
-  private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
+  private final Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
 
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
 
   @VisibleForTesting
-  final List<ContainerId> containers = new LinkedList<ContainerId>();
+  final Set<ContainerId> containers = new LinkedHashSet<>();
+  final Set<ContainerId> completedContainers = new HashSet<>();
   int numFailedTAs = 0;
   int numSuccessfulTAs = 0;
-  
-  //Book-keeping only. In case of Health status change.
-  private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
-
 
+  private static final ContainerCompletedTransition CONTAINER_COMPLETED_TRANSITION =
+      new ContainerCompletedTransition();
 
   private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine;
 
@@ -103,6 +105,8 @@ public class AMNodeImpl implements AMNode {
           new IgnoreBlacklistingStateChangeTransition(true))
       .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
           AMNodeEventType.N_TURNED_HEALTHY)
+      .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+          AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
 
       // Transitions from BLACKLISTED state.
       .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
@@ -120,6 +124,8 @@ public class AMNodeImpl implements AMNode {
       .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE,
           AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
           new IgnoreBlacklistingStateChangeTransition(true))
+      .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
+          AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
       .addTransition(
           AMNodeState.BLACKLISTED,
           AMNodeState.BLACKLISTED,
@@ -142,6 +148,8 @@ public class AMNodeImpl implements AMNode {
           EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
           AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
           new IgnoreBlacklistingDisabledTransition())
+      .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+          AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
       .addTransition(
           AMNodeState.FORCED_ACTIVE,
           AMNodeState.FORCED_ACTIVE,
@@ -168,6 +176,8 @@ public class AMNodeImpl implements AMNode {
           EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE),
           AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
       .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+          AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
+      .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
           AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
 
         .installTopology();
@@ -259,7 +269,6 @@ public class AMNodeImpl implements AMNode {
       sendEvent(new AMContainerEventNodeFailed(c, "Node blacklisted"));
     }
     // these containers are not useful anymore
-    pastContainers.addAll(containers);
     containers.clear();
     sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId));
   }
@@ -295,9 +304,9 @@ public class AMNodeImpl implements AMNode {
     @Override
     public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
       AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
-      LOG.info("Attempt failed on node: " + node.getNodeId() + " TA: "
-          + event.getTaskAttemptId() + " failed: " + event.failed()
-          + " container: " + event.getContainerId() + " numFailedTAs: "
+      LOG.info("Attempt " + (event.failed() ? "failed" : "killed") + "on node: " + node.getNodeId()
+          + " TA: " + event.getTaskAttemptId()
+          + ", container: " + event.getContainerId() + ", numFailedTAs: "
           + node.numFailedTAs);
       if (event.failed()) {
         // ignore duplicate attempt ids
@@ -381,8 +390,6 @@ public class AMNodeImpl implements AMNode {
       AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
       node.sendEvent(new AMContainerEvent(event.getContainerId(),
           AMContainerEventType.C_STOP_REQUEST));
-      // ZZZ CReuse: Should the scheduler check node state before scheduling a
-      // container on it ?
     }
   }
 
@@ -434,7 +441,6 @@ public class AMNodeImpl implements AMNode {
       MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
     @Override
     public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
-      node.pastContainers.addAll(node.containers);
       node.containers.clear();
       if (node.ignoreBlacklisting) {
         return AMNodeState.FORCED_ACTIVE;
@@ -444,6 +450,17 @@ public class AMNodeImpl implements AMNode {
     }
   }
 
+  protected static class ContainerCompletedTransition
+      implements SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+
+    @Override
+    public void transition(AMNodeImpl amNode, AMNodeEvent amNodeEvent) {
+      AMNodeEventContainerCompleted cc =
+          (AMNodeEventContainerCompleted) amNodeEvent;
+      amNode.completedContainers.add(cc.getContainerId());
+    }
+  }
+
   @Override
   public boolean isUnhealthy() {
     this.readLock.lock();
@@ -468,4 +485,28 @@ public class AMNodeImpl implements AMNode {
   public boolean isUsable() {
     return !(isUnhealthy() || isBlacklisted());
   }
+
+  @Override
+  public void dagComplete(DAG dag) {
+    this.writeLock.lock();
+    try {
+      int countBefore = containers.size();
+      int countCompleted = completedContainers.size();
+
+
+      // Actual functionality.
+      containers.removeAll(completedContainers);
+      completedContainers.clear();
+
+      int countAfter = containers.size();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Node {}, cleaning up knownContainers. current={}, completed={}, postCleanup={}",
+            getNodeId(), countBefore, countCompleted, countAfter);
+      }
+
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index fdc8a4c..1536170 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -101,6 +101,7 @@ public class AMNodeTracker extends AbstractService implements
     // No synchronization required until there's multiple dispatchers.
     switch (rEvent.getType()) {
       case N_CONTAINER_ALLOCATED:
+      case N_CONTAINER_COMPLETED:
       case N_TA_SUCCEEDED:
       case N_TA_ENDED:
       case N_IGNORE_BLACKLISTING_ENABLED:
@@ -140,7 +141,9 @@ public class AMNodeTracker extends AbstractService implements
   }
 
   public void dagComplete(DAG dag) {
-    // TODO TEZ-2337 Maybe reset failures from previous DAGs
+    for (PerSourceNodeTracker perSourceNodeTracker : perSourceNodeTrackers.values()) {
+      perSourceNodeTracker.dagComplete(dag);
+    }
   }
 
   private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerId) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
index 72c3230..74c6176 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +107,8 @@ public class PerSourceNodeTracker {
         }
         break;
       default:
-        nodeMap.get(nodeId).handle(rEvent);
+        amNode = nodeMap.get(nodeId);
+        amNode.handle(rEvent);
     }
   }
 
@@ -186,6 +188,13 @@ public class PerSourceNodeTracker {
     }
   }
 
+  public void dagComplete(DAG dag) {
+    for (AMNode amNode : nodeMap.values()) {
+      amNode.dagComplete(dag);
+    }
+    // TODO TEZ-2337 Maybe reset failures from previous DAGs
+  }
+
   @SuppressWarnings("unchecked")
   private void sendEvent(Event<?> event) {
     this.eventHandler.handle(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 4d1bbae..1b9df99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.app.TaskCommunicatorWrapper;
+import org.apache.tez.dag.app.rm.node.AMNodeEventType;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -146,7 +147,9 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
-    wc.verifyNoOutgoingEvents();
+    List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -196,7 +199,9 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
-    wc.verifyNoOutgoingEvents();
+    List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -266,7 +271,9 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
-    wc.verifyNoOutgoingEvents();
+    List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -288,9 +295,11 @@ public class TestAMContainer {
     wc.stopRequest();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
     // Event to NM to stop the container.
-    wc.verifyCountAndGetOutgoingEvents(1);
-    assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
-        ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+
+    List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     wc.nmStopSent();
     wc.verifyState(AMContainerState.STOPPING);
@@ -323,9 +332,10 @@ public class TestAMContainer {
     wc.stopRequest();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
     // Event to NM to stop the container.
-    wc.verifyCountAndGetOutgoingEvents(1);
-    assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
-        ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+    List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     wc.nmStopFailed();
     wc.verifyState(AMContainerState.STOPPING);
@@ -366,11 +376,12 @@ public class TestAMContainer {
         "Multiple simultaneous taskAttempt");
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
@@ -405,11 +416,12 @@ public class TestAMContainer {
         "Multiple simultaneous taskAttempt");
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
@@ -442,10 +454,11 @@ public class TestAMContainer {
         "timed out");
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+        ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     // TODO Should this be an RM DE-ALLOCATE instead ?
 
     wc.containerCompleted();
@@ -477,10 +490,11 @@ public class TestAMContainer {
         "received a STOP_REQUEST");
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+        ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     // TODO Should this be an RM DE-ALLOCATE instead ?
 
     wc.containerCompleted();
@@ -511,10 +525,11 @@ public class TestAMContainer {
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED,
         "launchFailed");
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     for (Event e : outgoingEvents) {
       if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) {
         Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
@@ -538,7 +553,9 @@ public class TestAMContainer {
 
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    wc.verifyNoOutgoingEvents();
+    List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     assertFalse(wc.amContainer.isInErrorState());
   }
@@ -561,9 +578,10 @@ public class TestAMContainer {
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
         ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
 
@@ -591,9 +609,10 @@ public class TestAMContainer {
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "DiskFailed");
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
         ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
 
@@ -623,9 +642,10 @@ public class TestAMContainer {
     verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.NODE_FAILED,
         "NodeFailed");
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED,
         ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
 
@@ -656,11 +676,12 @@ public class TestAMContainer {
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
-    wc.verifyCountAndGetOutgoingEvents(0);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     assertFalse(wc.amContainer.isInErrorState());
 
-    wc.verifyNoOutgoingEvents();
     wc.verifyHistoryStopEvent();
 
     assertFalse(wc.amContainer.isInErrorState());
@@ -685,9 +706,10 @@ public class TestAMContainer {
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     assertFalse(wc.amContainer.isInErrorState());
 
@@ -722,11 +744,14 @@ public class TestAMContainer {
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+
+    Event event = findEventByType(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
     Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
-        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
+        ((TaskAttemptEventContainerTerminatedBySystem)event).getTerminationCause());
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     assertFalse(wc.amContainer.isInErrorState());
 
@@ -761,9 +786,10 @@ public class TestAMContainer {
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
     Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
         ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
 
@@ -799,11 +825,13 @@ public class TestAMContainer {
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    Event event = findEventByType(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
     Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
-        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
+        ((TaskAttemptEventContainerTerminatedBySystem)event).getTerminationCause());
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     assertFalse(wc.amContainer.isInErrorState());
 
@@ -862,11 +890,12 @@ public class TestAMContainer {
     wc.nodeFailed();
     // Expecting a complete event from the RM
     wc.verifyState(AMContainerState.STOPPING);
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
@@ -904,11 +933,12 @@ public class TestAMContainer {
     wc.nodeFailed();
     // Expecting a complete event from the RM
     wc.verifyState(AMContainerState.STOPPING);
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_NODE_FAILED,
-        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
@@ -945,12 +975,13 @@ public class TestAMContainer {
     wc.nodeFailed();
     // Expecting a complete event from the RM
     wc.verifyState(AMContainerState.STOPPING);
-    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(5);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_NODE_FAILED,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-        AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+        AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+        AMNodeEventType.N_CONTAINER_COMPLETED);
 
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
@@ -1439,6 +1470,15 @@ public class TestAMContainer {
     assertTrue("Found unexpected events: " + eventsCopy
         + " in outgoing event list", eventsCopy.isEmpty());
   }
+
+  private Event findEventByType(List<Event> events, Enum<?> type) {
+    for (Event event : events) {
+      if (event.getType() == type) {
+        return event;
+      }
+    }
+    return null;
+  }
   
   private LocalResource createLocalResource(String name) {
     LocalResource lr = LocalResource.newInstance(URL.newInstance(null, "localhost", 2321, name),

http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index e123dd1..11d3b7a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.mock;
 
 import java.util.List;
 
+import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -326,6 +327,78 @@ public class TestAMNodeTracker {
     }
   }
 
+  @Test(timeout = 10000L)
+  public void testNodeCompletedAndCleanup() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerManager taskSchedulerManager =
+        mock(TaskSchedulerManager.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerManager);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+
+    try {
+
+      NodeId nodeId = NodeId.newInstance("fakenode", 3333);
+      amNodeTracker.nodeSeen(nodeId, 0);
+
+      AMNode amNode = amNodeTracker.get(nodeId, 0);
+      ContainerId[] containerIds = new ContainerId[7];
+
+      // Start 5 containers.
+      for (int i = 0; i < 5; i++) {
+        containerIds[i] = mock(ContainerId.class);
+        amNodeTracker
+            .handle(new AMNodeEventContainerAllocated(nodeId, 0, containerIds[i]));
+      }
+      assertEquals(5, amNode.getContainers().size());
+
+      // Finnish 1st dag
+      amNodeTracker.dagComplete(mock(DAG.class));
+      assertEquals(5, amNode.getContainers().size());
+
+
+      // Mark 2 as complete. Finish 2nd dag.
+      for (int i = 0; i < 2; i++) {
+        amNodeTracker.handle(
+            new AMNodeEventContainerCompleted(nodeId, 0, containerIds[i]));
+      }
+      amNodeTracker.dagComplete(mock(DAG.class));
+      assertEquals(3, amNode.getContainers().size());
+
+      // Add 2 more containers. Mark all as complete. Finish 3rd dag.
+      for (int i = 5; i < 7; i++) {
+        containerIds[i] = mock(ContainerId.class);
+        amNodeTracker
+            .handle(new AMNodeEventContainerAllocated(nodeId, 0, containerIds[i]));
+      }
+      assertEquals(5, amNode.getContainers().size());
+      amNodeTracker.dagComplete(mock(DAG.class));
+      assertEquals(5, amNode.getContainers().size());
+      amNodeTracker.dagComplete(mock(DAG.class));
+      assertEquals(5, amNode.getContainers().size());
+
+      for (int i = 2; i < 7; i++) {
+        amNodeTracker.handle(
+            new AMNodeEventContainerCompleted(nodeId, 0, containerIds[i]));
+      }
+      assertEquals(5, amNode.getContainers().size());
+      amNodeTracker.dagComplete(mock(DAG.class));
+      assertEquals(0, amNode.getContainers().size());
+
+    } finally {
+      amNodeTracker.stop();
+    }
+
+  }
+
   @Test(timeout=10000)
   public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception {
     _testNodeUnhealthyRescheduleTasks(true);