You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/11/19 18:26:26 UTC

svn commit: r1543510 [1/2] - in /hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/ya...

Author: arp
Date: Tue Nov 19 17:26:23 2013
New Revision: 1543510

URL: http://svn.apache.org/r1543510
Log:
Merging r1543111 through r1543509 from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/SchedulerPageUtil.java
      - copied unchanged from r1543509, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/SchedulerPageUtil.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Tue Nov 19 17:26:23 2013
@@ -100,6 +100,20 @@ Release 2.3.0 - UNRELEASED
     YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
     Kambatla via bikas)
 
+    YARN-709. Added tests to verify validity of delegation tokens and logging of
+    appsummary after RM restart. (Jian He via vinodkv)
+
+    YARN-1210. Changed RM to start new app-attempts on RM restart only after
+    ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
+    vinodkv)
+
+    YARN-674. Fixed ResourceManager to renew DelegationTokens on submission
+    asynchronously to work around potential slowness in state-store. (Omkar Vinit
+    Joshi via vinodkv)
+
+    YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit
+    Daga via Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -139,6 +153,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1411. HA config shouldn't affect NodeManager RPC addresses (Karthik
     Kambatla via bikas)
 
+    YARN-1419. TestFifoScheduler.testAppAttemptMetrics fails intermittently
+    under jdk7 (Jonathan Eagles via jlowe)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Nov 19 17:26:23 2013
@@ -504,6 +504,11 @@ public class YarnConfiguration extends C
       RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
   public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
       30000l;
+  
+  /** Delegation Token renewer thread count */
+  public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
+      RM_PREFIX + "delegation-token-renewer.thread-count";
+  public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
 
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java Tue Nov 19 17:26:23 2013
@@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.ap
 
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.Records;
 
-public interface NodeHeartbeatRequest {
+public abstract class NodeHeartbeatRequest {
+  
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey) {
+    NodeHeartbeatRequest nodeHeartbeatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+    nodeHeartbeatRequest
+        .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+    nodeHeartbeatRequest
+        .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    return nodeHeartbeatRequest;
+  }
 
-  NodeStatus getNodeStatus();
-  void setNodeStatus(NodeStatus status);
+  public abstract NodeStatus getNodeStatus();
+  public abstract void setNodeStatus(NodeStatus status);
 
-  MasterKey getLastKnownContainerTokenMasterKey();
-  void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
+  public abstract MasterKey getLastKnownContainerTokenMasterKey();
+  public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
   
-  MasterKey getLastKnownNMTokenMasterKey();
-  void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+  public abstract MasterKey getLastKnownNMTokenMasterKey();
+  public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Tue Nov 19 17:26:23 2013
@@ -18,17 +18,37 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
 
-public interface RegisterNodeManagerRequest {
-  NodeId getNodeId();
-  int getHttpPort();
-  Resource getResource();
-  String getNMVersion();
+public abstract class RegisterNodeManagerRequest {
+  
+  public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+      int httpPort, Resource resource, String nodeManagerVersionId,
+      List<ContainerStatus> containerStatuses) {
+    RegisterNodeManagerRequest request =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    request.setHttpPort(httpPort);
+    request.setResource(resource);
+    request.setNodeId(nodeId);
+    request.setNMVersion(nodeManagerVersionId);
+    request.setContainerStatuses(containerStatuses);
+    return request;
+  }
+  
+  public abstract NodeId getNodeId();
+  public abstract int getHttpPort();
+  public abstract Resource getResource();
+  public abstract String getNMVersion();
+  public abstract List<ContainerStatus> getContainerStatuses();
   
-  void setNodeId(NodeId nodeId);
-  void setHttpPort(int port);
-  void setResource(Resource resource);
-  void setNMVersion(String version);
+  public abstract void setNodeId(NodeId nodeId);
+  public abstract void setHttpPort(int port);
+  public abstract void setResource(Resource resource);
+  public abstract void setNMVersion(String version);
+  public abstract void setContainerStatuses(List<ContainerStatus> containerStatuses);
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java Tue Nov 19 17:26:23 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
 
-public class NodeHeartbeatRequestPBImpl extends
-    ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
+public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
   NodeHeartbeatRequestProto.Builder builder = null;
   boolean viaProto = false;
@@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl 
     return proto;
   }
 
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
   private void mergeLocalToBuilder() {
     if (this.nodeStatus != null) {
       builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Tue Nov 19 17:26:23 2013
@@ -19,11 +19,21 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
@@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api
 
 
     
-public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
+public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
   RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
   RegisterNodeManagerRequestProto.Builder builder = null;
   boolean viaProto = false;
   
   private Resource resource = null;
   private NodeId nodeId = null;
+  private List<ContainerStatus> containerStatuses = null;
   
   public RegisterNodeManagerRequestPBImpl() {
     builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestP
   }
 
   private void mergeLocalToBuilder() {
+    if (this.containerStatuses != null) {
+      addContainerStatusesToProto();
+    }
     if (this.resource != null) {
       builder.setResource(convertToProtoFormat(this.resource));
     }
@@ -140,6 +154,81 @@ public class RegisterNodeManagerRequestP
   }
 
   @Override
+  public List<ContainerStatus> getContainerStatuses() {
+    initContainerStatuses();
+    return containerStatuses;
+  }
+  
+  private void initContainerStatuses() {
+    if (this.containerStatuses != null) {
+      return;
+    }
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerStatusProto> list = p.getContainerStatusesList();
+    this.containerStatuses = new ArrayList<ContainerStatus>();
+    for (ContainerStatusProto c : list) {
+      this.containerStatuses.add(convertFromProtoFormat(c));
+    }
+  }
+
+  @Override
+  public void setContainerStatuses(List<ContainerStatus> containers) {
+    if (containers == null) {
+      return;
+    }
+    initContainerStatuses();
+    this.containerStatuses.addAll(containers);
+  }
+  
+  private void addContainerStatusesToProto() {
+    maybeInitBuilder();
+    builder.clearContainerStatuses();
+    if (containerStatuses == null) {
+      return;
+    }
+    Iterable<ContainerStatusProto> it = new Iterable<ContainerStatusProto>() {
+      
+      @Override
+      public Iterator<ContainerStatusProto> iterator() {
+        return new Iterator<ContainerStatusProto>() {
+          Iterator<ContainerStatus> iter = containerStatuses.iterator();
+          
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+          
+          @Override
+          public ContainerStatusProto next() {
+            return convertToProtoFormat(iter.next());  
+          }
+          
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllContainerStatuses(it);
+  }
+  
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+  
+  @Override
   public String getNMVersion() {
     RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasNmVersion()) {
@@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestP
     return ((ResourcePBImpl)t).getProto();
   }
 
-
-
-}  
+  private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
+    return new ContainerStatusPBImpl(c);
+  }
+  
+  private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
+    return ((ContainerStatusPBImpl)c).getProto();
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Tue Nov 19 17:26:23 2013
@@ -22,10 +22,24 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
 
 
-public interface NodeStatus {
+public abstract class NodeStatus {
   
+  public static NodeStatus newInstance(NodeId nodeId, int responseId,
+      List<ContainerStatus> containerStatuses,
+      List<ApplicationId> keepAliveApplications,
+      NodeHealthStatus nodeHealthStatus) {
+    NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
+    nodeStatus.setResponseId(responseId);
+    nodeStatus.setNodeId(nodeId);
+    nodeStatus.setContainersStatuses(containerStatuses);
+    nodeStatus.setKeepAliveApplications(keepAliveApplications);
+    nodeStatus.setNodeHealthStatus(nodeHealthStatus);
+    return nodeStatus;
+  }
+
   public abstract NodeId getNodeId();
   public abstract int getResponseId();
   
@@ -36,8 +50,8 @@ public interface NodeStatus {
   public abstract List<ApplicationId> getKeepAliveApplications();
   public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
   
-  NodeHealthStatus getNodeHealthStatus();
-  void setNodeHealthStatus(NodeHealthStatus healthStatus);
+  public abstract NodeHealthStatus getNodeHealthStatus();
+  public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus);
 
   public abstract void setNodeId(NodeId nodeId);
   public abstract void setResponseId(int responseId);

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Tue Nov 19 17:26:23 2013
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
     
 
-public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
-    NodeStatus {
+public class NodeStatusPBImpl extends NodeStatus {
   NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
   NodeStatusProto.Builder builder = null;
   boolean viaProto = false;
@@ -167,6 +165,21 @@ public class NodeStatusPBImpl extends Pr
   }
 
   @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
   public synchronized int getResponseId() {
     NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
     return p.getResponseId();

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Tue Nov 19 17:26:23 2013
@@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto 
   optional int32 http_port = 3;
   optional ResourceProto resource = 4;
   optional string nm_version = 5;
+  repeated ContainerStatusProto containerStatuses = 6;
 }
 
 message RegisterNodeManagerResponseProto {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Nov 19 17:26:23 2013
@@ -26,7 +26,7 @@ public interface NodeStatusUpdater exten
 
   void sendOutofBandHeartBeat();
 
-  NodeStatus getNodeStatusAndUpdateContainersInContext();
+  NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
 
   long getRMIdentifier();
   

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Nov 19 17:26:23 2013
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl exten
   private String nodeManagerVersionId;
   private String minimumResourceManagerVersion;
   private volatile boolean isStopped;
-  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private boolean tokenKeepAliveEnabled;
   private long tokenRemovalDelayMs;
   /** Keeps track of when the next keep alive request should be sent for an app*/
@@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl exten
         conf.getInt(
             YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
 
-    this.totalResource = recordFactory.newRecordInstance(Resource.class);
-    this.totalResource.setMemory(memoryMb);
-    this.totalResource.setVirtualCores(virtualCores);
+    this.totalResource = Resource.newInstance(memoryMb, virtualCores);
     metrics.addResource(totalResource);
     this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
     this.tokenRemovalDelayMs =
@@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl exten
   }
 
   @VisibleForTesting
-  protected void registerWithRM() throws YarnException, IOException {
+  protected void registerWithRM()
+      throws YarnException, IOException {
+    List<ContainerStatus> containerStatuses =
+        this.updateAndGetContainerStatuses();
     RegisterNodeManagerRequest request =
-        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request.setHttpPort(this.httpPort);
-    request.setResource(this.totalResource);
-    request.setNodeId(this.nodeId);
-    request.setNMVersion(this.nodeManagerVersionId);
+        RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
+          nodeManagerVersionId, containerStatuses);
+    if (containerStatuses != null) {
+      LOG.info("Registering with RM using finished containers :"
+          + containerStatuses);
+    }
     RegisterNodeManagerResponse regNMResponse =
         resourceTracker.registerNodeManager(request);
     this.rmIdentifier = regNMResponse.getRMIdentifier();
@@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl exten
   }
 
   @Override
-  public NodeStatus getNodeStatusAndUpdateContainersInContext() {
+  public NodeStatus getNodeStatusAndUpdateContainersInContext(
+      int responseId) {
 
-    NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
-    nodeStatus.setNodeId(this.nodeId);
+    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
+    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
+    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
+    nodeHealthStatus.setLastHealthReportTime(
+        healthChecker.getLastHealthReportTime());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+                + ", " + nodeHealthStatus.getHealthReport());
+    }
+    List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
+    LOG.debug(this.nodeId + " sending out status for "
+        + containersStatuses.size() + " containers");
+    NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
+      containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
 
-    int numActiveContainers = 0;
-    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+    return nodeStatus;
+  }
+
+  /*
+   * Returns the current container statuses. Any container that has
+   * COMPLETED is removed from the context.
+   */
+  private List<ContainerStatus> updateAndGetContainerStatuses() {
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
@@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl exten
       // Clone the container to send it to the RM
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = 
           container.cloneAndGetContainerStatus();
-      containersStatuses.add(containerStatus);
-      ++numActiveContainers;
+      containerStatuses.add(containerStatus);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending out status for container: " + containerStatus);
       }
@@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl exten
         LOG.info("Removed completed container " + containerId);
       }
     }
-    nodeStatus.setContainersStatuses(containersStatuses);
-
-    LOG.debug(this.nodeId + " sending out status for "
-        + numActiveContainers + " containers");
-
-    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
-    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
-    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
-    nodeHealthStatus.setLastHealthReportTime(
-        healthChecker.getLastHealthReportTime());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
-                + ", " + nodeHealthStatus.getHealthReport());
-    }
-    nodeStatus.setNodeHealthStatus(nodeHealthStatus);
-
-    List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
-    nodeStatus.setKeepAliveApplications(keepAliveAppIds);
-    
-    return nodeStatus;
+    return containerStatuses;
   }
 
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl exten
           // Send heartbeat
           try {
             NodeHeartbeatResponse response = null;
-            NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
-            nodeStatus.setResponseId(lastHeartBeatID);
+            NodeStatus nodeStatus =
+                getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
             
-            NodeHeartbeatRequest request = recordFactory
-                .newRecordInstance(NodeHeartbeatRequest.class);
-            request.setNodeStatus(nodeStatus);
-            request
-              .setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context
-                .getContainerTokenSecretManager().getCurrentKey());
-            request
-              .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
-                .getNMTokenSecretManager().getCurrentKey());
+            NodeHeartbeatRequest request =
+                NodeHeartbeatRequest.newInstance(nodeStatus,
+                  NodeStatusUpdaterImpl.this.context
+                    .getContainerTokenSecretManager().getCurrentKey(),
+                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
+                    .getCurrentKey());
             response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Nov 19 17:26:23 2013
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -371,17 +373,31 @@ public class ContainerManagerImpl extend
 
     this.handle(new CMgrCompletedContainersEvent(containerIds,
       CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
-    while (!containers.isEmpty()) {
-      try {
-        Thread.sleep(1000);
-        nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
-      } catch (InterruptedException ex) {
-        LOG.warn("Interrupted while sleeping on container kill on resync", ex);
+
+    /*
+     * We will wait till all the containers change their state to COMPLETE. We
+     * will not remove the container statuses from nm context because these
+     * are used while re-registering node manager with resource manager.
+     */
+    boolean allContainersCompleted = false;
+    while (!containers.isEmpty() && !allContainersCompleted) {
+      allContainersCompleted = true;
+      for (Entry<ContainerId, Container> container : containers.entrySet()) {
+        if (((ContainerImpl) container.getValue()).getCurrentState()
+            != ContainerState.COMPLETE) {
+          allContainersCompleted = false;
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            LOG.warn("Interrupted while sleeping on container kill on resync",
+              ex);
+          }
+          break;
+        }
       }
     }
-
     // All containers killed
-    if (containers.isEmpty()) {
+    if (allContainersCompleted) {
       LOG.info("All containers in DONE state");
     } else {
       LOG.info("Done waiting for containers to be killed. Still alive: " +

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Nov 19 17:26:23 2013
@@ -302,7 +302,7 @@ public class ContainerImpl implements Co
   private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
     stateMachine;
 
-  private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+  public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
     switch (stateMachine.getCurrentState()) {
     case NEW:
     case LOCALIZING:

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Nov 19 17:26:23 2013
@@ -318,7 +318,7 @@ public class ClientRMService extends Abs
     try {
       // call RMAppManager to submit application directly
       rmAppManager.submitApplication(submissionContext,
-          System.currentTimeMillis(), false, user);
+          System.currentTimeMillis(), user, false, null);
 
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Nov 19 17:26:23 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class manages the list of applications for the resource manager. 
  */
@@ -165,6 +168,11 @@ public class RMAppManager implements Eve
     }
   }
 
+  @VisibleForTesting
+  public void logApplicationSummary(ApplicationId appId) {
+    ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
+  }
+
   protected synchronized void setCompletedAppsMax(int max) {
     this.completedAppsMax = max;
   }
@@ -229,35 +237,63 @@ public class RMAppManager implements Eve
       this.applicationACLsManager.removeApplication(removeId);
     }
   }
-
+  
   @SuppressWarnings("unchecked")
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,
-      boolean isRecovered, String user) throws YarnException {
+      String user, boolean isRecovered, RMState state) throws YarnException {
     ApplicationId applicationId = submissionContext.getApplicationId();
 
-    // Validation of the ApplicationSubmissionContext needs to be completed
-    // here. Only those fields that are dependent on RM's configuration are
-    // checked here as they have to be validated whether they are part of new
-    // submission or just being recovered.
+    RMAppImpl application =
+        createAndPopulateNewRMApp(submissionContext, submitTime, user);
 
-    // Check whether AM resource requirements are within required limits
-    if (!submissionContext.getUnmanagedAM()) {
-      ResourceRequest amReq = BuilderUtils.newResourceRequest(
-          RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-          submissionContext.getResource(), 1);
-      try {
-        SchedulerUtils.validateResourceRequest(amReq,
-            scheduler.getMaximumResourceCapability());
-      } catch (InvalidResourceRequestException e) {
-        LOG.warn("RM app submission failed in validating AM resource request"
-            + " for application " + applicationId, e);
-        throw e;
+    if (isRecovered) {
+      recoverApplication(state, application);
+      RMAppState rmAppState =
+          state.getApplicationState().get(applicationId).getState();
+      if (isApplicationInFinalState(rmAppState)) {
+        // Move the application into its final state synchronously so that a
+        // client never sees it in NEW state, even momentarily. Returning here
+        // also avoids renewing tokens for finished applications.
+        application
+            .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
+        return;
       }
     }
+    
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials credentials = null;
+      try {
+        credentials = parseCredentials(submissionContext);
+      } catch (Exception e) {
+        LOG.warn(
+            "Unable to parse credentials.", e);
+        // Sending APP_REJECTED is fine, since we assume that the
+        // RMApp is in NEW state and thus we haven't yet informed the
+        // scheduler about the existence of the application
+        assert application.getState() == RMAppState.NEW;
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppRejectedEvent(applicationId, e.getMessage()));
+        throw RPCUtil.getRemoteException(e);
+      }
+      this.rmContext.getDelegationTokenRenewer().addApplication(
+          applicationId, credentials,
+          submissionContext.getCancelTokensWhenComplete(), isRecovered);
+    } else {
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppEvent(applicationId,
+              isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
+    }
+  }
 
+  private RMAppImpl createAndPopulateNewRMApp(
+      ApplicationSubmissionContext submissionContext,
+      long submitTime, String user)
+      throws YarnException {
+    ApplicationId applicationId = submissionContext.getApplicationId();
+    validateResourceRequest(submissionContext);
     // Create RMApp
-    RMApp application =
+    RMAppImpl application =
         new RMAppImpl(applicationId, rmContext, this.conf,
             submissionContext.getApplicationName(), user,
             submissionContext.getQueue(),
@@ -274,35 +310,52 @@ public class RMAppManager implements Eve
       LOG.warn(message);
       throw RPCUtil.getRemoteException(message);
     }
-
     // Inform the ACLs Manager
     this.applicationACLsManager.addApplication(applicationId,
         submissionContext.getAMContainerSpec().getApplicationACLs());
+    return application;
+  }
+
+  private void validateResourceRequest(
+      ApplicationSubmissionContext submissionContext)
+      throws InvalidResourceRequestException {
+    // Validation of the ApplicationSubmissionContext needs to be completed
+    // here. Only those fields that are dependent on RM's configuration are
+    // checked here as they have to be validated whether they are part of new
+    // submission or just being recovered.
 
+    // Check whether AM resource requirements are within required limits
+    if (!submissionContext.getUnmanagedAM()) {
+      ResourceRequest amReq = BuilderUtils.newResourceRequest(
+          RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+          submissionContext.getResource(), 1);
+      try {
+        SchedulerUtils.validateResourceRequest(amReq,
+            scheduler.getMaximumResourceCapability());
+      } catch (InvalidResourceRequestException e) {
+        LOG.warn("RM app submission failed in validating AM resource request"
+            + " for application " + submissionContext.getApplicationId(), e);
+        throw e;
+      }
+    }
+  }
+
+  private void recoverApplication(RMState state, RMAppImpl application)
+      throws YarnException {
     try {
-      // Setup tokens for renewal
-      if (UserGroupInformation.isSecurityEnabled()) {
-        this.rmContext.getDelegationTokenRenewer().addApplication(
-            applicationId,parseCredentials(submissionContext),
-            submissionContext.getCancelTokensWhenComplete()
-            );
-      }
-    } catch (IOException ie) {
-      LOG.warn(
-          "Unable to add the application to the delegation token renewer.",
-          ie);
-      // Sending APP_REJECTED is fine, since we assume that the
-      // RMApp is in NEW state and thus we havne't yet informed the
-      // Scheduler about the existence of the application
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppRejectedEvent(applicationId, ie.getMessage()));
-      throw RPCUtil.getRemoteException(ie);
+      application.recover(state);
+    } catch (Exception e) {
+      LOG.error("Error recovering application", e);
+      throw new YarnException(e);
     }
+  }
 
-    if (!isRecovered) {
-      // All done, start the RMApp
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
+  private boolean isApplicationInFinalState(RMAppState rmAppState) {
+    if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
+        || rmAppState == RMAppState.KILLED) {
+      return true;
+    } else {
+      return false;
     }
   }
   
@@ -328,17 +381,9 @@ public class RMAppManager implements Eve
     LOG.info("Recovering " + appStates.size() + " applications");
     for (ApplicationState appState : appStates.values()) {
       LOG.info("Recovering application " + appState.getAppId());
+      
       submitApplication(appState.getApplicationSubmissionContext(),
-        appState.getSubmitTime(), true, appState.getUser());
-      // re-populate attempt information in application
-      RMAppImpl appImpl =
-          (RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
-      appImpl.recover(state);
-      // Recover the app synchronously, as otherwise client is possible to see
-      // the application not recovered before it is actually recovered because
-      // ClientRMService is already started at this point of time.
-      appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
-        RMAppEventType.RECOVER));
+        appState.getSubmitTime(), appState.getUser(), true, state);
     }
   }
 
@@ -351,8 +396,7 @@ public class RMAppManager implements Eve
       case APP_COMPLETED: 
       {
         finishApplication(applicationId);
-        ApplicationSummary.logAppSummary(
-            rmContext.getRMApps().get(applicationId));
+        logApplicationSummary(applicationId);
         checkAppNumCompletedLimit(); 
       } 
       break;

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Nov 19 17:26:23 2013
@@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -183,6 +190,33 @@ public class ResourceTrackerService exte
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
+    if (!request.getContainerStatuses().isEmpty()) {
+      LOG.info("received container statuses on node manager register :"
+          + request.getContainerStatuses());
+      for (ContainerStatus containerStatus : request.getContainerStatuses()) {
+        ApplicationAttemptId appAttemptId =
+            containerStatus.getContainerId().getApplicationAttemptId();
+        RMApp rmApp =
+            rmContext.getRMApps().get(appAttemptId.getApplicationId());
+        if (rmApp != null) {
+          RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+          if (rmAppAttempt.getMasterContainer().getId()
+              .equals(containerStatus.getContainerId())
+              && containerStatus.getState() == ContainerState.COMPLETE) {
+            // sending master container finished event.
+            RMAppAttemptContainerFinishedEvent evt =
+                new RMAppAttemptContainerFinishedEvent(appAttemptId,
+                    containerStatus);
+            rmContext.getDispatcher().getEventHandler().handle(evt);
+          }
+        } else {
+          LOG.error("Received finished container :"
+              + containerStatus.getContainerId()
+              + " for non existing application :"
+              + appAttemptId.getApplicationId());
+        }
+      }
+    }
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Nov 19 17:26:23 2013
@@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppSavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
-            RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
-            RMAppState.FINAL_SAVING),
+            RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+            RMAppState.KILLED, RMAppState.FINAL_SAVING),
         RMAppEventType.RECOVER, new RMAppRecoveredTransition())
     .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
         new FinalSavingTransition(
@@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp,
     this.diagnostics.append(appState.getDiagnostics());
     this.storedFinishTime = appState.getFinishTime();
     this.startTime = appState.getStartTime();
+
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
       createNewAttempt(false);
-      // recover attempt
-      ((RMAppAttemptImpl) currentAttempt).recover(state);
+      ((RMAppAttemptImpl)this.currentAttempt).recover(state);
     }
   }
 
@@ -656,30 +656,35 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  @SuppressWarnings("unchecked")
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+    
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
-      if (app.recoveredFinalState != null) {
-        FINAL_TRANSITION.transition(app, event);
-        return app.recoveredFinalState;
-      }
-      // Directly call AttemptFailedTransition, since now we deem that an
-      // application fails because of RM restart as a normal AM failure.
-
-      // Do not recover unmanaged applications since current recovery 
-      // mechanism of restarting attempts does not work for them.
-      // This will need to be changed in work preserving recovery in which 
-      // RM will re-connect with the running AM's instead of restarting them
-
-      // In work-preserve restart, if attemptCount == maxAttempts, the job still
-      // needs to be recovered because the last attempt may still be running.
-
-      // As part of YARN-1210, we may return ACCECPTED state waiting for AM to
-      // reregister or fail and remove the following code.
-      return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
-        event);
+      if (app.attempts.isEmpty()) {
+        // Saved application was not running any attempts.
+        app.createNewAttempt(true);
+        return RMAppState.SUBMITTED;        
+      } else {
+        /*
+         * If the last attempt's recovered final state is null, the attempt
+         * was started but its AM container may or may not have started or
+         * finished. Therefore we should wait for it to finish.
+         */
+        for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+          app.dispatcher.getEventHandler().handle(
+              new RMAppAttemptEvent(attempt.getAppAttemptId(),
+                  RMAppAttemptEventType.RECOVER));
+        }        
+        if (app.recoveredFinalState != null) {
+          FINAL_TRANSITION.transition(app, event);
+          return app.recoveredFinalState;
+        } else {
+          return RMAppState.RUNNING;
+        }
+      }
     }
   }
 
@@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp,
       throw new YarnRuntimeException("Unknown state passed!");
     }
   }
+  
+  public static boolean isAppInFinalState(RMApp rmApp) {
+    RMAppState appState = rmApp.getState();
+    return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
+        || appState == RMAppState.KILLED;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Nov 19 17:26:23 2013
@@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements
             new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
       .addTransition( RMAppAttemptState.NEW,
           EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
-            RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+            RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
           RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
           
       // Transitions from SUBMITTED state
@@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.STATUS_UPDATE))
-              
-      // Transitions from RECOVERED State
-      .addTransition(
-          RMAppAttemptState.RECOVERED,
-          RMAppAttemptState.RECOVERED,
-          EnumSet.of(RMAppAttemptEventType.START,
-              RMAppAttemptEventType.APP_ACCEPTED,
-              RMAppAttemptEventType.APP_REJECTED,
-              RMAppAttemptEventType.EXPIRE,
-              RMAppAttemptEventType.LAUNCHED,
-              RMAppAttemptEventType.LAUNCH_FAILED,
-              RMAppAttemptEventType.REGISTERED,
-              RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_ACQUIRED,
-              RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
-              RMAppAttemptEventType.CONTAINER_FINISHED,
-              RMAppAttemptEventType.UNREGISTERED,
-              RMAppAttemptEventType.KILL,
-              RMAppAttemptEventType.STATUS_UPDATE))
     .installTopology();
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
     this.finalStatus = attemptState.getFinalApplicationStatus();
     this.startTime = attemptState.getStartTime();
-    handle(new RMAppAttemptEvent(getAppAttemptId(),
-      RMAppAttemptEventType.RECOVER));
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
+      LOG.info("Recovering attempt :  recoverdFinalState :"
+          + appAttempt.recoveredFinalState);
       if (appAttempt.recoveredFinalState != null) {
         appAttempt.progress = 1.0f;
+        RMApp rmApp =appAttempt.rmContext.getRMApps().get(
+            appAttempt.getAppAttemptId().getApplicationId());
+        // We will replay the final attempt only if last attempt is in final
+        // state but application is not in final state.
+        if (rmApp.getCurrentAppAttempt() == appAttempt
+            && !RMAppImpl.isAppInFinalState(rmApp)) {
+          (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
+              appAttempt, event);
+        }
         return appAttempt.recoveredFinalState;
       } else {
-        return RMAppAttemptState.RECOVERED;
+        /*
+         * Since the application attempt's final state was not saved, the AM
+         * container (previous attempt) must be in one of these states:
+         * 1) AM container may not have been launched (RM failed right before
+         * this).
+         * 2) AM container was successfully launched but may or may not have
+         * registered / unregistered.
+         * In either case we will wait (by moving the attempt into LAUNCHED
+         * state) and mark this attempt failed (assuming non-work-preserving
+         * restart) only after
+         * 1) the node manager, during re-registration, heartbeats back saying
+         * the AM container finished,
+         * 2) OR AMLivelinessMonitor expires this attempt (when the AM doesn't
+         * heartbeat back).
+         */
+        (new AMLaunchedTransition()).transition(appAttempt, event);
+        return RMAppAttemptState.LAUNCHED;
       }
     }
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Tue Nov 19 17:26:23 2013
@@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.re
 
 public enum RMAppAttemptState {
   NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, 
-  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
-  FINAL_SAVING
+  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Nov 19 17:26:23 2013
@@ -34,6 +34,10 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -48,10 +52,15 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Service to renew application delegation tokens.
@@ -72,7 +81,8 @@ public class DelegationTokenRenewer exte
   // delegation token canceler thread
   private DelegationTokenCancelThread dtCancelThread =
     new DelegationTokenCancelThread();
-
+  private ThreadPoolExecutor renewerService;
+  
   // managing the list of tokens using Map
   // appId=>List<tokens>
   private Set<DelegationTokenToRenew> delegationTokens = 
@@ -84,9 +94,9 @@ public class DelegationTokenRenewer exte
   private long tokenRemovalDelayMs;
   
   private Thread delayedRemovalThread;
-  private boolean isServiceStarted = false;
-  private List<DelegationTokenToRenew> pendingTokenForRenewal =
-      new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
+  private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
+  private volatile boolean isServiceStarted;
+  private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
   
   private boolean tokenKeepAliveEnabled;
   
@@ -102,9 +112,27 @@ public class DelegationTokenRenewer exte
     this.tokenRemovalDelayMs =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    renewerService = createNewThreadPoolService(conf);
+    pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
     super.serviceInit(conf);
   }
 
+  protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
+    int nThreads = conf.getInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
+
+    ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("DelegationTokenRenewer #%d")
+        .build();
+    ThreadPoolExecutor pool =
+        new ThreadPoolExecutor((5 < nThreads ? 5 : nThreads), nThreads, 3L,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    pool.setThreadFactory(tf);
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     dtCancelThread.start();
@@ -119,21 +147,36 @@ public class DelegationTokenRenewer exte
     RMDelegationTokenIdentifier.Renewer.setSecretManager(
         rmContext.getRMDelegationTokenSecretManager(),
         rmContext.getClientRMService().getBindAddress());
-    // Delegation token renewal is delayed until ClientRMService starts. As
-    // it is required to short circuit the token renewal calls.
+    serviceStateLock.writeLock().lock();
     isServiceStarted = true;
-    renewIfServiceIsStarted(pendingTokenForRenewal);
-    pendingTokenForRenewal.clear();
+    serviceStateLock.writeLock().unlock();
+    while(!pendingEventQueue.isEmpty()) {
+      processDelegationTokenRewewerEvent(pendingEventQueue.take());
+    }
     super.serviceStart();
   }
 
+  private void processDelegationTokenRewewerEvent(
+      DelegationTokenRenewerEvent evt) {
+    serviceStateLock.readLock().lock();
+    try {
+      if (isServiceStarted) {
+        renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+      } else {
+        pendingEventQueue.add(evt);
+      }
+    } finally {
+      serviceStateLock.readLock().unlock();
+    }
+  }
+
   @Override
   protected void serviceStop() {
     if (renewalTimer != null) {
       renewalTimer.cancel();
     }
     delegationTokens.clear();
-
+    this.renewerService.shutdown();
     dtCancelThread.interrupt();
     try {
       dtCancelThread.join(1000);
@@ -290,47 +333,50 @@ public class DelegationTokenRenewer exte
    * @throws IOException
    */
   public void addApplication(
-      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
+      boolean isApplicationRecovered) {
+    processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
+        applicationId, ts,
+        shouldCancelAtEnd, isApplicationRecovered));
+  }
+
+  private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
       throws IOException {
+    ApplicationId applicationId = evt.getApplicationId();
+    Credentials ts = evt.getCredentials();
+    boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
     if (ts == null) {
-      return; //nothing to add
+      return; // nothing to add
     }
-    
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Registering tokens for renewal for:" + 
+      LOG.debug("Registering tokens for renewal for:" +
           " appId = " + applicationId);
     }
-    
-    Collection <Token<?>> tokens = ts.getAllTokens();
+
+    Collection<Token<?>> tokens = ts.getAllTokens();
     long now = System.currentTimeMillis();
-    
+
     // find tokens for renewal, but don't add timers until we know
     // all renewable tokens are valid
     // At RM restart it is safe to assume that all the previously added tokens
     // are valid
     List<DelegationTokenToRenew> tokenList =
         new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
-    for(Token<?> token : tokens) {
+    for (Token<?> token : tokens) {
       if (token.isManaged()) {
         tokenList.add(new DelegationTokenToRenew(applicationId,
             token, getConfig(), now, shouldCancelAtEnd));
       }
     }
-    if (!tokenList.isEmpty()){
-      renewIfServiceIsStarted(tokenList);
-    }
-  }
-
-  protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
-      throws IOException {
-    if (isServiceStarted) {
+    if (!tokenList.isEmpty()) {
       // Renewing token and adding it to timer calls are separated purposefully
       // If user provides incorrect token then it should not be added for
       // renewal.
-      for (DelegationTokenToRenew dtr : dtrs) {
+      for (DelegationTokenToRenew dtr : tokenList) {
         renewToken(dtr);
       }
-      for (DelegationTokenToRenew dtr : dtrs) {
+      for (DelegationTokenToRenew dtr : tokenList) {
         addTokenToList(dtr);
         setTimerForTokenRenewal(dtr);
         if (LOG.isDebugEnabled()) {
@@ -338,11 +384,9 @@ public class DelegationTokenRenewer exte
               + dtr.token.getService() + " for appId = " + dtr.applicationId);
         }
       }
-    } else {
-      pendingTokenForRenewal.addAll(dtrs);
     }
   }
-  
+
   /**
    * Task - to renew a token
    *
@@ -449,14 +493,20 @@ public class DelegationTokenRenewer exte
    * @param applicationId completed application
    */
   public void applicationFinished(ApplicationId applicationId) {
+    processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
+        applicationId,
+        DelegationTokenRenewerEventType.FINISH_APPLICATION));
+  }
+
+  private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
     if (!tokenKeepAliveEnabled) {
-      removeApplicationFromRenewal(applicationId);
+      removeApplicationFromRenewal(evt.getApplicationId());
     } else {
-      delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+      delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis()
           + tokenRemovalDelayMs);
     }
   }
-
+  
   /**
    * Add a list of applications to the keep alive list. If an appId already
    * exists, update it's keep-alive time.
@@ -546,4 +596,111 @@ public class DelegationTokenRenewer exte
   public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
+  
+  /*
+   * This runs on a thread of the renewer thread pool and processes a single
+   * event. It is done this way to make sure that token renewal as a part of
+   * application submission and token removal as a part of application finish
+   * are asynchronous in nature.
+   */
+  private final class DelegationTokenRenewerRunnable
+      implements Runnable {
+
+    private DelegationTokenRenewerEvent evt;
+    
+    public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
+      this.evt = evt;
+    }
+    
+    @Override
+    public void run() {
+      if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
+        DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
+            (DelegationTokenRenewerAppSubmitEvent) evt;
+        handleDTRenewerAppSubmitEvent(appSubmitEvt);
+      } else if (evt.getType().equals(
+          DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
+        DelegationTokenRenewer.this.handleAppFinishEvent(evt);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void handleDTRenewerAppSubmitEvent(
+        DelegationTokenRenewerAppSubmitEvent event) {
+      /*
+       * For applications submitted with delegation tokens we are not submitting
+       * the application to scheduler from RMAppManager. Instead we are doing
+       * it from here. The primary goal is to make token renewal as a part of
+       * application submission asynchronous so that client thread is not
+       * blocked during app submission.
+       */
+      try {
+        // Setup tokens for renewal
+        DelegationTokenRenewer.this.handleAppSubmitEvent(event);
+        rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppEvent(event.getApplicationId(),
+                event.isApplicationRecovered() ? RMAppEventType.RECOVER
+                    : RMAppEventType.START));
+      } catch (Throwable t) {
+        LOG.warn(
+            "Unable to add the application to the delegation token renewer.",
+            t);
+        // Sending APP_REJECTED is fine, since we assume that the
+        // RMApp is in NEW state and thus we haven't yet informed the
+        // Scheduler about the existence of the application
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
+      }
+    }
+  }
+  
+  class DelegationTokenRenewerAppSubmitEvent extends
+      DelegationTokenRenewerEvent {
+
+    private Credentials credentials;
+    private boolean shouldCancelAtEnd;
+    private boolean isAppRecovered;
+
+    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd,
+        boolean isApplicationRecovered) {
+      super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+      this.credentials = credentails;
+      this.shouldCancelAtEnd = shouldCancelAtEnd;
+      this.isAppRecovered = isApplicationRecovered;
+    }
+
+    public Credentials getCredentials() {
+      return credentials;
+    }
+
+    public boolean shouldCancelAtEnd() {
+      return shouldCancelAtEnd;
+    }
+
+    public boolean isApplicationRecovered() {
+      return isAppRecovered;
+    }
+  }
+  
+  enum DelegationTokenRenewerEventType {
+    VERIFY_AND_START_APPLICATION,
+    FINISH_APPLICATION
+  }
+  
+  class DelegationTokenRenewerEvent extends
+      AbstractEvent<DelegationTokenRenewerEventType> {
+
+    private ApplicationId appId;
+
+    public DelegationTokenRenewerEvent(ApplicationId appId,
+        DelegationTokenRenewerEventType type) {
+      super(type);
+      this.appId = appId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Tue Nov 19 17:26:23 2013
@@ -249,7 +249,8 @@ class CapacitySchedulerPage extends RmVi
         _("$(function() {",
           "  $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
           "  $('#cs').bind('loaded.jstree', function (e, data) {",
-          "    data.inst.open_node('#pq', true);",
+          "    var callback = { call:reopenQueryNodes }",
+          "    data.inst.open_node('#pq', callback);",
           "   }).",
           "    jstree({",
           "    core: { animation: 188, html_titles: true },",
@@ -265,7 +266,8 @@ class CapacitySchedulerPage extends RmVi
           "    $('#apps').dataTable().fnFilter(q, 3, true);",
           "  });",
           "  $('#cs').show();",
-          "});")._();
+          "});")._().
+      _(SchedulerPageUtil.QueueBlockUtil.class);
   }
 
   @Override protected Class<? extends SubView> content() {