You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2014/10/02 00:38:59 UTC

git commit: YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He.

Repository: hadoop
Updated Branches:
  refs/heads/trunk dd1b8f2ed -> 52bbe0f11


YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52bbe0f1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52bbe0f1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52bbe0f1

Branch: refs/heads/trunk
Commit: 52bbe0f11bc8e97df78a1ab9b63f4eff65fd7a76
Parents: dd1b8f2
Author: Zhijie Shen <zj...@apache.org>
Authored: Wed Oct 1 15:38:11 2014 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Wed Oct 1 15:38:11 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../distributedshell/ApplicationMaster.java     |  1 +
 .../protocolrecords/NodeHeartbeatResponse.java  |  9 +--
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    | 45 ++++++-------
 .../yarn_server_common_service_protos.proto     |  2 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  2 +-
 .../nodemanager/TestNodeStatusUpdater.java      |  2 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         | 69 +++++++++++++-------
 .../resourcemanager/rmnode/RMNodeImpl.java      | 25 +++----
 .../applicationsmanager/TestAMRestart.java      | 24 ++-----
 .../attempt/TestRMAppAttemptTransitions.java    | 12 ++--
 11 files changed, 108 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3878ca1..4d05757 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -503,6 +503,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore.
     (Zhijie Shen via jianhe)
 
+    YARN-2630. Prevented previous AM container status from being acquired by the
+    current restarted AM. (Jian He via zjshen) 
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 2451030..df9f34b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -668,6 +668,7 @@ public class ApplicationMaster {
           + ", completed=" + numCompletedContainers.get() + ", allocated="
           + numAllocatedContainers.get() + ", failed="
           + numFailedContainers.get();
+      LOG.info(appMessage);
       success = false;
     }
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 9887acc..12e1f54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -30,7 +30,7 @@ public interface NodeHeartbeatResponse {
   NodeAction getNodeAction();
 
   List<ContainerId> getContainersToCleanup();
-  List<ContainerId> getFinishedContainersPulledByAM();
+  List<ContainerId> getContainersToBeRemovedFromNM();
 
   List<ApplicationId> getApplicationsToCleanup();
 
@@ -45,9 +45,10 @@ public interface NodeHeartbeatResponse {
 
   void addAllContainersToCleanup(List<ContainerId> containers);
 
-  // This tells NM to remove finished containers only after the AM
-  // has actually received it in a previous allocate response
-  void addFinishedContainersPulledByAM(List<ContainerId> containers);
+  // This tells NM to remove finished containers from its context. Currently, NM
+  // will remove finished containers from its context only after AM has actually
+  // received the finished containers in a previous allocate response
+  void addContainersToBeRemovedFromNM(List<ContainerId> containers);
   
   void addAllApplicationsToCleanup(List<ApplicationId> applications);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index e9296f4..78979d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -40,13 +40,14 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
     
-public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
+public class NodeHeartbeatResponsePBImpl extends
+    ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
   NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
   NodeHeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
   
   private List<ContainerId> containersToCleanup = null;
-  private List<ContainerId> finishedContainersPulledByAM = null;
+  private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -74,8 +75,8 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     if (this.applicationsToCleanup != null) {
       addApplicationsToCleanupToProto();
     }
-    if (this.finishedContainersPulledByAM != null) {
-      addFinishedContainersPulledByAMToProto();
+    if (this.containersToBeRemovedFromNM != null) {
+      addContainersToBeRemovedFromNMToProto();
     }
     if (this.containerTokenMasterKey != null) {
       builder.setContainerTokenMasterKey(
@@ -204,9 +205,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
   }
 
   @Override
-  public List<ContainerId> getFinishedContainersPulledByAM() {
-    initFinishedContainersPulledByAM();
-    return this.finishedContainersPulledByAM;
+  public List<ContainerId> getContainersToBeRemovedFromNM() {
+    initContainersToBeRemovedFromNM();
+    return this.containersToBeRemovedFromNM;
   }
 
   private void initContainersToCleanup() {
@@ -222,16 +223,16 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     }
   }
 
-  private void initFinishedContainersPulledByAM() {
-    if (this.finishedContainersPulledByAM != null) {
+  private void initContainersToBeRemovedFromNM() {
+    if (this.containersToBeRemovedFromNM != null) {
       return;
     }
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
-    this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
+    List<ContainerIdProto> list = p.getContainersToBeRemovedFromNmList();
+    this.containersToBeRemovedFromNM = new ArrayList<ContainerId>();
 
     for (ContainerIdProto c : list) {
-      this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
+      this.containersToBeRemovedFromNM.add(convertFromProtoFormat(c));
     }
   }
 
@@ -245,12 +246,12 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
   }
 
   @Override
-  public void addFinishedContainersPulledByAM(
-      final List<ContainerId> finishedContainersPulledByAM) {
-    if (finishedContainersPulledByAM == null)
+  public void
+      addContainersToBeRemovedFromNM(final List<ContainerId> containers) {
+    if (containers == null)
       return;
-    initFinishedContainersPulledByAM();
-    this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
+    initContainersToBeRemovedFromNM();
+    this.containersToBeRemovedFromNM.addAll(containers);
   }
 
   private void addContainersToCleanupToProto() {
@@ -288,10 +289,10 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
     builder.addAllContainersToCleanup(iterable);
   }
 
-  private void addFinishedContainersPulledByAMToProto() {
+  private void addContainersToBeRemovedFromNMToProto() {
     maybeInitBuilder();
-    builder.clearFinishedContainersPulledByAm();
-    if (finishedContainersPulledByAM == null)
+    builder.clearContainersToBeRemovedFromNm();
+    if (containersToBeRemovedFromNM == null)
       return;
     Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
 
@@ -299,7 +300,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
       public Iterator<ContainerIdProto> iterator() {
         return new Iterator<ContainerIdProto>() {
 
-          Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
+          Iterator<ContainerId> iter = containersToBeRemovedFromNM.iterator();
 
           @Override
           public boolean hasNext() {
@@ -320,7 +321,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
 
       }
     };
-    builder.addAllFinishedContainersPulledByAm(iterable);
+    builder.addAllContainersToBeRemovedFromNm(iterable);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 600f54d..d0990fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -58,7 +58,7 @@ message NodeHeartbeatResponseProto {
   repeated ApplicationIdProto applications_to_cleanup = 6;
   optional int64 nextHeartBeatInterval = 7;
   optional string diagnostics_message = 8;
-  repeated ContainerIdProto finished_containers_pulled_by_am = 9;
+  repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
 }
 
 message NMContainerStatusProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b4dcf1f..eecba39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -558,7 +558,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             // when NM re-registers with RM.
             // Only remove the cleanedup containers that are acked
             removeCompletedContainersFromContext(response
-                  .getFinishedContainersPulledByAM());
+                  .getContainersToBeRemovedFromNM());
 
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 8fb51a3..7837846 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -692,7 +692,7 @@ public class TestNodeStatusUpdater {
       NodeHeartbeatResponse nhResponse =
           YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
             heartBeatNodeAction, null, null, null, null, 1000L);
-      nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
+      nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
       return nhResponse;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index d75a871..fbcb7d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -687,20 +687,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
       // A new allocate means the AM received the previously sent
       // finishedContainers. We can ack this to NM now
-      for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
-
-        // Clear and get current values
-        List<ContainerStatus> currentSentContainers =
-            finishedContainersSentToAM
-            .put(nodeId, new ArrayList<ContainerStatus>());
-        List<ContainerId> containerIdList = new ArrayList<ContainerId>
-            (currentSentContainers.size());
-        for (ContainerStatus containerStatus:currentSentContainers) {
-          containerIdList.add(containerStatus.getContainerId());
-        }
-        eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
-            nodeId, containerIdList));
-      }
+      sendFinishedContainersToNM();
 
       // Mark every containerStatus as being sent to AM though we may return
       // only the ones that belong to the current attempt
@@ -1592,14 +1579,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
-      // Add all finished containers so that they can be acked to NM
-      addJustFinishedContainer(appAttempt, containerFinishedEvent);
-
       // Is this container the AmContainer? If the finished container is same as
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
           && appAttempt.masterContainer.getId().equals(
               containerStatus.getContainerId())) {
+        appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
 
         // Remember the follow up transition and save the final attempt state.
         appAttempt.rememberTargetTransitionsAndStoreState(event,
@@ -1607,10 +1592,46 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         return RMAppAttemptState.FINAL_SAVING;
       }
 
+      // Add all finished containers so that they can be acked to NM
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
       return this.currentState;
     }
   }
 
+
+  // Ack NM to remove finished containers from context.
+  private void sendFinishedContainersToNM() {
+    for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
+
+      // Clear and get current values
+      List<ContainerStatus> currentSentContainers =
+          finishedContainersSentToAM.put(nodeId,
+            new ArrayList<ContainerStatus>());
+      List<ContainerId> containerIdList =
+          new ArrayList<ContainerId>(currentSentContainers.size());
+      for (ContainerStatus containerStatus : currentSentContainers) {
+        containerIdList.add(containerStatus.getContainerId());
+      }
+      eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
+        containerIdList));
+    }
+  }
+
+  // Add am container to the list so that am container instance will be
+  // removed from NMContext.
+  private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
+    NodeId nodeId = containerFinishedEvent.getNodeId();
+    finishedContainersSentToAM.putIfAbsent(nodeId,
+      new ArrayList<ContainerStatus>());
+    appAttempt.finishedContainersSentToAM.get(nodeId).add(
+      containerFinishedEvent.getContainerStatus());
+    if (!appAttempt.getSubmissionContext()
+      .getKeepContainersAcrossApplicationAttempts()) {
+      appAttempt.sendFinishedContainersToNM();
+    }
+  }
+
   private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
       RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
     appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
@@ -1661,16 +1682,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
-      // Add all finished containers so that they can be acked to NM.
-      addJustFinishedContainer(appAttempt, containerFinishedEvent);
-
       // Is this container the ApplicationMaster container?
       if (appAttempt.masterContainer.getId().equals(
           containerStatus.getContainerId())) {
         new FinalTransition(RMAppAttemptState.FINISHED).transition(
             appAttempt, containerFinishedEvent);
+        appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
         return RMAppAttemptState.FINISHED;
       }
+      // Add all finished containers so that they can be acked to NM.
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
 
       return RMAppAttemptState.FINISHING;
     }
@@ -1686,14 +1707,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
-      // Add all finished containers so that they can be acked to NM.
-      addJustFinishedContainer(appAttempt, containerFinishedEvent);
-
       // If this is the AM container, it means the AM container is finished,
       // but we are not yet acknowledged that the final state has been saved.
       // Thus, we still return FINAL_SAVING state here.
       if (appAttempt.masterContainer.getId().equals(
         containerStatus.getContainerId())) {
+        appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
+
         if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
             || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
           // ignore Container_Finished Event if we were supposed to reach
@@ -1708,6 +1728,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
         return;
       }
+
+      // Add all finished containers so that they can be acked to NM.
+      addJustFinishedContainer(appAttempt, containerFinishedEvent);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1123a98..c960b50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -112,8 +112,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
 
-  /* set of containers that were notified to AM about their completion */
-  private final Set<ContainerId> finishedContainersPulledByAM =
+  /*
+   * set of containers to notify NM to remove them from its context. Currently,
+   * this includes containers that were notified to AM about their completion
+   */
+  private final Set<ContainerId> containersToBeRemovedFromNM =
       new HashSet<ContainerId>();
 
   /* the list of applications that have finished and need to be purged */
@@ -157,7 +160,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new FinishedContainersPulledByAMTransition())
+         new AddContainersToBeRemovedFromNMTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
@@ -174,7 +177,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          new UpdateNodeResourceWhenUnusableTransition())
      .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new FinishedContainersPulledByAMTransition())
+         new AddContainersToBeRemovedFromNMTransition())
 
      //Transitions from LOST state
      .addTransition(NodeState.LOST, NodeState.LOST,
@@ -182,7 +185,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          new UpdateNodeResourceWhenUnusableTransition())
      .addTransition(NodeState.LOST, NodeState.LOST,
          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new FinishedContainersPulledByAMTransition())
+         new AddContainersToBeRemovedFromNMTransition())
 
      //Transitions from UNHEALTHY state
      .addTransition(NodeState.UNHEALTHY,
@@ -208,7 +211,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new FinishedContainersPulledByAMTransition())
+         new AddContainersToBeRemovedFromNMTransition())
 
      // create the topology tables
      .installTopology(); 
@@ -382,11 +385,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       response.addAllContainersToCleanup(
           new ArrayList<ContainerId>(this.containersToClean));
       response.addAllApplicationsToCleanup(this.finishedApplications);
-      response.addFinishedContainersPulledByAM(
-          new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
+      response.addContainersToBeRemovedFromNM(
+          new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
       this.containersToClean.clear();
       this.finishedApplications.clear();
-      this.finishedContainersPulledByAM.clear();
+      this.containersToBeRemovedFromNM.clear();
     } finally {
       this.writeLock.unlock();
     }
@@ -659,12 +662,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
-  public static class FinishedContainersPulledByAMTransition implements
+  public static class AddContainersToBeRemovedFromNMTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      rmNode.finishedContainersPulledByAM.addAll(((
+      rmNode.containersToBeRemovedFromNM.addAll(((
           RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index ba592fc..fcb4e45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -98,9 +98,6 @@ public class TestAMRestart {
       Thread.sleep(200);
     }
 
-    ContainerId amContainerId = ContainerId.newInstance(am1
-        .getApplicationAttemptId(), 1);
-
     // launch the 2nd container, for testing running container transferred.
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
     ContainerId containerId2 =
@@ -199,15 +196,11 @@ public class TestAMRestart {
     // completed containerId4 is also transferred to the new attempt.
     RMAppAttempt newAttempt =
         app1.getRMAppAttempt(am2.getApplicationAttemptId());
-    // 4 containers finished, acquired/allocated/reserved/completed + AM
-    // container.
-    waitForContainersToFinish(5, newAttempt);
+    // 4 containers finished, acquired/allocated/reserved/completed.
+    waitForContainersToFinish(4, newAttempt);
     boolean container3Exists = false, container4Exists = false, container5Exists =
-        false, container6Exists = false, amContainerExists = false;
+        false, container6Exists = false;
     for(ContainerStatus status :  newAttempt.getJustFinishedContainers()) {
-      if(status.getContainerId().equals(amContainerId)) {
-        amContainerExists = true;
-      }
       if(status.getContainerId().equals(containerId3)) {
         // containerId3 is the container ran by previous attempt but finished by the
         // new attempt.
@@ -227,11 +220,8 @@ public class TestAMRestart {
         container6Exists = true;
       }
     }
-    Assert.assertTrue(amContainerExists);
-    Assert.assertTrue(container3Exists);
-    Assert.assertTrue(container4Exists);
-    Assert.assertTrue(container5Exists);
-    Assert.assertTrue(container6Exists);
+    Assert.assertTrue(container3Exists && container4Exists && container5Exists
+        && container6Exists);
 
     // New SchedulerApplicationAttempt also has the containers info.
     rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
@@ -250,14 +240,14 @@ public class TestAMRestart {
     // all 4 normal containers finished.
     System.out.println("New attempt's just finished containers: "
         + newAttempt.getJustFinishedContainers());
-    waitForContainersToFinish(6, newAttempt);
+    waitForContainersToFinish(5, newAttempt);
     rm1.stop();
   }
 
   private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
       throws InterruptedException {
     int count = 0;
-    while (attempt.getJustFinishedContainers().size() < expectedNum
+    while (attempt.getJustFinishedContainers().size() != expectedNum
         && count < 500) {
       Thread.sleep(100);
       count++;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 15028f9..7f27f4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -965,7 +965,7 @@ public class TestRMAppAttemptTransitions {
     sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.FAILED,
         applicationAttempt.getAppAttemptState());
-    assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1003,7 +1003,7 @@ public class TestRMAppAttemptTransitions {
     sendAttemptUpdateSavedEvent(applicationAttempt);
     assertEquals(RMAppAttemptState.KILLED,
         applicationAttempt.getAppAttemptState());
-    assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1192,7 +1192,7 @@ public class TestRMAppAttemptTransitions {
             BuilderUtils.newContainerStatus(amContainer.getId(),
                 ContainerState.COMPLETE, "", 0), anyNodeId));
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
-        diagnostics, 1, false);
+        diagnostics, 0, false);
   }
 
   // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before
@@ -1225,7 +1225,7 @@ public class TestRMAppAttemptTransitions {
     // send attempt_saved
     sendAttemptUpdateSavedEvent(applicationAttempt);
     testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
-      diagnostics, 1, false);
+      diagnostics, 0, false);
   }
 
   // While attempt is at FINAL_SAVING, Expire event may come before
@@ -1381,13 +1381,13 @@ public class TestRMAppAttemptTransitions {
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
 
     // failed attempt captured the container finished event.
-    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
     ContainerStatus cs2 =
         ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
           ContainerState.COMPLETE, "", 0);
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
       appAttemptId, cs2, anyNodeId));
-    assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
     boolean found = false;
     for (ContainerStatus containerStatus:applicationAttempt
         .getJustFinishedContainers()) {