You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/12/08 05:19:17 UTC

svn commit: r1548991 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serv...

Author: vinodkv
Date: Sun Dec  8 04:19:16 2013
New Revision: 1548991

URL: http://svn.apache.org/r1548991
Log:
YARN-1378. Implemented a cleaner of old finished applications from the RM state-store. Contributed by Jian He.
svn merge --ignore-ancestry -c 1548990 ../../trunk/

Removed:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sun Dec  8 04:19:16 2013
@@ -130,6 +130,9 @@ Release 2.4.0 - UNRELEASED
     YARN-807. When querying apps by queue, iterating over all apps is
     inefficient and limiting (Sandy Ryza)
 
+    YARN-1378. Implemented a cleaner of old finished applications from the RM
+    state-store. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Sun Dec  8 04:19:16 2013
@@ -342,7 +342,16 @@ public class YarnConfiguration extends C
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";
   public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
-  
+
+  /**
+   * The maximum number of completed applications the RM state store keeps.
+   * By default, it is equal to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS.
+   */
+  public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
+      RM_PREFIX + "state-store.max-completed-applications";
+  public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
+      DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+
   /** Default application name */
   public static final String DEFAULT_APPLICATION_NAME = "N/A";
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Sun Dec  8 04:19:16 2013
@@ -276,6 +276,21 @@
   </property>
 
   <property>
+    <description>The maximum number of completed applications the RM state
+    store keeps, less than or equal to ${yarn.resourcemanager.max-completed-applications}.
+    By default, it is equal to ${yarn.resourcemanager.max-completed-applications}.
+    This ensures that the applications kept in the state store are consistent with
+    the applications remembered in RM memory.
+    Any values larger than ${yarn.resourcemanager.max-completed-applications} will
+    be reset to ${yarn.resourcemanager.max-completed-applications}.
+    Note that this value impacts the RM recovery performance. Typically,
+    a smaller value indicates better performance on RM recovery.
+    </description>
+    <name>yarn.resourcemanager.state-store.max-completed-applications</name>
+    <value>${yarn.resourcemanager.max-completed-applications}</value>
+  </property>
+
+  <property>
     <description>Host:Port of the ZooKeeper server where RM state will 
     be stored. This must be supplied when using
     org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Sun Dec  8 04:19:16 2013
@@ -65,7 +65,9 @@ public class RMAppManager implements Eve
 
   private static final Log LOG = LogFactory.getLog(RMAppManager.class);
 
-  private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+  private int maxCompletedAppsInMemory;
+  private int maxCompletedAppsInStateStore;
+  protected int completedAppsInStateStore = 0;
   private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
 
   private final RMContext rmContext;
@@ -82,9 +84,16 @@ public class RMAppManager implements Eve
     this.masterService = masterService;
     this.applicationACLsManager = applicationACLsManager;
     this.conf = conf;
-    setCompletedAppsMax(conf.getInt(
+    this.maxCompletedAppsInMemory = conf.getInt(
         YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
-        YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
+        YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
+    this.maxCompletedAppsInStateStore =
+        conf.getInt(
+          YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+          YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
+    if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
+      this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
+    }
   }
 
   /**
@@ -173,10 +182,6 @@ public class RMAppManager implements Eve
     ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
   }
 
-  protected synchronized void setCompletedAppsMax(int max) {
-    this.completedAppsMax = max;
-  }
-
   protected synchronized int getCompletedAppsListSize() {
     return this.completedApps.size(); 
   }
@@ -190,7 +195,8 @@ public class RMAppManager implements Eve
         rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
       }
       
-      completedApps.add(applicationId);  
+      completedApps.add(applicationId);
+      completedAppsInStateStore++;
       writeAuditLog(applicationId);
     }
   }
@@ -229,15 +235,31 @@ public class RMAppManager implements Eve
    * check to see if hit the limit for max # completed apps kept
    */
   protected synchronized void checkAppNumCompletedLimit() {
-    while (completedApps.size() > this.completedAppsMax) {
-      ApplicationId removeId = completedApps.remove();  
-      LOG.info("Application should be expired, max # apps"
-          + " met. Removing app: " + removeId); 
+    // check apps kept in state store.
+    while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
+      ApplicationId removeId =
+          completedApps.get(completedApps.size() - completedAppsInStateStore);
+      RMApp removeApp = rmContext.getRMApps().get(removeId);
+      LOG.info("Max number of completed apps kept in state store met:"
+          + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
+          + ", removing app " + removeApp.getApplicationId()
+          + " from state store.");
+      rmContext.getStateStore().removeApplication(removeApp);
+      completedAppsInStateStore--;
+    }
+
+    // check apps kept in memory.
+    while (completedApps.size() > this.maxCompletedAppsInMemory) {
+      ApplicationId removeId = completedApps.remove();
+      LOG.info("Application should be expired, max number of completed apps"
+          + " kept in memory met: maxCompletedAppsInMemory = "
+          + this.maxCompletedAppsInMemory + ", removing app " + removeId
+          + " from memory: ");
       rmContext.getRMApps().remove(removeId);
       this.applicationACLsManager.removeApplication(removeId);
     }
   }
-  
+
   @SuppressWarnings("unchecked")
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,
@@ -380,8 +402,6 @@ public class RMAppManager implements Eve
     Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
     LOG.info("Recovering " + appStates.size() + " applications");
     for (ApplicationState appState : appStates.values()) {
-      LOG.info("Recovering application " + appState.getAppId());
-      
       submitApplication(appState.getApplicationSubmissionContext(),
         appState.getSubmitTime(), appState.getUser(), true, state);
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Sun Dec  8 04:19:16 2013
@@ -167,7 +167,9 @@ public class FileSystemRMStateStore exte
               readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
             // application
-            LOG.info("Loading application from node: " + childNodeName);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Loading application from node: " + childNodeName);
+            }
             ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
             ApplicationStateDataPBImpl appStateData =
                 new ApplicationStateDataPBImpl(
@@ -185,7 +187,10 @@ public class FileSystemRMStateStore exte
           } else if (childNodeName
             .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
             // attempt
-            LOG.info("Loading application attempt from node: " + childNodeName);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Loading application attempt from node: "
+                  + childNodeName);
+            }
             ApplicationAttemptId attemptId =
                 ConverterUtils.toApplicationAttemptId(childNodeName);
             ApplicationAttemptStateDataPBImpl attemptStateData =
@@ -225,6 +230,7 @@ public class FileSystemRMStateStore exte
         assert appState != null;
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
+      LOG.info("Done Loading applications from FS state store");
     } catch (Exception e) {
       LOG.error("Failed to load state.", e);
       throw e;
@@ -362,7 +368,7 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  public synchronized void removeApplicationState(ApplicationState appState)
+  public synchronized void removeApplicationStateInternal(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
     Path nodeRemovePath = getAppDir(rmAppRoot, appId);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Sun Dec  8 04:19:16 2013
@@ -171,8 +171,8 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  public synchronized void removeApplicationState(ApplicationState appState) 
-                                                            throws Exception {
+  public synchronized void removeApplicationStateInternal(
+      ApplicationState appState) throws Exception {
     ApplicationId appId = appState.getAppId();
     ApplicationState removed = state.appState.remove(appId);
     if (removed == null) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Sun Dec  8 04:19:16 2013
@@ -63,7 +63,7 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected void removeApplicationState(ApplicationState appState)
+  protected void removeApplicationStateInternal(ApplicationState appState)
       throws Exception {
     // Do nothing
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Sun Dec  8 04:19:16 2013
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -519,6 +518,7 @@ public abstract class RMStateStore exten
    * This does not block the dispatcher threads
    * There is no notification of completion for this operation.
    */
+  @SuppressWarnings("unchecked")
   public synchronized void removeApplication(RMApp app) {
     ApplicationState appState = new ApplicationState(
             app.getSubmitTime(), app.getStartTime(),
@@ -532,14 +532,6 @@ public abstract class RMStateStore exten
       appState.attempts.put(attemptState.getAttemptId(), attemptState);
     }
     
-    removeApplication(appState);
-  }
-  
-  @SuppressWarnings("unchecked")
-  /**
-   * Non-Blocking API
-   */
-  public synchronized void removeApplication(ApplicationState appState) {
     dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
   }
 
@@ -548,8 +540,8 @@ public abstract class RMStateStore exten
    * Derived classes must implement this method to remove the state of an 
    * application and its attempts
    */
-  protected abstract void removeApplicationState(ApplicationState appState) 
-                                                             throws Exception;
+  protected abstract void removeApplicationStateInternal(
+      ApplicationState appState) throws Exception;
 
   // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
   // YARN-986 
@@ -666,11 +658,9 @@ public abstract class RMStateStore exten
       ApplicationState appState =
           ((RMStateStoreRemoveAppEvent) event).getAppState();
       ApplicationId appId = appState.getAppId();
-      Exception removedException = null;
       LOG.info("Removing info for app: " + appId);
       try {
-        removeApplicationState(appState);
-        notifyDoneRemovingApplcation(appId, removedException);
+        removeApplicationStateInternal(appState);
       } catch (Exception e) {
         LOG.error("Error removing app: " + appId, e);
         notifyStoreOperationFailed(e);
@@ -738,17 +728,6 @@ public abstract class RMStateStore exten
       new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
   }
 
-  @SuppressWarnings("unchecked")
-  /**
-   * This is to notify RMApp that this application has been removed from
-   * RMStateStore
-   */
-  private void notifyDoneRemovingApplcation(ApplicationId appId,
-      Exception removedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppRemovedEvent(appId, removedException));
-  }
-
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sun Dec  8 04:19:16 2013
@@ -392,7 +392,9 @@ public class ZKRMStateStore extends RMSt
       byte[] childData = getDataWithRetries(childNodePath, true);
       if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
         // application
-        LOG.info("Loading application from znode: " + childNodeName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading application from znode: " + childNodeName);
+        }
         ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
         ApplicationStateDataPBImpl appStateData =
             new ApplicationStateDataPBImpl(
@@ -412,7 +414,9 @@ public class ZKRMStateStore extends RMSt
       } else if (childNodeName
           .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
         // attempt
-        LOG.info("Loading application attempt from znode: " + childNodeName);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading application attempt from znode: " + childNodeName);
+        }
         ApplicationAttemptId attemptId =
             ConverterUtils.toApplicationAttemptId(childNodeName);
         ApplicationAttemptStateDataPBImpl attemptStateData =
@@ -456,10 +460,10 @@ public class ZKRMStateStore extends RMSt
         LOG.info("Application node not found for attempt: "
             + attemptState.getAttemptId());
         deleteWithRetries(
-            getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
-            0);
+            getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
       }
     }
+    LOG.info("Done Loading applications from ZK state store");
   }
 
   @Override
@@ -517,16 +521,16 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  public synchronized void removeApplicationState(ApplicationState appState)
+  public synchronized void removeApplicationStateInternal(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
     String nodeRemovePath = getNodePath(rmAppRoot, appId);
     ArrayList<Op> opList = new ArrayList<Op>();
-    opList.add(Op.delete(nodeRemovePath, 0));
+    opList.add(Op.delete(nodeRemovePath, -1));
 
     for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
       String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
-      opList.add(Op.delete(attemptRemovePath, 0));
+      opList.add(Op.delete(attemptRemovePath, -1));
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
@@ -569,7 +573,7 @@ public class ZKRMStateStore extends RMSt
     }
 
     if (dtSequenceNumberPath != null) {
-      opList.add(Op.delete(dtSequenceNumberPath, 0));
+      opList.add(Op.delete(dtSequenceNumberPath, -1));
     }
     opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
         CreateMode.PERSISTENT));
@@ -587,7 +591,7 @@ public class ZKRMStateStore extends RMSt
       LOG.debug("Removing RMDelegationToken_"
           + rmDTIdentifier.getSequenceNumber());
     }
-    deleteWithRetries(nodeRemovePath, 0);
+    deleteWithRetries(nodeRemovePath, -1);
   }
 
   @Override
@@ -619,7 +623,7 @@ public class ZKRMStateStore extends RMSt
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
-    deleteWithRetries(nodeRemovePath, 0);
+    deleteWithRetries(nodeRemovePath, -1);
   }
 
   // ZK related code

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Sun Dec  8 04:19:16 2013
@@ -660,32 +660,34 @@ public class RMAppImpl implements RMApp,
   @SuppressWarnings("unchecked")
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
-    
+
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+      /*
+       * If last attempt recovered final state is null .. it means attempt was
+       * started but AM container may or may not have started / finished.
+       * Therefore we should wait for it to finish.
+       */
+      for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+        app.dispatcher.getEventHandler().handle(
+          new RMAppAttemptEvent(attempt.getAppAttemptId(),
+            RMAppAttemptEventType.RECOVER));
+      }
+
+      // The app has completed.
+      if (app.recoveredFinalState != null) {
+        FINAL_TRANSITION.transition(app, event);
+        return app.recoveredFinalState;
+      }
 
+      // No existing attempts means the attempt associated with this app was not
+      // started, or was started but not yet saved.
       if (app.attempts.isEmpty()) {
-        // Saved application was not running any attempts.
         app.createNewAttempt(true);
-        return RMAppState.SUBMITTED;        
-      } else {
-        /*
-         * If last attempt recovered final state is null .. it means attempt
-         * was started but AM container may or may not have started / finished.
-         * Therefore we should wait for it to finish.
-         */
-        for (RMAppAttempt attempt : app.getAppAttempts().values()) {
-          app.dispatcher.getEventHandler().handle(
-              new RMAppAttemptEvent(attempt.getAppAttemptId(),
-                  RMAppAttemptEventType.RECOVER));
-        }        
-        if (app.recoveredFinalState != null) {
-          FINAL_TRANSITION.transition(app, event);
-          return app.recoveredFinalState;
-        } else {
-          return RMAppState.RUNNING;
-        }
+        return RMAppState.SUBMITTED;
       }
+
+      return RMAppState.RUNNING;
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sun Dec  8 04:19:16 2013
@@ -76,14 +76,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -675,9 +674,8 @@ public class RMAppAttemptImpl implements
     ApplicationAttemptState attemptState =
         appState.getAttempt(getAppAttemptId());
     assert attemptState != null;
-    LOG.info("Recovered attempt: AppId: "
-        + getAppAttemptId().getApplicationId() + " AttemptId: "
-        + getAppAttemptId() + " MasterContainer: " + masterContainer);
+    LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
+        + attemptState.getState());
     diagnostics.append("Attempt recovered after RM restart");
     diagnostics.append(attemptState.getDiagnostics());
     setMasterContainer(attemptState.getMasterContainer());
@@ -856,8 +854,6 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      LOG.info("Recovering attempt :  recoverdFinalState :"
-          + appAttempt.recoveredFinalState);
       if (appAttempt.recoveredFinalState != null) {
         appAttempt.progress = 1.0f;
         RMApp rmApp =appAttempt.rmContext.getRMApps().get(

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Sun Dec  8 04:19:16 2013
@@ -19,8 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +47,7 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -99,7 +104,7 @@ public class TestAppManager{
         rmDispatcher);
     AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
         rmDispatcher);
-    return new RMContextImpl(rmDispatcher,
+    RMContext context = new RMContextImpl(rmDispatcher,
         containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
         null, null, null, null, null) {
       @Override
@@ -107,6 +112,8 @@ public class TestAppManager{
         return map;
       }
     };
+    ((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
+    return context;
   }
 
   public class TestAppManagerDispatcher implements
@@ -142,7 +149,6 @@ public class TestAppManager{
 
     public TestRMAppManager(RMContext context, Configuration conf) {
       super(context, null, null, new ApplicationACLsManager(conf), conf);
-      setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
     }
 
     public TestRMAppManager(RMContext context,
@@ -150,7 +156,6 @@ public class TestAppManager{
         YarnScheduler scheduler, ApplicationMasterService masterService,
         ApplicationACLsManager applicationACLsManager, Configuration conf) {
       super(context, scheduler, masterService, applicationACLsManager, conf);
-      setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
     }
 
     public void checkAppNumCompletedLimit() {
@@ -164,9 +169,8 @@ public class TestAppManager{
     public int getCompletedAppsListSize() {
       return super.getCompletedAppsListSize();
     }
-
-    public void setCompletedAppsMax(int max) {
-      super.setCompletedAppsMax(max);
+    public int getCompletedAppsInStateStore() {
+      return this.completedAppsInStateStore;
     }
     public void submitApplication(
         ApplicationSubmissionContext submissionContext, String user)
@@ -227,9 +231,9 @@ public class TestAppManager{
     // Create such that none of the applications will retire since
     // haven't hit max #
     RMContext rmContext = mockRMContext(10, now - 10);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
-    appMonitor.setCompletedAppsMax(10);
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf);
 
     Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
         10, rmContext.getRMApps().size());
@@ -243,6 +247,8 @@ public class TestAppManager{
         rmContext.getRMApps().size());
     Assert.assertEquals("Number of completed apps incorrect after check", 10,
         appMonitor.getCompletedAppsListSize());
+    verify(rmContext.getStateStore(), never()).removeApplication(
+      isA(RMApp.class));
   }
 
   @Test
@@ -250,9 +256,10 @@ public class TestAppManager{
     long now = System.currentTimeMillis();
 
     RMContext rmContext = mockRMContext(10, now - 20000);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
-    appMonitor.setCompletedAppsMax(3);
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3); 
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
 
     Assert.assertEquals("Number of apps incorrect before", 10, rmContext
         .getRMApps().size());
@@ -266,6 +273,8 @@ public class TestAppManager{
         rmContext.getRMApps().size());
     Assert.assertEquals("Number of completed apps incorrect after check", 3,
         appMonitor.getCompletedAppsListSize());
+    verify(rmContext.getStateStore(), times(7)).removeApplication(
+      isA(RMApp.class));
   }
 
   @Test
@@ -274,14 +283,17 @@ public class TestAppManager{
 
     // these parameters don't matter, override applications below
     RMContext rmContext = mockRMContext(10, now - 20000);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2);
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
 
-    appMonitor.setCompletedAppsMax(2);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
 
     // clear out applications map
     rmContext.getRMApps().clear();
     Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
 
+    // 6 applications are in final state, 4 are not in final state.
     // / set with various finished states
     RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
     rmContext.getRMApps().put(app.getApplicationId(), app);
@@ -318,7 +330,9 @@ public class TestAppManager{
         rmContext.getRMApps().size());
     Assert.assertEquals("Number of completed apps incorrect after check", 2,
         appMonitor.getCompletedAppsListSize());
-
+    // 6 applications in final state, 4 of them are removed
+    verify(rmContext.getStateStore(), times(4)).removeApplication(
+      isA(RMApp.class));
   }
 
   @Test
@@ -342,14 +356,13 @@ public class TestAppManager{
     long now = System.currentTimeMillis();
 
     RMContext rmContext = mockRMContext(10, now - 20000);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0);
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
     Assert.assertEquals("Number of apps incorrect before", 10, rmContext
         .getRMApps().size());
 
-    // test with 0
-    appMonitor.setCompletedAppsMax(0);
-
     addToCompletedApps(appMonitor, rmContext);
     Assert.assertEquals("Number of completed apps incorrect", 10,
         appMonitor.getCompletedAppsListSize());
@@ -360,6 +373,64 @@ public class TestAppManager{
         rmContext.getRMApps().size());
     Assert.assertEquals("Number of completed apps incorrect after check", 0,
         appMonitor.getCompletedAppsListSize());
+    verify(rmContext.getStateStore(), times(10)).removeApplication(
+      isA(RMApp.class));
+  }
+
+  @Test
+  public void testStateStoreAppLimitLessThanMemoryAppLimit() {
+    long now = System.currentTimeMillis();
+    RMContext rmContext = mockRMContext(10, now - 20000);
+    Configuration conf = new YarnConfiguration();
+    int maxAppsInMemory = 8;
+    int maxAppsInStateStore = 4;
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+      maxAppsInStateStore);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+    addToCompletedApps(appMonitor, rmContext);
+    Assert.assertEquals("Number of completed apps incorrect", 10,
+        appMonitor.getCompletedAppsListSize());
+    appMonitor.checkAppNumCompletedLimit();
+
+    Assert.assertEquals("Number of apps incorrect after # completed check",
+      maxAppsInMemory, rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check",
+      maxAppsInMemory, appMonitor.getCompletedAppsListSize());
+
+    int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
+    verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
+      .removeApplication(isA(RMApp.class));
+    Assert.assertEquals(maxAppsInStateStore,
+      appMonitor.getCompletedAppsInStateStore());
+  }
+
+  @Test
+  public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
+    long now = System.currentTimeMillis();
+    RMContext rmContext = mockRMContext(10, now - 20000);
+    Configuration conf = new YarnConfiguration();
+    int maxAppsInMemory = 8;
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
+    // state-store limit larger than the in-memory limit is reset to RM_MAX_COMPLETED_APPLICATIONS.
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+    addToCompletedApps(appMonitor, rmContext);
+    Assert.assertEquals("Number of completed apps incorrect", 10,
+        appMonitor.getCompletedAppsListSize());
+    appMonitor.checkAppNumCompletedLimit();
+
+    int numRemoveApps = 10 - maxAppsInMemory;
+    Assert.assertEquals("Number of apps incorrect after # completed check",
+      maxAppsInMemory, rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check",
+      maxAppsInMemory, appMonitor.getCompletedAppsListSize());
+    verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
+      isA(RMApp.class));
+    Assert.assertEquals(maxAppsInMemory,
+      appMonitor.getCompletedAppsInStateStore());
   }
 
   protected void setupDispatcher(RMContext rmContext, Configuration conf) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Sun Dec  8 04:19:16 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -423,6 +424,8 @@ public class TestRMRestart {
         rm2.getRMContext().getRMApps().get(app0.getApplicationId());
     Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
       .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+    rm1.stop();
+    rm2.stop();
   }
 
   @Test
@@ -629,6 +632,8 @@ public class TestRMRestart {
       .contains("Failing the application."));
     // failed diagnostics from attempt is lost because the diagnostics from
     // attempt is not yet available by the time app is saving the app state.
+    rm1.stop();
+    rm2.stop();
   }
 
   @Test
@@ -675,6 +680,48 @@ public class TestRMRestart {
     ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
     Assert.assertEquals(app0.getDiagnostics().toString(),
       appReport.getDiagnostics());
+    rm1.stop();
+    rm2.stop();
+  }
+
+  @Test
+  public void testRMRestartKilledAppWithNoAttempts() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void storeApplicationAttemptStateInternal(
+          String attemptIdStr,
+          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+        // ignore attempt saving request.
+      }
+
+      @Override
+      public synchronized void updateApplicationAttemptStateInternal(
+          String attemptIdStr,
+          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+        // ignore attempt saving request.
+      }
+    };
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    // create app
+    RMApp app0 =
+        rm1.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null, "MAPREDUCE", false);
+    // kill the app.
+    rm1.killApp(app0.getApplicationId());
+    rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
+
+    // restart rm
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    RMApp loadedApp0 =
+        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED);
+    Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
   }
 
   @Test
@@ -724,6 +771,9 @@ public class TestRMRestart {
     Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
       appReport.getFinalApplicationStatus());
     Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
+
+    rm1.stop();
+    rm2.stop();
   }
 
   @Test
@@ -817,6 +867,9 @@ public class TestRMRestart {
     // check application summary is logged for the completed apps after RM restart.
     verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
       isA(ApplicationId.class));
+
+    rm1.stop();
+    rm2.stop();
   }
 
   private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@@ -1378,6 +1431,52 @@ public class TestRMRestart {
     Assert.assertTrue(rmAppState.size() == NUM_APPS);
   }
 
+  @Test
+  public void testFinishedAppRemovalAfterRMRestart() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create an app and finish the app.
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    finishApplicationMaster(app0, rm1, nm1, am0);
+
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
+
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+
+    // app0 exists in both state store and rmContext
+    Assert.assertEquals(RMAppState.FINISHED,
+      rmAppState.get(app0.getApplicationId()).getState());
+    rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+
+    // create one more app and finish the app.
+    RMApp app1 = rm2.submitApp(200);
+    MockAM am1 = launchAM(app1, rm2, nm1);
+    finishApplicationMaster(app1, rm2, nm1, am1);
+
+    // the first app0 gets kicked out of both rmContext and state store
+    Assert.assertNull(rm2.getRMContext().getRMApps()
+      .get(app0.getApplicationId()));
+    Assert.assertNull(rmAppState.get(app0.getApplicationId()));
+
+    rm1.stop();
+    rm2.stop();
+  }
+
   public static class TestSecurityMockRM extends MockRM {
 
     public TestSecurityMockRM(Configuration conf, RMStateStore store) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Sun Dec  8 04:19:16 2013
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -109,6 +110,7 @@ public class RMStateStoreTestBase extend
     boolean isFinalStateValid() throws Exception;
     void writeVersion(RMStateVersion version) throws Exception;
     RMStateVersion getCurrentVersion() throws Exception;
+    boolean appExists(RMApp app) throws Exception;
   }
 
   void waitNotify(TestDispatcher dispatcher) {
@@ -128,7 +130,7 @@ public class RMStateStoreTestBase extend
     dispatcher.notified = false;
   }
 
-  void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+  RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
       long startTime) throws Exception {
     ApplicationSubmissionContext context =
         new ApplicationSubmissionContextPBImpl();
@@ -141,6 +143,7 @@ public class RMStateStoreTestBase extend
     when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
     when(mockApp.getUser()).thenReturn("test");
     store.storeNewApplication(mockApp);
+    return mockApp;
   }
 
   ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
@@ -370,6 +373,7 @@ public class RMStateStoreTestBase extend
     Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
     Assert.assertEquals(sequenceNumber,
         secretManagerState.getDTSequenceNumber());
+    store.close();
   }
 
   private Token<AMRMTokenIdentifier> generateAMRMToken(
@@ -415,4 +419,43 @@ public class RMStateStoreTestBase extend
       Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
     }
   }
+
+  public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(new TestDispatcher());
+    // create and store apps
+    ArrayList<RMApp> appList = new ArrayList<RMApp>();
+    int NUM_APPS = 5;
+    for (int i = 0; i < NUM_APPS; i++) {
+      ApplicationId appId = ApplicationId.newInstance(1383183338, i);
+      RMApp app = storeApp(store, appId, 123456789, 987654321);
+      appList.add(app);
+    }
+
+    Assert.assertEquals(NUM_APPS, appList.size());
+    for (RMApp app : appList) {
+      // wait for app to be stored.
+      while (true) {
+        if (stateStoreHelper.appExists(app)) {
+          break;
+        } else {
+          Thread.sleep(100);
+        }
+      }
+    }
+
+    for (RMApp app : appList) {
+      // remove the app
+      store.removeApplication(app);
+      // wait for app to be removed.
+      while (true) {
+        if (!stateStoreHelper.appExists(app)) {
+          break;
+        } else {
+          Thread.sleep(100);
+        }
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Sun Dec  8 04:19:16 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.Assert;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
@@ -69,6 +71,13 @@ public class TestFSRMStateStore extends 
       public RMStateVersion getCurrentVersion() {
         return CURRENT_VERSION_INFO;
       }
+
+      public Path getAppDir(String appId) {
+        Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
+        Path appRootDir = new Path(rootDir, RM_APP_ROOT);
+        Path appDir = new Path(appRootDir, appId);
+        return appDir;
+      }
     }
 
     public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
@@ -109,9 +118,16 @@ public class TestFSRMStateStore extends 
     public RMStateVersion getCurrentVersion() throws Exception {
       return store.getCurrentVersion();
     }
+
+    public boolean appExists(RMApp app) throws IOException {
+      FileSystem fs = cluster.getFileSystem();
+      Path nodePath =
+          store.getAppDir(app.getApplicationId().toString());
+      return fs.exists(nodePath);
+    }
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testFSRMStateStore() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster =
@@ -126,11 +142,8 @@ public class TestFSRMStateStore extends 
       String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
       ApplicationAttemptId attemptId3 =
           ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
-      Path rootDir =
-          new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
-      Path appRootDir = new Path(rootDir, "RMAppRoot");
       Path appDir =
-          new Path(appRootDir, attemptId3.getApplicationId().toString());
+          fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
       Path tempAppAttemptFile =
           new Path(appDir, attemptId3.toString() + ".tmp");
       fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
@@ -138,10 +151,11 @@ public class TestFSRMStateStore extends 
       fsOut.close();
 
       testRMAppStateStore(fsTester);
-      Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+      Assert.assertFalse(fsTester.workingDirPathURI
           .getFileSystem(conf).exists(tempAppAttemptFile));
       testRMDTSecretManagerStateStore(fsTester);
       testCheckVersion(fsTester);
+      testAppDeletion(fsTester);
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Sun Dec  8 04:19:16 2013
@@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.junit.Test;
 
 public class TestZKRMStateStore extends RMStateStoreTestBase {
@@ -57,6 +59,7 @@ public class TestZKRMStateStore extends 
 
     ZooKeeper client;
     TestZKRMStateStoreInternal store;
+    String workingZnode;
 
     class TestZKRMStateStoreInternal extends ZKRMStateStore {
 
@@ -79,11 +82,16 @@ public class TestZKRMStateStore extends 
       public RMStateVersion getCurrentVersion() {
         return CURRENT_VERSION_INFO;
       }
+
+      public String getAppNode(String appId) {
+        return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
+            + appId;
+      }
     }
 
     public RMStateStore getRMStateStore() throws Exception {
-      String workingZnode = "/Test";
-      Configuration conf = new YarnConfiguration();
+      YarnConfiguration conf = new YarnConfiguration();
+      workingZnode = "/Test";
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
       this.client = createClient();
@@ -107,14 +115,22 @@ public class TestZKRMStateStore extends 
     public RMStateVersion getCurrentVersion() throws Exception {
       return store.getCurrentVersion();
     }
+
+    public boolean appExists(RMApp app) throws Exception {
+      Stat node =
+          client.exists(store.getAppNode(app.getApplicationId().toString()),
+            false);
+      return node !=null;
+    }
   }
 
-  @Test
+  @Test (timeout = 60000)
   public void testZKRMStateStoreRealZK() throws Exception {
     TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
     testRMAppStateStore(zkTester);
     testRMDTSecretManagerStateStore(zkTester);
     testCheckVersion(zkTester);
+    testAppDeletion(zkTester);
   }
 
   private Configuration createHARMConf(

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1548991&r1=1548990&r2=1548991&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Sun Dec  8 04:19:16 2013
@@ -120,7 +120,7 @@ public class TestZKRMStateStoreZKClientC
     TestZKClient zkClientTester = new TestZKClient();
     final String path = "/test";
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+    conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000);
     conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
     final ZKRMStateStore store =
         (ZKRMStateStore) zkClientTester.getRMStateStore(conf);