You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/18 07:09:36 UTC

svn commit: r1603355 [1/2] - in /hadoop/common/branches/HDFS-5442/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/ h...

Author: vinayakumarb
Date: Wed Jun 18 05:09:28 2014
New Revision: 1603355

URL: http://svn.apache.org/r1603355
Log:
Merged latest changes from trunk

Added:
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java
      - copied unchanged from r1603354, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java
      - copied unchanged from r1603354, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppState.java
      - copied unchanged from r1603354, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppState.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
      - copied unchanged from r1603354, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
Removed:
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
Modified:
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
    hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/CHANGES.txt?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/CHANGES.txt Wed Jun 18 05:09:28 2014
@@ -36,6 +36,12 @@ Release 2.5.0 - UNRELEASED
     schedulers after ResourceManager Restart so as to preserve running work in
     the cluster. (Jian He via vinodkv)
 
+    YARN-1702. Added kill app functionality to RM web services. (Varun Vasudev
+    via vinodkv)
+
+    YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
+    via junping_du)
+
   IMPROVEMENTS
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
@@ -158,6 +164,12 @@ Release 2.5.0 - UNRELEASED
     YARN-2091. Add more values to ContainerExitStatus and pass it from NM to
     RM and then to app masters (Tsuyoshi OZAWA via bikas)
 
+    YARN-2125. Changed ProportionalCapacityPreemptionPolicy to log CSV in debug
+    level. (Wangda Tan via jianhe)
+
+    YARN-2159. Better logging in SchedulerNode#allocateContainer.
+    (Ray Chiang via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES 
@@ -227,7 +239,27 @@ Release 2.5.0 - UNRELEASED
     YARN-2128. FairScheduler: Incorrect calculation of amResource usage.
     (Wei Yan via kasha)
 
-Release 2.4.1 - UNRELEASED
+    YARN-2124. Fixed NPE in ProportionalCapacityPreemptionPolicy. (Wangda Tan
+    via jianhe)
+
+    YARN-2148. TestNMClient failed due more exit code values added and passed
+    to AM (Wangda Tan via bikas)
+
+    YARN-2075. Fixed the test failure of TestRMAdminCLI. (Kenji Kikushima via
+    zjshen)
+
+    YARN-2155. FairScheduler: Incorrect threshold check for preemption.
+    (Wei Yan via kasha)
+
+    YARN-1885. Fixed a bug that RM may not send application-clean-up signal
+    to NMs where the completed applications previously ran in case of RM restart.
+    (Wangda Tan via jianhe)
+
+    YARN-2167. LeveldbIterator should get closed in
+    NMLeveldbStateStoreService#loadLocalizationState() within finally block
+    (Junping Du via jlowe)
+
+Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java Wed Jun 18 05:09:28 2014
@@ -305,7 +305,8 @@ public class TestRMAdminCLI {
       testError(new String[] { "-help", "-getGroups" },
           "Usage: yarn rmadmin [-getGroups [username]]", dataErr, 0);
       testError(new String[] { "-help", "-transitionToActive" },
-          "Usage: yarn rmadmin [-transitionToActive <serviceId>]", dataErr, 0);
+          "Usage: yarn rmadmin [-transitionToActive <serviceId>" +
+          " [--forceactive]]", dataErr, 0);
       testError(new String[] { "-help", "-transitionToStandby" },
           "Usage: yarn rmadmin [-transitionToStandby <serviceId>]", dataErr, 0);
       testError(new String[] { "-help", "-getServiceState" },
@@ -332,9 +333,9 @@ public class TestRMAdminCLI {
               "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
               "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
               "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
-              " [username]] [-help [cmd]] [-transitionToActive <serviceId>]" +
-              " [-transitionToStandby <serviceId>] [-failover [--forcefence]" +
-              " [--forceactive] <serviceId> <serviceId>] " +
+              " [username]] [-help [cmd]] [-transitionToActive <serviceId>" + 
+              " [--forceactive]] [-transitionToStandby <serviceId>] [-failover" +
+              " [--forcefence] [--forceactive] <serviceId> <serviceId>] " +
               "[-getServiceState <serviceId>] [-checkHealth <serviceId>]"));
     } finally {
       System.setOut(oldOutPrintStream);

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java Wed Jun 18 05:09:28 2014
@@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA ext
     // make sure registerNodeManager works when failover happens
     RegisterNodeManagerRequest request =
         RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
-            YarnVersionInfo.getVersion(), null);
+            YarnVersionInfo.getVersion(), null, null);
     resourceTracker.registerNodeManager(request);
     Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java Wed Jun 18 05:09:28 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -346,7 +347,7 @@ public class TestNMClient {
           // 137 is possible if the container is not terminated but killed
           testGetContainerStatus(container, i, ContainerState.COMPLETE,
               "Container killed by the ApplicationMaster.", Arrays.asList(
-                  new Integer[] {137, 143, 0}));
+                  new Integer[] {ContainerExitStatus.KILLED_BY_APPMASTER}));
         } catch (YarnException e) {
           // The exception is possible because, after the container is stopped,
           // it may be removed from NM's context.

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Wed Jun 18 05:09:28 2014
@@ -20,15 +20,17 @@ package org.apache.hadoop.yarn.server.ap
 
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
 public abstract class RegisterNodeManagerRequest {
-  
+
   public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
       int httpPort, Resource resource, String nodeManagerVersionId,
-      List<NMContainerStatus> containerStatuses) {
+      List<NMContainerStatus> containerStatuses,
+      List<ApplicationId> runningApplications) {
     RegisterNodeManagerRequest request =
         Records.newRecord(RegisterNodeManagerRequest.class);
     request.setHttpPort(httpPort);
@@ -36,6 +38,7 @@ public abstract class RegisterNodeManage
     request.setNodeId(nodeId);
     request.setNMVersion(nodeManagerVersionId);
     request.setContainerStatuses(containerStatuses);
+    request.setRunningApplications(runningApplications);
     return request;
   }
   
@@ -45,10 +48,30 @@ public abstract class RegisterNodeManage
   public abstract String getNMVersion();
   public abstract List<NMContainerStatus> getNMContainerStatuses();
   
+  /**
+   * We introduce this here because currently YARN RM doesn't persist nodes info
+   * for application running. When RM restart happened, we cannot determinate if
+   * a node should do application cleanup (like log-aggregation, status update,
+   * etc.) or not. <p/>
+   * When we have this running application list in node manager register
+   * request, we can recover nodes info for running applications. And then we
+   * can take actions accordingly
+   * 
+   * @return running application list in this node
+   */
+  public abstract List<ApplicationId> getRunningApplications();
+  
   public abstract void setNodeId(NodeId nodeId);
   public abstract void setHttpPort(int port);
   public abstract void setResource(Resource resource);
   public abstract void setNMVersion(String version);
   public abstract void setContainerStatuses(
       List<NMContainerStatus> containerStatuses);
+  
+  /**
+   * Setter for {@link RegisterNodeManagerRequest#getRunningApplications()}
+   * @param runningApplications running application in this node
+   */
+  public abstract void setRunningApplications(
+      List<ApplicationId> runningApplications);
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Wed Jun 18 05:09:28 2014
@@ -20,12 +20,23 @@ package org.apache.hadoop.yarn.server.ap
 
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
@@ -44,6 +55,7 @@ public class RegisterNodeManagerRequestP
   private Resource resource = null;
   private NodeId nodeId = null;
   private List<NMContainerStatus> containerStatuses = null;
+  private List<ApplicationId> runningApplications = null;
   
   public RegisterNodeManagerRequestPBImpl() {
     builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -65,6 +77,9 @@ public class RegisterNodeManagerRequestP
     if (this.containerStatuses != null) {
       addNMContainerStatusesToProto();
     }
+    if (this.runningApplications != null) {
+      addRunningApplicationsToProto();
+    }
     if (this.resource != null) {
       builder.setResource(convertToProtoFormat(this.resource));
     }
@@ -158,6 +173,66 @@ public class RegisterNodeManagerRequestP
     maybeInitBuilder();
     builder.setHttpPort(httpPort);
   }
+  
+  @Override
+  public List<ApplicationId> getRunningApplications() {
+    initRunningApplications();
+    return runningApplications;
+  }
+  
+  private void initRunningApplications() {
+    if (this.runningApplications != null) {
+      return;
+    }
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<ApplicationIdProto> list = p.getRunningApplicationsList();
+    this.runningApplications = new ArrayList<ApplicationId>();
+    for (ApplicationIdProto c : list) {
+      this.runningApplications.add(convertFromProtoFormat(c));
+    }
+  }
+
+  @Override
+  public void setRunningApplications(List<ApplicationId> apps) {
+    if (apps == null) {
+      return;
+    }
+    initRunningApplications();
+    this.runningApplications.addAll(apps);
+  }
+  
+  private void addRunningApplicationsToProto() {
+    maybeInitBuilder();
+    builder.clearRunningApplications();
+    if (runningApplications == null) {
+      return;
+    }
+    Iterable<ApplicationIdProto> it = new Iterable<ApplicationIdProto>() {
+      
+      @Override
+      public Iterator<ApplicationIdProto> iterator() {
+        return new Iterator<ApplicationIdProto>() {
+          Iterator<ApplicationId> iter = runningApplications.iterator();
+          
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+          
+          @Override
+          public ApplicationIdProto next() {
+            return convertToProtoFormat(iter.next());  
+          }
+          
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllRunningApplications(it);
+  }
 
   @Override
   public List<NMContainerStatus> getNMContainerStatuses() {
@@ -216,6 +291,14 @@ public class RegisterNodeManagerRequestP
     maybeInitBuilder();
     builder.setNmVersion(version);
   }
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl)t).getProto();
+  }
 
   private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
     return new NodeIdPBImpl(p);

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Wed Jun 18 05:09:28 2014
@@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto 
   optional ResourceProto resource = 4;
   optional string nm_version = 5;
   repeated NMContainerStatusProto container_statuses = 6;
+  repeated ApplicationIdProto runningApplications = 7;
 }
 
 message RegisterNodeManagerResponseProto {
@@ -66,4 +67,4 @@ message NMContainerStatusProto {
   optional PriorityProto priority = 4;
   optional string diagnostics = 5 [default = "N/A"];
   optional int32 container_exit_status = 6;
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java Wed Jun 18 05:09:28 2014
@@ -83,7 +83,8 @@ public class TestProtocolRecords {
     RegisterNodeManagerRequest request =
         RegisterNodeManagerRequest.newInstance(
           NodeId.newInstance("1.1.1.1", 1000), 8080,
-          Resource.newInstance(1024, 1), "NM-version-id", reports);
+            Resource.newInstance(1024, 1), "NM-version-id", reports,
+            Arrays.asList(appId));
     RegisterNodeManagerRequest requestProto =
         new RegisterNodeManagerRequestPBImpl(
           ((RegisterNodeManagerRequestPBImpl) request).getProto());
@@ -95,5 +96,7 @@ public class TestProtocolRecords {
       requestProto.getNodeId());
     Assert.assertEquals(Resource.newInstance(1024, 1),
       requestProto.getResource());
+    Assert.assertEquals(1, requestProto.getRunningApplications().size());
+    Assert.assertEquals(appId, requestProto.getRunningApplications().get(0)); 
   }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Wed Jun 18 05:09:28 2014
@@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.no
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,6 +57,8 @@ public class DeletionService extends Abs
   private final ContainerExecutor exec;
   private ScheduledThreadPoolExecutor sched;
   private static final FileContext lfs = getLfs();
+  private final NMStateStoreService stateStore;
+  private AtomicInteger nextTaskId = new AtomicInteger(0);
 
   static final FileContext getLfs() {
     try {
@@ -60,14 +69,18 @@ public class DeletionService extends Abs
   }
 
   public DeletionService(ContainerExecutor exec) {
+    this(exec, new NMNullStateStoreService());
+  }
+
+  public DeletionService(ContainerExecutor exec,
+      NMStateStoreService stateStore) {
     super(DeletionService.class.getName());
     this.exec = exec;
     this.debugDelay = 0;
+    this.stateStore = stateStore;
   }
   
   /**
-   * 
-  /**
    * Delete the path(s) as this user.
    * @param user The user to delete as, or the JVM user if null
    * @param subDir the sub directory name
@@ -76,19 +89,20 @@ public class DeletionService extends Abs
   public void delete(String user, Path subDir, Path... baseDirs) {
     // TODO if parent owned by NM, rename within parent inline
     if (debugDelay != -1) {
-      if (baseDirs == null || baseDirs.length == 0) {
-        sched.schedule(new FileDeletionTask(this, user, subDir, null),
-          debugDelay, TimeUnit.SECONDS);
-      } else {
-        sched.schedule(
-          new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
-          debugDelay, TimeUnit.SECONDS);
+      List<Path> baseDirList = null;
+      if (baseDirs != null && baseDirs.length != 0) {
+        baseDirList = Arrays.asList(baseDirs);
       }
+      FileDeletionTask task =
+          new FileDeletionTask(this, user, subDir, baseDirList);
+      recordDeletionTaskInStateStore(task);
+      sched.schedule(task, debugDelay, TimeUnit.SECONDS);
     }
   }
   
   public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
     if (debugDelay != -1) {
+      recordDeletionTaskInStateStore(fileDeletionTask);
       sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
     }
   }
@@ -109,6 +123,9 @@ public class DeletionService extends Abs
     }
     sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     sched.setKeepAliveTime(60L, SECONDS);
+    if (stateStore.canRecover()) {
+      recover(stateStore.loadDeletionServiceState());
+    }
     super.serviceInit(conf);
   }
 
@@ -139,6 +156,8 @@ public class DeletionService extends Abs
   }
 
   public static class FileDeletionTask implements Runnable {
+    public static final int INVALID_TASK_ID = -1;
+    private int taskId;
     private final String user;
     private final Path subDir;
     private final List<Path> baseDirs;
@@ -152,6 +171,12 @@ public class DeletionService extends Abs
     
     private FileDeletionTask(DeletionService delService, String user,
         Path subDir, List<Path> baseDirs) {
+      this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
+    }
+
+    private FileDeletionTask(int taskId, DeletionService delService,
+        String user, Path subDir, List<Path> baseDirs) {
+      this.taskId = taskId;
       this.delService = delService;
       this.user = user;
       this.subDir = subDir;
@@ -198,6 +223,12 @@ public class DeletionService extends Abs
       return this.success;
     }
     
+    public synchronized FileDeletionTask[] getSuccessorTasks() {
+      FileDeletionTask[] successors =
+          new FileDeletionTask[successorTaskSet.size()];
+      return successorTaskSet.toArray(successors);
+    }
+
     @Override
     public void run() {
       if (LOG.isDebugEnabled()) {
@@ -286,6 +317,12 @@ public class DeletionService extends Abs
      * dependent tasks of it has failed marking its success = false.  
      */
     private synchronized void fileDeletionTaskFinished() {
+      try {
+        delService.stateStore.removeDeletionTask(taskId);
+      } catch (IOException e) {
+        LOG.error("Unable to remove deletion task " + taskId
+            + " from state store", e);
+      }
       Iterator<FileDeletionTask> successorTaskI =
           this.successorTaskSet.iterator();
       while (successorTaskI.hasNext()) {
@@ -318,4 +355,129 @@ public class DeletionService extends Abs
       Path[] baseDirs) {
     return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
   }
+
+  private void recover(RecoveredDeletionServiceState state)
+      throws IOException {
+    List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
+    Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
+        new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
+    Set<Integer> successorTasks = new HashSet<Integer>();
+    for (DeletionServiceDeleteTaskProto proto : taskProtos) {
+      DeletionTaskRecoveryInfo info = parseTaskProto(proto);
+      idToInfoMap.put(info.task.taskId, info);
+      nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
+      successorTasks.addAll(info.successorTaskIds);
+    }
+
+    // restore the task dependencies and schedule the deletion tasks that
+    // have no predecessors
+    final long now = System.currentTimeMillis();
+    for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
+      for (Integer successorId : info.successorTaskIds){
+        DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
+        if (successor != null) {
+          info.task.addFileDeletionTaskDependency(successor.task);
+        } else {
+          LOG.error("Unable to locate dependency task for deletion task "
+              + info.task.taskId + " at " + info.task.getSubDir());
+        }
+      }
+      if (!successorTasks.contains(info.task.taskId)) {
+        long msecTilDeletion = info.deletionTimestamp - now;
+        sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+
+  private DeletionTaskRecoveryInfo parseTaskProto(
+      DeletionServiceDeleteTaskProto proto) throws IOException {
+    int taskId = proto.getId();
+    String user = proto.hasUser() ? proto.getUser() : null;
+    Path subdir = null;
+    List<Path> basePaths = null;
+    if (proto.hasSubdir()) {
+      subdir = new Path(proto.getSubdir());
+    }
+    List<String> basedirs = proto.getBasedirsList();
+    if (basedirs != null && basedirs.size() > 0) {
+      basePaths = new ArrayList<Path>(basedirs.size());
+      for (String basedir : basedirs) {
+        basePaths.add(new Path(basedir));
+      }
+    }
+
+    FileDeletionTask task = new FileDeletionTask(taskId, this, user,
+        subdir, basePaths);
+    return new DeletionTaskRecoveryInfo(task,
+        proto.getSuccessorIdsList(),
+        proto.getDeletionTime());
+  }
+
+  private int generateTaskId() {
+    // get the next ID but avoid an invalid ID
+    int taskId = nextTaskId.incrementAndGet();
+    while (taskId == FileDeletionTask.INVALID_TASK_ID) {
+      taskId = nextTaskId.incrementAndGet();
+    }
+    return taskId;
+  }
+
+  private void recordDeletionTaskInStateStore(FileDeletionTask task) {
+    if (!stateStore.canRecover()) {
+      // optimize the case where we aren't really recording
+      return;
+    }
+    if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
+      return;  // task already recorded
+    }
+
+    task.taskId = generateTaskId();
+
+    FileDeletionTask[] successors = task.getSuccessorTasks();
+
+    // store successors first to ensure task IDs have been generated for them
+    for (FileDeletionTask successor : successors) {
+      recordDeletionTaskInStateStore(successor);
+    }
+
+    DeletionServiceDeleteTaskProto.Builder builder =
+        DeletionServiceDeleteTaskProto.newBuilder();
+    builder.setId(task.taskId);
+    if (task.getUser() != null) {
+      builder.setUser(task.getUser());
+    }
+    if (task.getSubDir() != null) {
+      builder.setSubdir(task.getSubDir().toString());
+    }
+    builder.setDeletionTime(System.currentTimeMillis() +
+        TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
+    if (task.getBaseDirs() != null) {
+      for (Path dir : task.getBaseDirs()) {
+        builder.addBasedirs(dir.toString());
+      }
+    }
+    for (FileDeletionTask successor : successors) {
+      builder.addSuccessorIds(successor.taskId);
+    }
+
+    try {
+      stateStore.storeDeletionTask(task.taskId, builder.build());
+    } catch (IOException e) {
+      LOG.error("Unable to store deletion task " + task.taskId + " for "
+          + task.getSubDir(), e);
+    }
+  }
+
+  private static class DeletionTaskRecoveryInfo {
+    FileDeletionTask task;
+    List<Integer> successorTaskIds;
+    long deletionTimestamp;
+
+    public DeletionTaskRecoveryInfo(FileDeletionTask task,
+        List<Integer> successorTaskIds, long deletionTimestamp) {
+      this.task = task;
+      this.successorTaskIds = successorTaskIds;
+      this.deletionTimestamp = deletionTimestamp;
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Jun 18 05:09:28 2014
@@ -114,7 +114,7 @@ public class NodeManager extends Composi
   }
 
   protected DeletionService createDeletionService(ContainerExecutor exec) {
-    return new DeletionService(exec);
+    return new DeletionService(exec, nmStore);
   }
 
   protected NMContext createNMContext(

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Jun 18 05:09:28 2014
@@ -250,7 +250,7 @@ public class NodeStatusUpdaterImpl exten
     List<NMContainerStatus> containerReports = getNMContainerStatuses();
     RegisterNodeManagerRequest request =
         RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
-          nodeManagerVersionId, containerReports);
+          nodeManagerVersionId, containerReports, getRunningApplications());
     if (containerReports != null) {
       LOG.info("Registering with RM using containers :" + containerReports);
     }
@@ -374,6 +374,12 @@ public class NodeStatusUpdaterImpl exten
     }
     return containerStatuses;
   }
+  
+  private List<ApplicationId> getRunningApplications() {
+    List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
+    runningApplications.addAll(this.context.getApplications().keySet());
+    return runningApplications;
+  }
 
   // These NMContainerStatus are sent on NM registration and used by YARN only.
   private List<NMContainerStatus> getNMContainerStatuses() {

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Wed Jun 18 05:09:28 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,6 +59,9 @@ public class NMLeveldbStateStoreService 
   private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
   private static final String DB_SCHEMA_VERSION = "1.0";
 
+  private static final String DELETION_TASK_KEY_PREFIX =
+      "DeletionService/deltask_";
+
   private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
   private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
       LOCALIZATION_KEY_PREFIX + "public/";
@@ -91,8 +95,9 @@ public class NMLeveldbStateStoreService 
       throws IOException {
     RecoveredLocalizationState state = new RecoveredLocalizationState();
 
+    LeveldbIterator iter = null;
     try {
-      LeveldbIterator iter = new LeveldbIterator(db);
+      iter = new LeveldbIterator(db);
       iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
       state.publicTrackerState = loadResourceTrackerState(iter,
           LOCALIZATION_PUBLIC_KEY_PREFIX);
@@ -118,6 +123,10 @@ public class NMLeveldbStateStoreService 
       }
     } catch (DBException e) {
       throw new IOException(e.getMessage(), e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
     }
 
     return state;
@@ -309,6 +318,56 @@ public class NMLeveldbStateStoreService 
 
 
   @Override
+  public RecoveredDeletionServiceState loadDeletionServiceState()
+      throws IOException {
+    RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
+    state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(DELETION_TASK_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
+          break;
+        }
+        state.tasks.add(
+            DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
+      }
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+  @Override
+  public void storeDeletionTask(int taskId,
+      DeletionServiceDeleteTaskProto taskProto) throws IOException {
+    String key = DELETION_TASK_KEY_PREFIX + taskId;
+    try {
+      db.put(bytes(key), taskProto.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void removeDeletionTask(int taskId) throws IOException {
+    String key = DELETION_TASK_KEY_PREFIX + taskId;
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+
+  @Override
   protected void initStorage(Configuration conf)
       throws IOException {
     Path storeRoot = createStorageDir(conf);

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Wed Jun 18 05:09:28 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 
 // The state store to use when state isn't being stored
@@ -61,6 +62,22 @@ public class NMNullStateStoreService ext
   }
 
   @Override
+  public RecoveredDeletionServiceState loadDeletionServiceState()
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeDeletionTask(int taskId,
+      DeletionServiceDeleteTaskProto taskProto) throws IOException {
+  }
+
+  @Override
+  public void removeDeletionTask(int taskId) throws IOException {
+  }
+
+  @Override
   protected void initStorage(Configuration conf) throws IOException {
   }
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Wed Jun 18 05:09:28 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 
 @Private
@@ -91,6 +92,14 @@ public abstract class NMStateStoreServic
     }
   }
 
+  public static class RecoveredDeletionServiceState {
+    List<DeletionServiceDeleteTaskProto> tasks;
+
+    public List<DeletionServiceDeleteTaskProto> getTasks() {
+      return tasks;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -155,6 +164,15 @@ public abstract class NMStateStoreServic
       ApplicationId appId, Path localPath) throws IOException;
 
 
+  public abstract RecoveredDeletionServiceState loadDeletionServiceState()
+      throws IOException;
+
+  public abstract void storeDeletionTask(int taskId,
+      DeletionServiceDeleteTaskProto taskProto) throws IOException;
+
+  public abstract void removeDeletionTask(int taskId) throws IOException;
+
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Wed Jun 18 05:09:28 2014
@@ -24,6 +24,15 @@ package hadoop.yarn;
 
 import "yarn_protos.proto";
 
+message DeletionServiceDeleteTaskProto {
+  optional int32 id = 1;
+  optional string user = 2;
+  optional string subdir = 3;
+  optional int64 deletionTime = 4;
+  repeated string basedirs = 5;
+  repeated int32 successorIds = 6;
+}
+
 message LocalizedResourceProto {
   optional LocalResourceProto resource = 1;
   optional string localPath = 2;

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Wed Jun 18 05:09:28 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -285,4 +286,58 @@ public class TestDeletionService {
       del.stop();
     }
   }
+
+  @Test
+  public void testRecovery() throws Exception {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    List<Path> baseDirs = buildDirs(r, base, 4);
+    createDirs(new Path("."), baseDirs);
+    List<Path> content = buildDirs(r, new Path("."), 10);
+    for (Path b : baseDirs) {
+      createDirs(b, content);
+    }
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 1);
+    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    DeletionService del =
+      new DeletionService(new FakeDefaultContainerExecutor(), stateStore);
+    try {
+      del.init(conf);
+      del.start();
+      for (Path p : content) {
+        assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
+        del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
+            p, baseDirs.toArray(new Path[4]));
+      }
+
+      // restart the deletion service
+      del.stop();
+      del = new DeletionService(new FakeDefaultContainerExecutor(),
+          stateStore);
+      del.init(conf);
+      del.start();
+
+      // verify paths are still eventually deleted
+      int msecToWait = 10 * 1000;
+      for (Path p : baseDirs) {
+        for (Path q : content) {
+          Path fp = new Path(p, q);
+          while (msecToWait > 0 && lfs.util().exists(fp)) {
+            Thread.sleep(100);
+            msecToWait -= 100;
+          }
+          assertFalse(lfs.util().exists(fp));
+        }
+      }
+    } finally {
+      del.close();
+      stateStore.close();
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Wed Jun 18 05:09:28 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,10 +27,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<TrackerKey, TrackerState> trackerStates;
+  private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -110,6 +114,7 @@ public class NMMemoryStateStoreService e
   @Override
   protected void initStorage(Configuration conf) {
     trackerStates = new HashMap<TrackerKey, TrackerState>();
+    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
   }
 
   @Override
@@ -121,6 +126,28 @@ public class NMMemoryStateStoreService e
   }
 
 
+  @Override
+  public RecoveredDeletionServiceState loadDeletionServiceState()
+      throws IOException {
+    RecoveredDeletionServiceState result =
+        new RecoveredDeletionServiceState();
+    result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
+        deleteTasks.values());
+    return result;
+  }
+
+  @Override
+  public synchronized void storeDeletionTask(int taskId,
+      DeletionServiceDeleteTaskProto taskProto) throws IOException {
+    deleteTasks.put(taskId, taskProto);
+  }
+
+  @Override
+  public synchronized void removeDeletionTask(int taskId) throws IOException {
+    deleteTasks.remove(taskId);
+  }
+
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Wed Jun 18 05:09:28 2014
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -404,4 +406,58 @@ public class TestNMLeveldbStateStoreServ
         state.getUserResources();
     assertTrue(userResources.isEmpty());
   }
+
+  @Test
+  public void testDeletionTaskStorage() throws IOException {
+    // test empty when no state
+    RecoveredDeletionServiceState state =
+        stateStore.loadDeletionServiceState();
+    assertTrue(state.getTasks().isEmpty());
+
+    // store a deletion task and verify recovered
+    DeletionServiceDeleteTaskProto proto =
+        DeletionServiceDeleteTaskProto.newBuilder()
+        .setId(7)
+        .setUser("someuser")
+        .setSubdir("some/subdir")
+        .addBasedirs("some/dir/path")
+        .addBasedirs("some/other/dir/path")
+        .setDeletionTime(123456L)
+        .addSuccessorIds(8)
+        .addSuccessorIds(9)
+        .build();
+    stateStore.storeDeletionTask(proto.getId(), proto);
+    restartStateStore();
+    state = stateStore.loadDeletionServiceState();
+    assertEquals(1, state.getTasks().size());
+    assertEquals(proto, state.getTasks().get(0));
+
+    // store another deletion task
+    DeletionServiceDeleteTaskProto proto2 =
+        DeletionServiceDeleteTaskProto.newBuilder()
+        .setId(8)
+        .setUser("user2")
+        .setSubdir("subdir2")
+        .setDeletionTime(789L)
+        .build();
+    stateStore.storeDeletionTask(proto2.getId(), proto2);
+    restartStateStore();
+    state = stateStore.loadDeletionServiceState();
+    assertEquals(2, state.getTasks().size());
+    assertTrue(state.getTasks().contains(proto));
+    assertTrue(state.getTasks().contains(proto2));
+
+    // delete a task and verify gone after recovery
+    stateStore.removeDeletionTask(proto2.getId());
+    restartStateStore();
+    state = stateStore.loadDeletionServiceState();
+    assertEquals(1, state.getTasks().size());
+    assertEquals(proto, state.getTasks().get(0));
+
+    // delete the last task and verify none left
+    stateStore.removeDeletionTask(proto.getId());
+    restartStateStore();
+    state = stateStore.loadDeletionServiceState();
+    assertTrue(state.getTasks().isEmpty());
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Jun 18 05:09:28 2014
@@ -327,7 +327,7 @@ public class ResourceManager extends Com
    * RMActiveServices handles all the Active services in the RM.
    */
   @Private
-  class RMActiveServices extends CompositeService {
+  public class RMActiveServices extends CompositeService {
 
     private DelegationTokenRenewer delegationTokenRenewer;
     private EventHandler<SchedulerEvent> schedulerDispatcher;
@@ -526,11 +526,9 @@ public class ResourceManager extends Com
                   (PreemptableResourceScheduler) scheduler));
           for (SchedulingEditPolicy policy : policies) {
             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
-            policy.init(conf, rmContext.getDispatcher().getEventHandler(),
-                (PreemptableResourceScheduler) scheduler);
             // periodically check whether we need to take action to guarantee
             // constraints
-            SchedulingMonitor mon = new SchedulingMonitor(policy);
+            SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
             addService(mon);
           }
         } else {

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Jun 18 05:09:28 2014
@@ -244,15 +244,6 @@ public class ResourceTrackerService exte
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
-    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
-      if (!request.getNMContainerStatuses().isEmpty()) {
-        LOG.info("received container statuses on node manager register :"
-            + request.getNMContainerStatuses());
-        for (NMContainerStatus status : request.getNMContainerStatuses()) {
-          handleNMContainerStatus(status);
-        }
-      }
-    }
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
 
@@ -311,7 +302,8 @@ public class ResourceTrackerService exte
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
+              new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
+                  request.getRunningApplications()));
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
@@ -322,6 +314,18 @@ public class ResourceTrackerService exte
     // present for any running application.
     this.nmTokenSecretManager.removeNodeKey(nodeId);
     this.nmLivelinessMonitor.register(nodeId);
+    
+    // Handle received container status, this should be processed after new
+    // RMNode inserted
+    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+      if (!request.getNMContainerStatuses().isEmpty()) {
+        LOG.info("received container statuses on node manager register :"
+            + request.getNMContainerStatuses());
+        for (NMContainerStatus status : request.getNMContainerStatuses()) {
+          handleNMContainerStatus(status);
+        }
+      }
+    }
 
     String message =
         "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Wed Jun 18 05:09:28 2014
@@ -21,6 +21,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -34,18 +36,29 @@ public class SchedulingMonitor extends A
   private Thread checkerThread;
   private volatile boolean stopped;
   private long monitorInterval;
+  private RMContext rmContext;
 
-  public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+  public SchedulingMonitor(RMContext rmContext,
+      SchedulingEditPolicy scheduleEditPolicy) {
     super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
     this.scheduleEditPolicy = scheduleEditPolicy;
-    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+    this.rmContext = rmContext;
   }
 
   public long getMonitorInterval() {
     return monitorInterval;
   }
+  
+  @VisibleForTesting
+  public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
+    return scheduleEditPolicy;
+  }
 
+  @SuppressWarnings("unchecked")
   public void serviceInit(Configuration conf) throws Exception {
+    scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+        (PreemptableResourceScheduler) rmContext.getScheduler());
+    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);
   }
 

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Wed Jun 18 05:09:28 2014
@@ -165,6 +165,11 @@ public class ProportionalCapacityPreempt
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
   }
+  
+  @VisibleForTesting
+  public ResourceCalculator getResourceCalculator() {
+    return rc;
+  }
 
   @Override
   public void editSchedule(){
@@ -203,7 +208,9 @@ public class ProportionalCapacityPreempt
     Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
         getContainersToPreempt(queues, clusterResources);
 
-    logToCSV(queues);
+    if (LOG.isDebugEnabled()) {
+      logToCSV(queues);
+    }
 
     // if we are in observeOnly mode return before any action is taken
     if (observeOnly) {
@@ -603,7 +610,7 @@ public class ProportionalCapacityPreempt
       sb.append(", ");
       tq.appendLogString(sb);
     }
-    LOG.info(sb.toString());
+    LOG.debug(sb.toString());
   }
 
   /**

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Jun 18 05:09:28 2014
@@ -19,16 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.util.Collection;
-
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -208,6 +208,14 @@ public interface RMApp extends EventHand
    * @return the flag indicating whether the applications's state is stored.
    */
   boolean isAppFinalStateStored();
+  
+  
+  /**
+   * Nodes on which the containers for this {@link RMApp} ran.
+   * @return the set of nodes that ran any containers from this {@link RMApp}
+   * Add more node on which containers for this {@link RMApp} ran
+   */
+  Set<NodeId> getRanNodes();
 
   /**
    * Create the external user-facing state of ApplicationMaster from the

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Wed Jun 18 05:09:28 2014
@@ -38,6 +38,9 @@ public enum RMAppEventType {
   ATTEMPT_FAILED,
   ATTEMPT_KILLED,
   NODE_UPDATE,
+  
+  // Source: Container and ResourceTracker
+  APP_RUNNING_ON_NODE,
 
   // Source: RMStateStore
   APP_NEW_SAVED,

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1603355&r1=1603354&r2=1603355&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Jun 18 05:09:28 2014
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -116,6 +116,7 @@ public class RMAppImpl implements RMApp,
   private EventHandler handler;
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+  private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
 
   // These states stored are only valid when app is at killing or final_saving.
   private RMAppState stateBeforeKilling;
@@ -180,7 +181,6 @@ public class RMAppImpl implements RMApp,
         new FinalSavingTransition(
           new AppKilledTransition(), RMAppState.KILLED))
 
-
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@@ -200,6 +200,9 @@ public class RMAppImpl implements RMApp,
         new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
     .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
         RMAppEventType.KILL, new KillAttemptTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     // ACCECPTED state can once again receive APP_ACCEPTED event, because on
     // recovery the app returns ACCEPTED state and the app once again go
     // through the scheduler and triggers one more APP_ACCEPTED event at
@@ -220,6 +223,9 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
       // UnManagedAM directly jumps to finished
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.RUNNING,
         EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
@@ -235,6 +241,9 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_FINISHED,
         new AttemptFinishedAtFinalSavingTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -243,6 +252,9 @@ public class RMAppImpl implements RMApp,
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -251,6 +263,9 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.KILL))
 
      // Transitions from KILLING state
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -267,6 +282,9 @@ public class RMAppImpl implements RMApp,
 
      // Transitions from FINISHED state
      // ignorable transitions
+    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
@@ -276,11 +294,17 @@ public class RMAppImpl implements RMApp,
 
      // Transitions from FAILED state
      // ignorable transitions
+    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
         EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
 
      // Transitions from KILLED state
      // ignorable transitions
+    .addTransition(RMAppState.KILLED, RMAppState.KILLED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(
         RMAppState.KILLED,
         RMAppState.KILLED,
@@ -695,6 +719,23 @@ public class RMAppImpl implements RMApp,
           nodeUpdateEvent.getNode());
     };
   }
+  
+  private static final class AppRunningOnNodeTransition extends RMAppTransition {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
+      
+      // if final state already stored, notify RMNode
+      if (isAppInFinalState(app)) {
+        app.handler.handle(
+            new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
+                .getApplicationId()));
+        return;
+      }
+      
+      // otherwise, add it to ranNodes for further process
+      app.ranNodes.add(nodeAddedEvent.getNodeId());
+    };
+  }
 
   /**
    * Move an app to a new queue.
@@ -1037,17 +1078,8 @@ public class RMAppImpl implements RMApp,
       this.finalState = finalState;
     }
 
-    private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
-      Set<NodeId> nodes = new HashSet<NodeId>();
-      for (RMAppAttempt attempt : app.attempts.values()) {
-        nodes.addAll(attempt.getRanNodes());
-      }
-      return nodes;
-    }
-
     public void transition(RMAppImpl app, RMAppEvent event) {
-      Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
-      for (NodeId nodeId : nodes) {
+      for (NodeId nodeId : app.getRanNodes()) {
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
@@ -1148,4 +1180,9 @@ public class RMAppImpl implements RMApp,
   private RMAppState getRecoveredFinalState() {
     return this.recoveredFinalState;
   }
+
+  @Override
+  public Set<NodeId> getRanNodes() {
+    return ranNodes;
+  }
 }