You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/19 06:17:21 UTC

svn commit: r1543310 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/sr...

Author: vinodkv
Date: Tue Nov 19 05:17:20 2013
New Revision: 1543310

URL: http://svn.apache.org/r1543310
Log:
YARN-1210. Changed RM to start new app-attempts on RM restart only after ensuring that previous AM exited or after expiry time. Contributed by Omkar Vinit Joshi.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Nov 19 05:17:20 2013
@@ -103,6 +103,10 @@ Release 2.3.0 - UNRELEASED
     YARN-709. Added tests to verify validity of delegation tokens and logging of
     appsummary after RM restart. (Jian He via vinodkv)
 
+    YARN-1210. Changed RM to start new app-attempts on RM restart only after
+    ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
+    vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java Tue Nov 19 05:17:20 2013
@@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.ap
 
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.Records;
 
-public interface NodeHeartbeatRequest {
+public abstract class NodeHeartbeatRequest {
+  
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey) {
+    NodeHeartbeatRequest nodeHeartbeatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+    nodeHeartbeatRequest
+        .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+    nodeHeartbeatRequest
+        .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    return nodeHeartbeatRequest;
+  }
 
-  NodeStatus getNodeStatus();
-  void setNodeStatus(NodeStatus status);
+  public abstract NodeStatus getNodeStatus();
+  public abstract void setNodeStatus(NodeStatus status);
 
-  MasterKey getLastKnownContainerTokenMasterKey();
-  void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
+  public abstract MasterKey getLastKnownContainerTokenMasterKey();
+  public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
   
-  MasterKey getLastKnownNMTokenMasterKey();
-  void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+  public abstract MasterKey getLastKnownNMTokenMasterKey();
+  public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Tue Nov 19 05:17:20 2013
@@ -18,17 +18,37 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
 
-public interface RegisterNodeManagerRequest {
-  NodeId getNodeId();
-  int getHttpPort();
-  Resource getResource();
-  String getNMVersion();
+public abstract class RegisterNodeManagerRequest {
+  
+  public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+      int httpPort, Resource resource, String nodeManagerVersionId,
+      List<ContainerStatus> containerStatuses) {
+    RegisterNodeManagerRequest request =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    request.setHttpPort(httpPort);
+    request.setResource(resource);
+    request.setNodeId(nodeId);
+    request.setNMVersion(nodeManagerVersionId);
+    request.setContainerStatuses(containerStatuses);
+    return request;
+  }
+  
+  public abstract NodeId getNodeId();
+  public abstract int getHttpPort();
+  public abstract Resource getResource();
+  public abstract String getNMVersion();
+  public abstract List<ContainerStatus> getContainerStatuses();
   
-  void setNodeId(NodeId nodeId);
-  void setHttpPort(int port);
-  void setResource(Resource resource);
-  void setNMVersion(String version);
+  public abstract void setNodeId(NodeId nodeId);
+  public abstract void setHttpPort(int port);
+  public abstract void setResource(Resource resource);
+  public abstract void setNMVersion(String version);
+  public abstract void setContainerStatuses(List<ContainerStatus> containerStatuses);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java Tue Nov 19 05:17:20 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
 
-public class NodeHeartbeatRequestPBImpl extends
-    ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
+public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
   NodeHeartbeatRequestProto.Builder builder = null;
   boolean viaProto = false;
@@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl 
     return proto;
   }
 
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
   private void mergeLocalToBuilder() {
     if (this.nodeStatus != null) {
       builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Tue Nov 19 05:17:20 2013
@@ -19,11 +19,21 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
@@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api
 
 
     
-public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
+public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
   RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
   RegisterNodeManagerRequestProto.Builder builder = null;
   boolean viaProto = false;
   
   private Resource resource = null;
   private NodeId nodeId = null;
+  private List<ContainerStatus> containerStatuses = null;
   
   public RegisterNodeManagerRequestPBImpl() {
     builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestP
   }
 
   private void mergeLocalToBuilder() {
+    if (this.containerStatuses != null) {
+      addContainerStatusesToProto();
+    }
     if (this.resource != null) {
       builder.setResource(convertToProtoFormat(this.resource));
     }
@@ -140,6 +154,81 @@ public class RegisterNodeManagerRequestP
   }
 
   @Override
+  public List<ContainerStatus> getContainerStatuses() {
+    initContainerStatuses();
+    return containerStatuses;
+  }
+  
+  private void initContainerStatuses() {
+    if (this.containerStatuses != null) {
+      return;
+    }
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerStatusProto> list = p.getContainerStatusesList();
+    this.containerStatuses = new ArrayList<ContainerStatus>();
+    for (ContainerStatusProto c : list) {
+      this.containerStatuses.add(convertFromProtoFormat(c));
+    }
+  }
+
+  @Override
+  public void setContainerStatuses(List<ContainerStatus> containers) {
+    if (containers == null) {
+      return;
+    }
+    initContainerStatuses();
+    this.containerStatuses.addAll(containers);
+  }
+  
+  private void addContainerStatusesToProto() {
+    maybeInitBuilder();
+    builder.clearContainerStatuses();
+    if (containerStatuses == null) {
+      return;
+    }
+    Iterable<ContainerStatusProto> it = new Iterable<ContainerStatusProto>() {
+      
+      @Override
+      public Iterator<ContainerStatusProto> iterator() {
+        return new Iterator<ContainerStatusProto>() {
+          Iterator<ContainerStatus> iter = containerStatuses.iterator();
+          
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+          
+          @Override
+          public ContainerStatusProto next() {
+            return convertToProtoFormat(iter.next());  
+          }
+          
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllContainerStatuses(it);
+  }
+  
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+  
+  @Override
   public String getNMVersion() {
     RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasNmVersion()) {
@@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestP
     return ((ResourcePBImpl)t).getProto();
   }
 
-
-
-}  
+  private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
+    return new ContainerStatusPBImpl(c);
+  }
+  
+  private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
+    return ((ContainerStatusPBImpl)c).getProto();
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Tue Nov 19 05:17:20 2013
@@ -22,10 +22,24 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
 
 
-public interface NodeStatus {
+public abstract class NodeStatus {
   
+  public static NodeStatus newInstance(NodeId nodeId, int responseId,
+      List<ContainerStatus> containerStatuses,
+      List<ApplicationId> keepAliveApplications,
+      NodeHealthStatus nodeHealthStatus) {
+    NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
+    nodeStatus.setResponseId(responseId);
+    nodeStatus.setNodeId(nodeId);
+    nodeStatus.setContainersStatuses(containerStatuses);
+    nodeStatus.setKeepAliveApplications(keepAliveApplications);
+    nodeStatus.setNodeHealthStatus(nodeHealthStatus);
+    return nodeStatus;
+  }
+
   public abstract NodeId getNodeId();
   public abstract int getResponseId();
   
@@ -36,8 +50,8 @@ public interface NodeStatus {
   public abstract List<ApplicationId> getKeepAliveApplications();
   public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
   
-  NodeHealthStatus getNodeHealthStatus();
-  void setNodeHealthStatus(NodeHealthStatus healthStatus);
+  public abstract NodeHealthStatus getNodeHealthStatus();
+  public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus);
 
   public abstract void setNodeId(NodeId nodeId);
   public abstract void setResponseId(int responseId);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Tue Nov 19 05:17:20 2013
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
     
 
-public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
-    NodeStatus {
+public class NodeStatusPBImpl extends NodeStatus {
   NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
   NodeStatusProto.Builder builder = null;
   boolean viaProto = false;
@@ -167,6 +165,21 @@ public class NodeStatusPBImpl extends Pr
   }
 
   @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
   public synchronized int getResponseId() {
     NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
     return p.getResponseId();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Tue Nov 19 05:17:20 2013
@@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto 
   optional int32 http_port = 3;
   optional ResourceProto resource = 4;
   optional string nm_version = 5;
+  repeated ContainerStatusProto containerStatuses = 6;
 }
 
 message RegisterNodeManagerResponseProto {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Nov 19 05:17:20 2013
@@ -26,7 +26,7 @@ public interface NodeStatusUpdater exten
 
   void sendOutofBandHeartBeat();
 
-  NodeStatus getNodeStatusAndUpdateContainersInContext();
+  NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
 
   long getRMIdentifier();
   

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Nov 19 05:17:20 2013
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl exten
   private String nodeManagerVersionId;
   private String minimumResourceManagerVersion;
   private volatile boolean isStopped;
-  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private boolean tokenKeepAliveEnabled;
   private long tokenRemovalDelayMs;
   /** Keeps track of when the next keep alive request should be sent for an app*/
@@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl exten
         conf.getInt(
             YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
 
-    this.totalResource = recordFactory.newRecordInstance(Resource.class);
-    this.totalResource.setMemory(memoryMb);
-    this.totalResource.setVirtualCores(virtualCores);
+    this.totalResource = Resource.newInstance(memoryMb, virtualCores);
     metrics.addResource(totalResource);
     this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
     this.tokenRemovalDelayMs =
@@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl exten
   }
 
   @VisibleForTesting
-  protected void registerWithRM() throws YarnException, IOException {
+  protected void registerWithRM()
+      throws YarnException, IOException {
+    List<ContainerStatus> containerStatuses =
+        this.updateAndGetContainerStatuses();
     RegisterNodeManagerRequest request =
-        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request.setHttpPort(this.httpPort);
-    request.setResource(this.totalResource);
-    request.setNodeId(this.nodeId);
-    request.setNMVersion(this.nodeManagerVersionId);
+        RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
+          nodeManagerVersionId, containerStatuses);
+    if (containerStatuses != null) {
+      LOG.info("Registering with RM using finished containers :"
+          + containerStatuses);
+    }
     RegisterNodeManagerResponse regNMResponse =
         resourceTracker.registerNodeManager(request);
     this.rmIdentifier = regNMResponse.getRMIdentifier();
@@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl exten
   }
 
   @Override
-  public NodeStatus getNodeStatusAndUpdateContainersInContext() {
+  public NodeStatus getNodeStatusAndUpdateContainersInContext(
+      int responseId) {
 
-    NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
-    nodeStatus.setNodeId(this.nodeId);
+    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
+    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
+    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
+    nodeHealthStatus.setLastHealthReportTime(
+        healthChecker.getLastHealthReportTime());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+                + ", " + nodeHealthStatus.getHealthReport());
+    }
+    List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
+    LOG.debug(this.nodeId + " sending out status for "
+        + containersStatuses.size() + " containers");
+    NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
+      containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
 
-    int numActiveContainers = 0;
-    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+    return nodeStatus;
+  }
+
+  /*
+   * Returns the current container statuses. Any container that has
+   * COMPLETED is removed from the context.
+   */
+  private List<ContainerStatus> updateAndGetContainerStatuses() {
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
@@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl exten
       // Clone the container to send it to the RM
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = 
           container.cloneAndGetContainerStatus();
-      containersStatuses.add(containerStatus);
-      ++numActiveContainers;
+      containerStatuses.add(containerStatus);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending out status for container: " + containerStatus);
       }
@@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl exten
         LOG.info("Removed completed container " + containerId);
       }
     }
-    nodeStatus.setContainersStatuses(containersStatuses);
-
-    LOG.debug(this.nodeId + " sending out status for "
-        + numActiveContainers + " containers");
-
-    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
-    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
-    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
-    nodeHealthStatus.setLastHealthReportTime(
-        healthChecker.getLastHealthReportTime());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
-                + ", " + nodeHealthStatus.getHealthReport());
-    }
-    nodeStatus.setNodeHealthStatus(nodeHealthStatus);
-
-    List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
-    nodeStatus.setKeepAliveApplications(keepAliveAppIds);
-    
-    return nodeStatus;
+    return containerStatuses;
   }
 
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl exten
           // Send heartbeat
           try {
             NodeHeartbeatResponse response = null;
-            NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
-            nodeStatus.setResponseId(lastHeartBeatID);
+            NodeStatus nodeStatus =
+                getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
             
-            NodeHeartbeatRequest request = recordFactory
-                .newRecordInstance(NodeHeartbeatRequest.class);
-            request.setNodeStatus(nodeStatus);
-            request
-              .setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context
-                .getContainerTokenSecretManager().getCurrentKey());
-            request
-              .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
-                .getNMTokenSecretManager().getCurrentKey());
+            NodeHeartbeatRequest request =
+                NodeHeartbeatRequest.newInstance(nodeStatus,
+                  NodeStatusUpdaterImpl.this.context
+                    .getContainerTokenSecretManager().getCurrentKey(),
+                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
+                    .getCurrentKey());
             response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Nov 19 05:17:20 2013
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -371,17 +373,31 @@ public class ContainerManagerImpl extend
 
     this.handle(new CMgrCompletedContainersEvent(containerIds,
       CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
-    while (!containers.isEmpty()) {
-      try {
-        Thread.sleep(1000);
-        nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
-      } catch (InterruptedException ex) {
-        LOG.warn("Interrupted while sleeping on container kill on resync", ex);
+
+    /*
+     * We will wait till all the containers change their state to COMPLETE. We
+     * will not remove the container statuses from nm context because these
+     * are used while re-registering node manager with resource manager.
+     */
+    boolean allContainersCompleted = false;
+    while (!containers.isEmpty() && !allContainersCompleted) {
+      allContainersCompleted = true;
+      for (Entry<ContainerId, Container> container : containers.entrySet()) {
+        if (((ContainerImpl) container.getValue()).getCurrentState()
+            != ContainerState.COMPLETE) {
+          allContainersCompleted = false;
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            LOG.warn("Interrupted while sleeping on container kill on resync",
+              ex);
+          }
+          break;
+        }
       }
     }
-
     // All containers killed
-    if (containers.isEmpty()) {
+    if (allContainersCompleted) {
       LOG.info("All containers in DONE state");
     } else {
       LOG.info("Done waiting for containers to be killed. Still alive: " +

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Nov 19 05:17:20 2013
@@ -302,7 +302,7 @@ public class ContainerImpl implements Co
   private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
     stateMachine;
 
-  private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+  public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
     switch (stateMachine.getCurrentState()) {
     case NEW:
     case LOCALIZING:

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Nov 19 05:17:20 2013
@@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -183,6 +190,33 @@ public class ResourceTrackerService exte
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
+    if (!request.getContainerStatuses().isEmpty()) {
+      LOG.info("received container statuses on node manager register :"
+          + request.getContainerStatuses());
+      for (ContainerStatus containerStatus : request.getContainerStatuses()) {
+        ApplicationAttemptId appAttemptId =
+            containerStatus.getContainerId().getApplicationAttemptId();
+        RMApp rmApp =
+            rmContext.getRMApps().get(appAttemptId.getApplicationId());
+        if (rmApp != null) {
+          RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+          if (rmAppAttempt.getMasterContainer().getId()
+              .equals(containerStatus.getContainerId())
+              && containerStatus.getState() == ContainerState.COMPLETE) {
+            // sending master container finished event.
+            RMAppAttemptContainerFinishedEvent evt =
+                new RMAppAttemptContainerFinishedEvent(appAttemptId,
+                    containerStatus);
+            rmContext.getDispatcher().getEventHandler().handle(evt);
+          }
+        } else {
+          LOG.error("Received finished container :"
+              + containerStatus.getContainerId()
+              + " for non existing application :"
+              + appAttemptId.getApplicationId());
+        }
+      }
+    }
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Nov 19 05:17:20 2013
@@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppSavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
-            RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
-            RMAppState.FINAL_SAVING),
+            RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+            RMAppState.KILLED, RMAppState.FINAL_SAVING),
         RMAppEventType.RECOVER, new RMAppRecoveredTransition())
     .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
         new FinalSavingTransition(
@@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp,
     this.diagnostics.append(appState.getDiagnostics());
     this.storedFinishTime = appState.getFinishTime();
     this.startTime = appState.getStartTime();
+
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
       createNewAttempt(false);
-      // recover attempt
-      ((RMAppAttemptImpl) currentAttempt).recover(state);
+      ((RMAppAttemptImpl)this.currentAttempt).recover(state);
     }
   }
 
@@ -656,30 +656,35 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  @SuppressWarnings("unchecked")
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+    
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
-      if (app.recoveredFinalState != null) {
-        FINAL_TRANSITION.transition(app, event);
-        return app.recoveredFinalState;
-      }
-      // Directly call AttemptFailedTransition, since now we deem that an
-      // application fails because of RM restart as a normal AM failure.
-
-      // Do not recover unmanaged applications since current recovery 
-      // mechanism of restarting attempts does not work for them.
-      // This will need to be changed in work preserving recovery in which 
-      // RM will re-connect with the running AM's instead of restarting them
-
-      // In work-preserve restart, if attemptCount == maxAttempts, the job still
-      // needs to be recovered because the last attempt may still be running.
-
-      // As part of YARN-1210, we may return ACCECPTED state waiting for AM to
-      // reregister or fail and remove the following code.
-      return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
-        event);
+      if (app.attempts.isEmpty()) {
+        // Saved application was not running any attempts.
+        app.createNewAttempt(true);
+        return RMAppState.SUBMITTED;        
+      } else {
+        /*
+         * If the last attempt's recovered final state is null, the attempt
+         * was started but its AM container may or may not have started or
+         * finished. Therefore we should wait for it to finish.
+         */
+        for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+          app.dispatcher.getEventHandler().handle(
+              new RMAppAttemptEvent(attempt.getAppAttemptId(),
+                  RMAppAttemptEventType.RECOVER));
+        }        
+        if (app.recoveredFinalState != null) {
+          FINAL_TRANSITION.transition(app, event);
+          return app.recoveredFinalState;
+        } else {
+          return RMAppState.RUNNING;
+        }
+      }
     }
   }
 
@@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp,
       throw new YarnRuntimeException("Unknown state passed!");
     }
   }
+  
+  public static boolean isAppInFinalState(RMApp rmApp) {
+    RMAppState appState = rmApp.getState();
+    return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
+        || appState == RMAppState.KILLED;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Nov 19 05:17:20 2013
@@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements
             new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
       .addTransition( RMAppAttemptState.NEW,
           EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
-            RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+            RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
           RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
           
       // Transitions from SUBMITTED state
@@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.STATUS_UPDATE))
-              
-      // Transitions from RECOVERED State
-      .addTransition(
-          RMAppAttemptState.RECOVERED,
-          RMAppAttemptState.RECOVERED,
-          EnumSet.of(RMAppAttemptEventType.START,
-              RMAppAttemptEventType.APP_ACCEPTED,
-              RMAppAttemptEventType.APP_REJECTED,
-              RMAppAttemptEventType.EXPIRE,
-              RMAppAttemptEventType.LAUNCHED,
-              RMAppAttemptEventType.LAUNCH_FAILED,
-              RMAppAttemptEventType.REGISTERED,
-              RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_ACQUIRED,
-              RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
-              RMAppAttemptEventType.CONTAINER_FINISHED,
-              RMAppAttemptEventType.UNREGISTERED,
-              RMAppAttemptEventType.KILL,
-              RMAppAttemptEventType.STATUS_UPDATE))
     .installTopology();
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
     this.finalStatus = attemptState.getFinalApplicationStatus();
     this.startTime = attemptState.getStartTime();
-    handle(new RMAppAttemptEvent(getAppAttemptId(),
-      RMAppAttemptEventType.RECOVER));
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
+      LOG.info("Recovering attempt :  recoverdFinalState :"
+          + appAttempt.recoveredFinalState);
       if (appAttempt.recoveredFinalState != null) {
         appAttempt.progress = 1.0f;
+        RMApp rmApp =appAttempt.rmContext.getRMApps().get(
+            appAttempt.getAppAttemptId().getApplicationId());
+        // We will replay the final attempt only if last attempt is in final
+        // state but application is not in final state.
+        if (rmApp.getCurrentAppAttempt() == appAttempt
+            && !RMAppImpl.isAppInFinalState(rmApp)) {
+          (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
+              appAttempt, event);
+        }
         return appAttempt.recoveredFinalState;
       } else {
-        return RMAppAttemptState.RECOVERED;
+        /*
+         * Since the application attempt's final state was not saved, the
+         * AM container (previous attempt) must be in one of these states:
+         * 1) AM container may not have been launched (RM failed right before
+         * this).
+         * 2) AM container was successfully launched but may or may not have
+         * registered / unregistered.
+         * In either case we will wait (by moving the attempt into LAUNCHED
+         * state) and mark this attempt failed (assuming non work preserving
+         * restart) only after
+         * 1) the node manager reports during re-registration that the
+         * AM container finished,
+         * 2) OR AMLivelinessMonitor expires this attempt (when the AM doesn't
+         * heart beat back).
+         */
+        (new AMLaunchedTransition()).transition(appAttempt, event);
+        return RMAppAttemptState.LAUNCHED;
       }
     }
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Tue Nov 19 05:17:20 2013
@@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.re
 
 public enum RMAppAttemptState {
   NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, 
-  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
-  FINAL_SAVING
+  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Tue Nov 19 05:17:20 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -98,21 +100,27 @@ public class MockNM {
   }
 
   public RegisterNodeManagerResponse registerNode() throws Exception {
+    return registerNode(null);
+  }
+
+  public RegisterNodeManagerResponse registerNode(
+      List<ContainerStatus> containerStatus) throws Exception{
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
     req.setHttpPort(httpPort);
     Resource resource = BuilderUtils.newResource(memory, vCores);
     req.setResource(resource);
+    req.setContainerStatuses(containerStatus);
     req.setNMVersion(version);
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
     this.currentContainerTokenMasterKey =
         registrationResponse.getContainerTokenMasterKey();
     this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
-    return registrationResponse;
+    return registrationResponse;    
   }
-
+  
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
     return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
         isHealthy, ++responseId);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Nov 19 05:17:20 2013
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -62,10 +63,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -88,6 +93,7 @@ import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mortbay.log.Log;
 
 public class TestRMRestart {
 
@@ -109,6 +115,7 @@ public class TestRMRestart {
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
   }
 
+  @SuppressWarnings("rawtypes")
   @Test (timeout=180000)
   public void testRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@@ -257,11 +264,14 @@ public class TestRMRestart {
         .getApplicationId());
     
     // verify state machine kicked into expected states
-    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
     rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
     
-    // verify new attempts created
-    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+    // verify attempts for apps
+    // The app for which an AM was started will wait for the previous AM
+    // container finish event to arrive. However, an application for which
+    // no AM container was running will start a new application attempt.
+    Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
     Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
     
     // verify old AM is not accepted
@@ -279,8 +289,20 @@ public class TestRMRestart {
     Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
     
     // new NM to represent NM re-register
-    nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
-    nm2 = rm2.registerNode("127.0.0.2:5678", 15120);
+    nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
+
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
+            .getCurrentAppAttempt().getAppAttemptId(), 1),
+            ContainerState.COMPLETE, "Killed AM container", 143);
+    containerStatuses.add(containerStatus);
+    nm1.registerNode(containerStatuses);
+    nm2.registerNode();
+    
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());    
 
     // verify no more reboot response sent
     hbResponse = nm1.nodeHeartbeat(true);
@@ -404,6 +426,157 @@ public class TestRMRestart {
   }
 
   @Test
+  public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
+    // testing 3 cases
+    // After RM restarts
+    // 1) New application attempt is not started until previous AM container
+    // finish event is reported back to RM as a part of nm registration.
+    // 2) If the previous AM container finish event is never reported back
+    // (i.e. the node manager on which this AM container was running also went
+    // down), AMLivelinessMonitor should time out the previous attempt and
+    // start a new attempt.
+    // 3) If all the stored attempts had finished then new attempt should
+    // be started immediately.
+    YarnConfiguration conf = new YarnConfiguration(this.conf);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+    
+    // start RM
+    final MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
+    nm1.registerNode();
+     
+    // submitting app
+    RMApp app1 = rm1.submitApp(200);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am1 = launchAM(app1, rm1, nm1);
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    // Fail first AM.
+    am1.waitForState(RMAppAttemptState.FAILED);
+    
+    // launch another AM.
+    MockAM am2 = launchAM(app1, rm1, nm1);
+    
+    Assert.assertEquals(1, rmAppState.size());
+    Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
+    Assert.assertEquals(app1.getAppAttempts()
+        .get(app1.getCurrentAppAttempt().getAppAttemptId())
+        .getAppAttemptState(), RMAppAttemptState.RUNNING);
+
+    //  start new RM.
+    MockRM rm2 = null;
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NodeHeartbeatResponse res = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
+    
+    RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    // application should be in running state
+    rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+    // new attempt should not be started
+    Assert.assertEquals(2, rmApp.getAppAttempts().size());
+    // am1 attempt should be in FAILED state whereas am2 attempt should be in
+    // LAUNCHED state
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+            .getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+        rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+            .getAppAttemptState());
+    
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(
+            BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
+            ContainerState.COMPLETE, "Killed AM container", 143);
+    containerStatuses.add(containerStatus);
+    nm1.registerNode(containerStatuses);
+    rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    launchAM(rmApp, rm2, nm1);
+    Assert.assertEquals(3, rmApp.getAppAttempts().size());
+    rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.RUNNING);
+    // Now restart RM ...
+    // Setting AMLivelinessMonitor expiry interval to 10 seconds.
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+    MockRM rm3 = null;
+    rm3 = new MockRM(conf, memStore);
+    rm3.start();
+    
+    // Wait for RM to process all the events as a part of rm recovery.
+    nm1.setResourceTrackerService(rm3.getResourceTrackerService());
+    
+    rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
+    // application should be in running state
+    rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
+    // new attempt should not be started
+    Assert.assertEquals(3, rmApp.getAppAttempts().size());
+    // am1 and am2 attempts should be in FAILED state whereas am3 should be
+    // in LAUNCHED state
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+            .getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+            .getAppAttemptState());
+    ApplicationAttemptId latestAppAttemptId =
+        rmApp.getCurrentAppAttempt().getAppAttemptId();
+    Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
+        .get(latestAppAttemptId).getAppAttemptState());
+    
+    rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
+    rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(4, rmApp.getAppAttempts().size());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
+    
+    latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
+    
+    // The 4th attempt has started but is not yet saved into RMStateStore.
+    // It will be saved only when we launch the AM.
+
+    // submitting app but not starting AM for it.
+    RMApp app2 = rm3.submitApp(200);
+    rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(1, app2.getAppAttempts().size());
+    Assert.assertEquals(0,
+        memStore.getState().getApplicationState().get(app2.getApplicationId())
+            .getAttemptCount());
+
+    MockRM rm4 = null;
+    rm4 = new MockRM(conf, memStore);
+    rm4.start();
+    
+    rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
+    rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(4, rmApp.getAppAttempts().size());
+    Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
+    Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts()
+        .get(latestAppAttemptId).getAppAttemptState());
+    
+    // The initial application for which an AM was not started should be in
+    // ACCEPTED state with one application attempt started.
+    app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId());
+    rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
+    Assert.assertEquals(1, app2.getAppAttempts().size());
+    Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
+        .getCurrentAppAttempt().getAppAttemptState());
+
+  }
+
+  @Test
   public void testRMRestartFailedApp() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -736,6 +909,8 @@ public class TestRMRestart {
     Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
                         attemptState.getMasterContainer().getId());
 
+    // Setting AMLivelinessMonitor expiry interval to 10 seconds.
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
     // start new RM   
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Tue Nov 19 05:17:20 2013
@@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions
   }
   
   /**
-   * {@link RMAppAttemptState#RECOVERED}
+   * {@link RMAppAttemptState#LAUNCHED}
    */
   private void testAppAttemptRecoveredState() {
-    assertEquals(RMAppAttemptState.RECOVERED, 
+    assertEquals(RMAppAttemptState.LAUNCHED, 
         applicationAttempt.getAppAttemptState());
   }