You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/05/02 22:05:39 UTC

[22/50] [abbrv] hadoop git commit: YARN-2962. ZKRMStateStore: Limit the number of znodes under a znode (Contributed by Varun Saxena via Daniel Templeton)

YARN-2962. ZKRMStateStore: Limit the number of znodes under a znode (Contributed by Varun Saxena via Daniel Templeton)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e52789e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e52789e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e52789e

Branch: refs/heads/HDFS-10467
Commit: 2e52789edf68016e7a3f450164f8bd3d8e6cb210
Parents: fdf5192
Author: Daniel Templeton <te...@apache.org>
Authored: Fri Apr 28 13:26:36 2017 -0700
Committer: Daniel Templeton <te...@apache.org>
Committed: Fri Apr 28 13:30:28 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +
 .../src/main/resources/yarn-default.xml         |  22 +-
 .../recovery/ZKRMStateStore.java                | 506 ++++++++++++----
 .../resourcemanager/TestRMStoreCommands.java    |  15 +-
 .../recovery/RMStateStoreTestBase.java          |  10 +-
 .../recovery/TestZKRMStateStore.java            | 572 ++++++++++++++++++-
 6 files changed, 1013 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e52789e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fa4d2e3..82274fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -568,6 +568,10 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
   public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
 
+  public static final  String ZK_APPID_NODE_SPLIT_INDEX =
+      RM_ZK_PREFIX + "appid-node.split-index";
+  public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
+
   public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
   public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e52789e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bdd4de5..e687eef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -609,7 +609,27 @@
   </property>
 
   <property>
-    <description>Name of the cluster. In an HA setting,
+    <description>Index at which last section of application id (with each section
+      separated by _ in application id) will be split so that application znode
+      stored in zookeeper RM state store will be stored as two different znodes
+      (parent-child). Split is done from the end.
+      For instance, with no split, appid znode will be of the form
+      application_1352994193343_0001. If the value of this config is 1, the
+      appid znode will be broken into two parts application_1352994193343_000
+      and 1 respectively with former being the parent node.
+      application_1352994193343_0002 will then be stored as 2 under the parent
+      node application_1352994193343_000. This config can take values from 0 to 4.
+      0 means there will be no split. If configuration value is outside this
+      range, it will be treated as config value of 0 (i.e. no split). A value
+      larger than 0 (up to 4) should be configured if you are storing a large number
+      of apps in ZK based RM state store and state store operations are failing due to
+      LenError in Zookeeper.</description>
+    <name>yarn.resourcemanager.zk-appid-node.split-index</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>Name of the cluster. In an HA setting,
       this is used to ensure the RM participates in leader
       election for this cluster and ensures it does not affect
       other clusters</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e52789e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 1212a91..86f7a5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -72,6 +73,8 @@ import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * {@link RMStateStore} implementation backed by ZooKeeper.
@@ -82,6 +85,31 @@ import java.util.List;
  * |--- EPOCH_NODE
  * |--- RM_ZK_FENCING_LOCK
  * |--- RM_APP_ROOT
+ * |     |----- HIERARCHIES
+ * |     |        |----- 1
+ * |     |        |      |----- (#ApplicationId barring last character)
+ * |     |        |      |       |----- (#Last character of ApplicationId)
+ * |     |        |      |       |       |----- (#ApplicationAttemptIds)
+ * |     |        |      ....
+ * |     |        |
+ * |     |        |----- 2
+ * |     |        |      |----- (#ApplicationId barring last 2 characters)
+ * |     |        |      |       |----- (#Last 2 characters of ApplicationId)
+ * |     |        |      |       |       |----- (#ApplicationAttemptIds)
+ * |     |        |      ....
+ * |     |        |
+ * |     |        |----- 3
+ * |     |        |      |----- (#ApplicationId barring last 3 characters)
+ * |     |        |      |       |----- (#Last 3 characters of ApplicationId)
+ * |     |        |      |       |       |----- (#ApplicationAttemptIds)
+ * |     |        |      ....
+ * |     |        |
+ * |     |        |----- 4
+ * |     |        |      |----- (#ApplicationId barring last 4 characters)
+ * |     |        |      |       |----- (#Last 4 characters of ApplicationId)
+ * |     |        |      |       |       |----- (#ApplicationAttemptIds)
+ * |     |        |      ....
+ * |     |        |
  * |     |----- (#ApplicationId1)
  * |     |        |----- (#ApplicationAttemptIds)
  * |     |
@@ -121,6 +149,7 @@ import java.util.List;
 @Unstable
 public class ZKRMStateStore extends RMStateStore {
   private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
+
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -129,12 +158,15 @@ public class ZKRMStateStore extends RMStateStore {
       "RMDTMasterKeysRoot";
   @VisibleForTesting
   public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
-  protected static final Version CURRENT_VERSION_INFO =
-      Version.newInstance(1, 3);
+  protected static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(2, 0);
+  @VisibleForTesting
+  public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
 
   /* Znode paths */
   private String zkRootNodePath;
   private String rmAppRoot;
+  private Map<Integer, String> rmAppRootHierarchies;
   private String rmDTSecretManagerRoot;
   private String dtMasterKeysRootPath;
   private String delegationTokensRootPath;
@@ -144,6 +176,7 @@ public class ZKRMStateStore extends RMStateStore {
 
   @VisibleForTesting
   protected String znodeWorkingPath;
+  private int appIdNodeSplitIndex = 0;
 
   /* Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -165,6 +198,27 @@ public class ZKRMStateStore extends RMStateStore {
   @VisibleForTesting
   protected CuratorFramework curatorFramework;
 
+  /*
+   * Indicates different app attempt state store operations.
+   */
+  private enum AppAttemptOp {
+    STORE,
+    UPDATE,
+    REMOVE
+  };
+
+  /**
+   * Encapsulates full app node path and corresponding split index.
+   */
+  private final static class AppNodeSplitInfo {
+    private final String path;
+    private final int splitIndex;
+    AppNodeSplitInfo(String path, int splitIndex) {
+      this.path = path;
+      this.splitIndex = splitIndex;
+    }
+  }
+
   /**
    * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
    * ZooKeeper access, construct the {@link ACL}s for the store's root node.
@@ -212,11 +266,30 @@ public class ZKRMStateStore extends RMStateStore {
         conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
             YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
     zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
-    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
     rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
+    String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
+    rmAppRootHierarchies = new HashMap<>(5);
+    rmAppRootHierarchies.put(0, rmAppRoot);
+    for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+      rmAppRootHierarchies.put(splitIndex,
+          getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
+    }
+
+    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
     zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
         YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
 
+    appIdNodeSplitIndex =
+        conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+            YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+    if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
+      LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
+          YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
+              "Resetting it to " +
+                  YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+      appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
+    }
+
     zkAcl = RMZKUtils.getZKAcls(conf);
 
     if (HAUtil.isHAEnabled(conf)) {
@@ -269,6 +342,10 @@ public class ZKRMStateStore extends RMStateStore {
       verifyActiveStatusThread.start();
     }
     create(rmAppRoot);
+    create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
+    for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+      create(rmAppRootHierarchies.get(splitIndex));
+    }
     create(rmDTSecretManagerRoot);
     create(dtMasterKeysRootPath);
     create(delegationTokensRootPath);
@@ -524,42 +601,64 @@ public class ZKRMStateStore extends RMStateStore {
     }
   }
 
-  private synchronized void loadRMAppState(RMState rmState) throws Exception {
-    List<String> childNodes = getChildren(rmAppRoot);
-
-    for (String childNodeName : childNodes) {
-      String childNodePath = getNodePath(rmAppRoot, childNodeName);
-      byte[] childData = getData(childNodePath);
-
-      if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
-        // application
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Loading application from znode: " + childNodeName);
-        }
-
-        ApplicationId appId = ApplicationId.fromString(childNodeName);
-        ApplicationStateDataPBImpl appState =
-            new ApplicationStateDataPBImpl(
-                ApplicationStateDataProto.parseFrom(childData));
+  private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
+      String appIdStr) throws Exception {
+    byte[] appData = getData(appNodePath);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Loading application from znode: " + appNodePath);
+    }
+    ApplicationId appId = ApplicationId.fromString(appIdStr);
+    ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(
+        ApplicationStateDataProto.parseFrom(appData));
+    if (!appId.equals(
+        appState.getApplicationSubmissionContext().getApplicationId())) {
+      throw new YarnRuntimeException("The node name is different from the " +
+             "application id");
+    }
+    rmState.appState.put(appId, appState);
+    loadApplicationAttemptState(appState, appNodePath);
+  }
 
-        if (!appId.equals(
-            appState.getApplicationSubmissionContext().getApplicationId())) {
-          throw new YarnRuntimeException("The child node name is different "
-              + "from the application id");
+  private synchronized void loadRMAppState(RMState rmState) throws Exception {
+    for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
+      String appRoot = rmAppRootHierarchies.get(splitIndex);
+      if (appRoot == null) {
+        continue;
+      }
+      List<String> childNodes = getChildren(appRoot);
+      boolean appNodeFound = false;
+      for (String childNodeName : childNodes) {
+        if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+          appNodeFound = true;
+          if (splitIndex == 0) {
+            loadRMAppStateFromAppNode(rmState,
+                getNodePath(appRoot, childNodeName), childNodeName);
+          } else {
+            // If AppId Node is partitioned.
+            String parentNodePath = getNodePath(appRoot, childNodeName);
+            List<String> leafNodes = getChildren(parentNodePath);
+            for (String leafNodeName : leafNodes) {
+              String appIdStr = childNodeName + leafNodeName;
+              loadRMAppStateFromAppNode(rmState,
+                  getNodePath(parentNodePath, leafNodeName), appIdStr);
+            }
+          }
+        } else {
+          LOG.info("Unknown child node with name: " + childNodeName);
         }
-
-        rmState.appState.put(appId, appState);
-        loadApplicationAttemptState(appState, appId);
-      } else {
-        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+      if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
+        // If no loaded app exists for a particular split index and the split
+        // index for which apps are being loaded is not the one configured, then
+        // we do not need to keep track of this hierarchy for storing/updating/
+        // removing app/app attempt znodes.
+        rmAppRootHierarchies.remove(splitIndex);
       }
     }
   }
 
   private void loadApplicationAttemptState(ApplicationStateData appState,
-      ApplicationId appId)
-      throws Exception {
-    String appPath = getNodePath(rmAppRoot, appId.toString());
+      String appPath) throws Exception {
     List<String> attempts = getChildren(appPath);
 
     for (String attemptIDStr : attempts) {
@@ -574,14 +673,68 @@ public class ZKRMStateStore extends RMStateStore {
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     }
-
     LOG.debug("Done loading applications from ZK state store");
   }
 
+  /**
+   * Get parent app node path based on full path and split index supplied.
+   * @param appIdPath App id path for which parent needs to be returned.
+   * @param splitIndex split index.
+   * @return parent app node path.
+   */
+  private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
+    // Calculated as the substring up to index (appIdPath length - split
+    // index - 1). We deduct 1 to exclude the path separator.
+    return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
+  }
+
+  /**
+   * Checks whether the parent app node has any leaf nodes and, if it has none,
+   * removes it. Called while removing application.
+   * @param appIdPath path of app id to be removed.
+   * @param splitIndex split index.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
+      throws Exception {
+    if (splitIndex != 0) {
+      String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
+      List<String> children = null;
+      try {
+        children = getChildren(parentAppNode);
+      } catch (KeeperException.NoNodeException ke) {
+        // It should be fine to swallow this exception as the parent app node we
+        // intend to delete is already deleted.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to remove app parent node " + parentAppNode +
+              " as it does not exist.");
+        }
+        return;
+      }
+      // No apps stored under parent path.
+      if (children != null && children.isEmpty()) {
+        try {
+          safeDelete(parentAppNode);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No leaf app node exists. Removing parent node " +
+                parentAppNode);
+          }
+        } catch (KeeperException.NotEmptyException ke) {
+          // It should be fine to swallow this exception as the parent app node
+          // has to be deleted only if it has no children. And this node has.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Unable to remove app parent node " + parentAppNode +
+                " as it has children.");
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateData appStateDataPB) throws Exception {
-    String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
+    String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -596,7 +749,26 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized void updateApplicationStateInternal(
       ApplicationId appId, ApplicationStateData appStateDataPB)
       throws Exception {
-    String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
+    String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
+    boolean pathExists = true;
+    // Look for paths based on other split indices if path as per split index
+    // does not exist.
+    if (!exists(nodeUpdatePath)) {
+      AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
+      if (alternatePathInfo != null) {
+        nodeUpdatePath = alternatePathInfo.path;
+      } else {
+        // No alternate path exists. Create path as per configured split index.
+        pathExists = false;
+        if (appIdNodeSplitIndex != 0) {
+          String rootNode =
+              getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
+          if (!exists(rootNode)) {
+            safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
+          }
+        }
+      }
+    }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing final state info for app: " + appId + " at: "
@@ -605,34 +777,79 @@ public class ZKRMStateStore extends RMStateStore {
 
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
 
-    if (exists(nodeUpdatePath)) {
+    if (pathExists) {
       safeSetData(nodeUpdatePath, appStateData, -1);
     } else {
-      safeCreate(nodeUpdatePath, appStateData, zkAcl,
-          CreateMode.PERSISTENT);
+      safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(appId + " znode didn't exist. Created a new znode to"
-            + " update the application state.");
+        LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
+            "exist. Creating a new znode to update the application state.");
       }
     }
   }
 
-  @Override
-  protected synchronized void storeApplicationAttemptStateInternal(
+  /*
+   * Handles store, update and remove application attempt state store
+   * operations.
+   */
+  private void handleApplicationAttemptStateOp(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateData attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB, AppAttemptOp operation)
       throws Exception {
-    String appDirPath = getNodePath(rmAppRoot,
-        appAttemptId.getApplicationId().toString());
-    String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
-
+    String appId = appAttemptId.getApplicationId().toString();
+    String appDirPath = getLeafAppIdNodePath(appId, false);
+    // Look for paths based on other split indices.
+    if (!exists(appDirPath)) {
+      AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+      if (alternatePathInfo == null) {
+        if (operation == AppAttemptOp.REMOVE) {
+          // Unexpected. Assume that app attempt has been deleted.
+          return;
+        } else { // Store or Update operation
+          throw new YarnRuntimeException("Unexpected Exception. App node for " +
+              "app " + appId + " not found");
+        }
+      } else {
+        appDirPath = alternatePathInfo.path;
+      }
+    }
+    String path = getNodePath(appDirPath, appAttemptId.toString());
+    byte[] attemptStateData = (attemptStateDataPB == null) ? null :
+        attemptStateDataPB.getProto().toByteArray();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
-          + nodeCreatePath);
+      LOG.debug(operation + " info for attempt: " + appAttemptId + " at: "
+          + path);
     }
+    switch (operation) {
+    case UPDATE:
+      if (exists(path)) {
+        safeSetData(path, attemptStateData, -1);
+      } else {
+        safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
+              " Created a new znode to update the application attempt state.");
+        }
+      }
+      break;
+    case STORE:
+      safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+      break;
+    case REMOVE:
+      safeDelete(path);
+      break;
+    default:
+      break;
+    }
+  }
 
-    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+  @Override
+  protected synchronized void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId appAttemptId,
+      ApplicationAttemptStateData attemptStateDataPB)
+      throws Exception {
+    handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
+        AppAttemptOp.STORE);
   }
 
   @Override
@@ -640,65 +857,73 @@ public class ZKRMStateStore extends RMStateStore {
       ApplicationAttemptId appAttemptId,
       ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    String appAttemptIdStr = appAttemptId.toString();
-    String appDirPath = getNodePath(rmAppRoot, appIdStr);
-    String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
-          + " at: " + nodeUpdatePath);
-    }
-
-    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-
-    if (exists(nodeUpdatePath)) {
-      safeSetData(nodeUpdatePath, attemptStateData, -1);
-    } else {
-      safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
-          CreateMode.PERSISTENT);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
-            + " update the application attempt state.");
-      }
-    }
+    handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
+        AppAttemptOp.UPDATE);
   }
 
   @Override
   protected synchronized void removeApplicationAttemptInternal(
       ApplicationAttemptId appAttemptId) throws Exception {
-    String appId = appAttemptId.getApplicationId().toString();
-    String appIdRemovePath = getNodePath(rmAppRoot, appId);
-    String attemptIdRemovePath =
-        getNodePath(appIdRemovePath, appAttemptId.toString());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
-          + attemptIdRemovePath);
-    }
-
-    safeDelete(attemptIdRemovePath);
+    handleApplicationAttemptStateOp(appAttemptId, null, AppAttemptOp.REMOVE);
   }
 
   @Override
   protected synchronized void removeApplicationStateInternal(
       ApplicationStateData appState) throws Exception {
-    String appId = appState.getApplicationSubmissionContext().getApplicationId()
-        .toString();
-    String appIdRemovePath = getNodePath(rmAppRoot, appId);
+    removeApp(appState.getApplicationSubmissionContext().
+        getApplicationId().toString(), true, appState.attempts.keySet());
+  }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
-          + " and its attempts.");
-    }
+  private void removeApp(String removeAppId) throws Exception {
+    removeApp(removeAppId, false, null);
+  }
 
-    for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      String attemptRemovePath =
-          getNodePath(appIdRemovePath, attemptId.toString());
-      safeDelete(attemptRemovePath);
+  /**
+   * Remove application node and its attempt nodes.
+   *
+   * @param removeAppId Application Id to be removed.
+   * @param safeRemove Flag indicating if application and attempt nodes have to
+   *     be removed safely under a fencing or not.
+   * @param attempts list of attempts to be removed associated with this app.
+   *     Ignored if safeRemove flag is false as we recursively delete all the
+   *     child nodes directly.
+   * @throws Exception if any exception occurs during ZK operation.
+   */
+  private void removeApp(String removeAppId, boolean safeRemove,
+      Set<ApplicationAttemptId> attempts) throws Exception {
+    String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false);
+    int splitIndex = appIdNodeSplitIndex;
+    // Look for paths based on other split indices if path as per configured
+    // split index does not exist.
+    if (!exists(appIdRemovePath)) {
+      AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
+      if (alternatePathInfo != null) {
+        appIdRemovePath = alternatePathInfo.path;
+        splitIndex = alternatePathInfo.splitIndex;
+      } else {
+        // Alternate path not found so return.
+        return;
+      }
     }
-
-    safeDelete(appIdRemovePath);
+    if (safeRemove) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Removing info for app: " + removeAppId + " at: " +
+            appIdRemovePath + " and its attempts.");
+      }
+      if (attempts != null) {
+        for (ApplicationAttemptId attemptId : attempts) {
+          String attemptRemovePath =
+              getNodePath(appIdRemovePath, attemptId.toString());
+          safeDelete(attemptRemovePath);
+        }
+      }
+      safeDelete(appIdRemovePath);
+    } else {
+      curatorFramework.delete().deletingChildrenIfNeeded().
+          forPath(appIdRemovePath);
+    }
+    // Check if we should remove the parent app node as well.
+    checkRemoveParentAppNode(appIdRemovePath, splitIndex);
   }
 
   @Override
@@ -820,8 +1045,7 @@ public class ZKRMStateStore extends RMStateStore {
   @Override
   public synchronized void removeApplication(ApplicationId removeAppId)
       throws Exception {
-    String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
-    delete(appIdRemovePath);
+    removeApp(removeAppId.toString());
   }
 
   @VisibleForTesting
@@ -920,6 +1144,79 @@ public class ZKRMStateStore extends RMStateStore {
     }
   }
 
+  /**
+   * Get alternate path for app id if path according to configured split index
+   * does not exist. We look for path based on all possible split indices.
+   * @param appId application id.
+   * @return a {@link AppNodeSplitInfo} object containing the path and split
+   *    index if it exists, null otherwise.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
+    for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
+      // Look for other paths
+      int splitIndex = entry.getKey();
+      if (splitIndex != appIdNodeSplitIndex) {
+        String alternatePath =
+            getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
+        if (exists(alternatePath)) {
+          return new AppNodeSplitInfo(alternatePath, splitIndex);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns leaf app node path based on app id and passed split index. If the
+   * passed flag createParentIfNotExists is true, also creates the parent app
+   * node if it does not exist.
+   * @param appId application id.
+   * @param rootNode app root node based on split index.
+   * @param appIdNodeSplitIdx split index.
+   * @param createParentIfNotExists flag which determines if parent app node
+   *     needs to be created (as per split) if it does not exist.
+   * @return leaf app node path.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private String getLeafAppIdNodePath(String appId, String rootNode,
+      int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
+    if (appIdNodeSplitIdx == 0) {
+      return getNodePath(rootNode, appId);
+    }
+    String nodeName = appId;
+    int splitIdx = nodeName.length() - appIdNodeSplitIdx;
+    String rootNodePath =
+        getNodePath(rootNode, nodeName.substring(0, splitIdx));
+    if (createParentIfNotExists && !exists(rootNodePath)) {
+      try {
+        safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
+      } catch (KeeperException.NodeExistsException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to create app parent node " + rootNodePath +
+              " as it already exists.");
+        }
+      }
+    }
+    return getNodePath(rootNodePath, nodeName.substring(splitIdx));
+  }
+
+  /**
+   * Returns leaf app node path based on app id and configured split index. If
+   * the passed flag createParentIfNotExists is true, also creates the parent
+   * app node if it does not exist.
+   * @param appId application id.
+   * @param createParentIfNotExists flag which determines if parent app node
+   *     needs to be created (as per split) if it does not exist.
+   * @return leaf app node path.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private String getLeafAppIdNodePath(String appId,
+      boolean createParentIfNotExists) throws Exception {
+    return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
+        appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
+  }
+
   @VisibleForTesting
   byte[] getData(final String path) throws Exception {
     return curatorFramework.getData().forPath(path);
@@ -930,11 +1227,13 @@ public class ZKRMStateStore extends RMStateStore {
     return curatorFramework.getACL().forPath(path);
   }
 
-  private List<String> getChildren(final String path) throws Exception {
+  @VisibleForTesting
+  List<String> getChildren(final String path) throws Exception {
     return curatorFramework.getChildren().forPath(path);
   }
 
-  private boolean exists(final String path) throws Exception {
+  @VisibleForTesting
+  boolean exists(final String path) throws Exception {
     return curatorFramework.checkExists().forPath(path) != null;
   }
 
@@ -963,6 +1262,11 @@ public class ZKRMStateStore extends RMStateStore {
     }
   }
 
+  /**
+   * Deletes the path. Checks for existence of path as well.
+   * @param path Path to be deleted.
+   * @throws Exception if any problem occurs while performing deletion.
+   */
   private void safeDelete(final String path) throws Exception {
     if (exists(path)) {
       SafeTransaction transaction = new SafeTransaction();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e52789e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
index 6c74616..0203351 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
@@ -87,8 +87,13 @@ public class TestRMStoreCommands {
           ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT;
       String appIdPath = appRootPath + "/" + appId;
       curatorFramework.create().forPath(appIdPath);
-      assertEquals("Application node for " + appId + "should exist",
-          appId, curatorFramework.getChildren().forPath(appRootPath).get(0));
+      for (String path : curatorFramework.getChildren().forPath(appRootPath)) {
+        if (path.equals(ZKRMStateStore.RM_APP_ROOT_HIERARCHIES)) {
+          continue;
+        }
+        assertEquals("Application node for " + appId + " should exist",
+            appId, path);
+      }
       try {
         ResourceManager.removeApplication(conf, appId);
       } catch (Exception e) {
@@ -96,8 +101,10 @@ public class TestRMStoreCommands {
             "rm state store.");
       }
       assertTrue("After remove app from store there should be no child nodes" +
-          " in app root path",
-          curatorFramework.getChildren().forPath(appRootPath).isEmpty());
+          " for application in app root path",
+          curatorFramework.getChildren().forPath(appRootPath).size() == 1 &&
+          curatorFramework.getChildren().forPath(appRootPath).get(0).equals(
+              ZKRMStateStore.RM_APP_ROOT_HIERARCHIES));
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e52789e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 514e9a0..ca97914 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -156,8 +156,7 @@ public class RMStateStoreTestBase {
   }
 
   protected RMApp storeApp(RMStateStore store, ApplicationId appId,
-      long submitTime,
-      long startTime) throws Exception {
+      long submitTime, long startTime) throws Exception {
     ApplicationSubmissionContext context =
         new ApplicationSubmissionContextPBImpl();
     context.setApplicationId(appId);
@@ -200,6 +199,13 @@ public class RMStateStoreTestBase {
     return mockAttempt;
   }
 
+  protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher,
+      ApplicationAttemptStateData attemptState) {
+    dispatcher.attemptId = attemptState.getAttemptId();
+    store.updateApplicationAttemptState(attemptState);
+    waitNotify(dispatcher);
+  }
+
   void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
           throws Exception {
     testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e52789e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 6d5d2d7..7c40ddf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -40,19 +42,25 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.data.ACL;
@@ -61,13 +69,22 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.crypto.SecretKey;
 
@@ -131,9 +148,21 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
         return CURRENT_VERSION_INFO;
       }
 
+      private String getAppNode(String appId, int splitIdx) {
+        String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
+            RM_APP_ROOT;
+        String appPath = appId;
+        if (splitIdx != 0) {
+          int idx = appId.length() - splitIdx;
+          appPath = appId.substring(0, idx) + "/" + appId.substring(idx);
+          return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" +
+              Integer.toString(splitIdx) + "/" + appPath;
+        }
+        return rootPath + "/" + appPath;
+      }
+
       public String getAppNode(String appId) {
-        return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
-            + appId;
+        return getAppNode(appId, 0);
       }
 
       public String getAttemptNode(String appId, String attemptId) {
@@ -150,8 +179,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
 
     }
 
-    public RMStateStore getRMStateStore() throws Exception {
-      YarnConfiguration conf = new YarnConfiguration();
+    private RMStateStore createStore(Configuration conf) throws Exception {
       workingZnode = "/jira/issue/3077/rmstore";
       conf.set(YarnConfiguration.RM_ZK_ADDRESS,
           curatorTestingServer.getConnectString());
@@ -160,6 +188,15 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       return this.store;
     }
 
+    public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+      return createStore(conf);
+    }
+
+    public RMStateStore getRMStateStore() throws Exception {
+      YarnConfiguration conf = new YarnConfiguration();
+      return createStore(conf);
+    }
+
     @Override
     public boolean isFinalStateValid() throws Exception {
       return 1 ==
@@ -179,8 +216,12 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     }
 
     public boolean appExists(RMApp app) throws Exception {
+      String appIdPath = app.getApplicationId().toString();
+      int split =
+          store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+          YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
       return null != curatorFramework.checkExists()
-          .forPath(store.getAppNode(app.getApplicationId().toString()));
+          .forPath(store.getAppNode(appIdPath, split));
     }
 
     public boolean attemptExists(RMAppAttempt attempt) throws Exception {
@@ -343,7 +384,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     rm.close();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testFencing() throws Exception {
     StateChangeRequestInfo req = new StateChangeRequestInfo(
@@ -383,13 +423,15 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     assertEquals("RM should be Active",
         HAServiceProtocol.HAServiceState.ACTIVE,
         rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+    rm1.close();
+    rm2.close();
   }
   
   @Test
   public void testFencedState() throws Exception {
     TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
-	RMStateStore store = zkTester.getRMStateStore();
-   
+    RMStateStore store = zkTester.getRMStateStore();
+
     // Move state to FENCED from ACTIVE
     store.updateFencedState();
     assertEquals("RMStateStore should have been in fenced state",
@@ -528,4 +570,518 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     }
     store.close();
   }
+
+  private static String createPath(String... parts) {
+    return Joiner.on("/").join(parts);
+  }
+
+  private static Configuration createConfForAppNodeSplit(int splitIndex) {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex);
+    return conf;
+  }
+
+  private static RMApp createMockAppForRemove(ApplicationId appId,
+      ApplicationAttemptId... attemptIds) {
+    RMApp app = mock(RMApp.class);
+    ApplicationSubmissionContextPBImpl context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appId);
+    when(app.getApplicationSubmissionContext()).thenReturn(context);
+    when(app.getUser()).thenReturn("test");
+    if (attemptIds.length > 0) {
+      HashMap<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
+      for (ApplicationAttemptId attemptId : attemptIds) {
+        RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+        when(appAttempt.getAppAttemptId()).thenReturn(attemptId);
+        attempts.put(attemptId, appAttempt);
+      }
+      when(app.getAppAttempts()).thenReturn(attempts);
+    }
+    return app;
+  }
+
+  private static void verifyLoadedApp(ApplicationStateData appState,
+      ApplicationId appId, String user, long submitTime, long startTime,
+      RMAppState state, long finishTime, String diagnostics) {
+    // Check if app is loaded correctly
+    assertNotNull("App " + appId + " should have been loaded.", appState);
+    assertEquals("App submit time in app state", submitTime,
+        appState.getSubmitTime());
+    assertEquals("App start time in app state", startTime,
+        appState.getStartTime());
+    assertEquals("App ID in app state", appId,
+        appState.getApplicationSubmissionContext().getApplicationId());
+    assertEquals("App state", state, appState.getState());
+    assertEquals("Finish time in app state", finishTime,
+        appState.getFinishTime());
+    assertEquals("User in app state", user, appState.getUser());
+    assertEquals("Diagnostics in app state", diagnostics,
+        appState.getDiagnostics());
+  }
+
+  private static void verifyLoadedApp(RMState rmState,
+      ApplicationId appId, long submitTime, long startTime, long finishTime,
+      boolean isFinished, List<ApplicationAttemptId> attempts) {
+    verifyLoadedApp(rmState, appId, submitTime, startTime, finishTime,
+        isFinished, attempts, null, null);
+  }
+
+  private static void verifyLoadedApp(RMState rmState,
+      ApplicationId appId, long submitTime, long startTime, long finishTime,
+      boolean isFinished, List<ApplicationAttemptId> attempts,
+      List<Integer> amExitStatuses,
+      List<FinalApplicationStatus> finalStatuses) {
+    Map<ApplicationId, ApplicationStateData> rmAppState =
+        rmState.getApplicationState();
+    ApplicationStateData appState = rmAppState.get(appId);
+    assertNotNull(appId + " is not there in loaded apps", appState);
+    verifyLoadedApp(appState, appId, "test", submitTime, startTime,
+        isFinished ? RMAppState.FINISHED : null, finishTime,
+        isFinished ? "appDiagnostics" : "");
+    // Check attempt state.
+    if (attempts != null) {
+      assertEquals("Attempts loaded for app " + appId, attempts.size(),
+          appState.attempts.size());
+      if (finalStatuses != null && amExitStatuses != null) {
+        for (int i = 0; i < attempts.size(); i++) {
+          if (finalStatuses.get(i) != null) {
+            verifyLoadedAttempt(appState, attempts.get(i),
+                amExitStatuses.get(i), true);
+          } else {
+            verifyLoadedAttempt(appState, attempts.get(i),
+                amExitStatuses.get(i), false);
+          }
+        }
+      }
+    } else {
+      assertEquals(
+          "Attempts loaded for app " + appId, 0, appState.attempts.size());
+    }
+  }
+
+  private static void verifyLoadedAttempt(ApplicationStateData appState,
+      ApplicationAttemptId attemptId, int amExitStatus, boolean isFinished) {
+    verifyLoadedAttempt(appState, attemptId, isFinished ? "myTrackingUrl" :
+        "N/A", ContainerId.newContainerId(attemptId, 1), null,
+        isFinished ? RMAppAttemptState.FINISHED : null, isFinished ?
+        "attemptDiagnostics" : "", 0, amExitStatus,
+        isFinished ? FinalApplicationStatus.SUCCEEDED : null);
+  }
+
+  private static void verifyLoadedAttempt(ApplicationStateData appState,
+      ApplicationAttemptId attemptId, String trackingURL,
+      ContainerId masterContainerId, SecretKey clientTokenKey,
+      RMAppAttemptState state, String diagnostics, long finishTime,
+      int amExitStatus, FinalApplicationStatus finalStatus) {
+    ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId);
+    // Check if attempt is loaded correctly
+    assertNotNull(
+        "Attempt " + attemptId + " should have been loaded.", attemptState);
+    assertEquals("Attempt Id in attempt state",
+        attemptId, attemptState.getAttemptId());
+    assertEquals("Master Container Id in attempt state",
+        masterContainerId, attemptState.getMasterContainer().getId());
+    if (null != clientTokenKey) {
+      assertArrayEquals("Client token key in attempt state",
+          clientTokenKey.getEncoded(), attemptState.getAppAttemptTokens().
+          getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+    }
+    assertEquals("Attempt state", state, attemptState.getState());
+    assertEquals("Finish time in attempt state", finishTime,
+        attemptState.getFinishTime());
+    assertEquals("Diagnostics in attempt state", diagnostics,
+        attemptState.getDiagnostics());
+    assertEquals("AM Container exit status in attempt state", amExitStatus,
+        attemptState.getAMContainerExitStatus());
+    assertEquals("Final app status in attempt state", finalStatus,
+        attemptState.getFinalApplicationStatus());
+    assertEquals("Tracking URL in attempt state", trackingURL,
+        attemptState.getFinalTrackingUrl());
+  }
+
+  private static ApplicationStateData createAppState(
+      ApplicationSubmissionContext ctxt, long submitTime, long startTime,
+      long finishTime, boolean isFinished) {
+    return ApplicationStateData.newInstance(submitTime, startTime, "test",
+        ctxt, isFinished ? RMAppState.FINISHED : null, isFinished ?
+        "appDiagnostics" : "", isFinished ? finishTime : 0, null);
+  }
+
+  private static ApplicationAttemptStateData createFinishedAttempt(
+      ApplicationAttemptId attemptId, Container container, long startTime,
+      int amExitStatus) {
+    return ApplicationAttemptStateData.newInstance(attemptId,
+        container, null, startTime, RMAppAttemptState.FINISHED,
+        "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
+        amExitStatus, 0, 0, 0, 0, 0);
+  }
+
+  private ApplicationAttemptId storeAttempt(RMStateStore store,
+      TestDispatcher dispatcher, String appAttemptIdStr,
+      AMRMTokenSecretManager appTokenMgr,
+      ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
+      boolean createContainer) throws Exception {
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.fromString(appAttemptIdStr);
+    Token<AMRMTokenIdentifier> appAttemptToken = null;
+    if (appTokenMgr != null) {
+      appAttemptToken = generateAMRMToken(attemptId, appTokenMgr);
+    }
+    SecretKey clientTokenKey = null;
+    if (clientToAMTokenMgr != null) {
+      clientTokenKey = clientToAMTokenMgr.createMasterKey(attemptId);
+      Credentials attemptCred = new Credentials();
+      attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME,
+          clientTokenKey.getEncoded());
+    }
+    // Avoid dereferencing a null ContainerId when createContainer is false.
+    // NOTE(review): confirm the callee tolerates a null container id string.
+    String containerIdStr = createContainer ?
+        ContainerId.newContainerId(attemptId, 1).toString() : null;
+    storeAttempt(store, attemptId, containerIdStr, appAttemptToken,
+        clientTokenKey, dispatcher);
+    return attemptId;
+  }
+
+  private void finishAppWithAttempts(RMState state, RMStateStore store,
+      TestDispatcher dispatcher, ApplicationAttemptId attemptId,
+      long submitTime, long startTime, int amExitStatus, long finishTime,
+      boolean createNewApp) throws Exception {
+    ApplicationId appId = attemptId.getApplicationId();
+    ApplicationStateData appStateNew = null;
+    if (createNewApp) {
+      ApplicationSubmissionContext context =
+          new ApplicationSubmissionContextPBImpl();
+      context.setApplicationId(appId);
+      appStateNew = createAppState(context, submitTime, startTime, finishTime,
+          true);
+    } else {
+      ApplicationStateData appState = state.getApplicationState().get(appId);
+      appStateNew = createAppState(appState.getApplicationSubmissionContext(),
+          submitTime, startTime, finishTime, true);
+      appStateNew.attempts.putAll(appState.attempts);
+    }
+    store.updateApplicationState(appStateNew);
+    waitNotify(dispatcher);
+    Container container = new ContainerPBImpl();
+    container.setId(ContainerId.newContainerId(attemptId, 1));
+    ApplicationAttemptStateData newAttemptState =
+        createFinishedAttempt(attemptId, container, startTime, amExitStatus);
+    updateAttempt(store, dispatcher, newAttemptState);
+  }
+
+  private void storeAppWithAttempts(RMStateStore store,
+      TestDispatcher dispatcher, ApplicationAttemptId attemptId,
+      long submitTime, long startTime) throws Exception {
+    storeAppWithAttempts(store, dispatcher, submitTime, startTime, null, null,
+        attemptId);
+  }
+
+  private void storeApp(RMStateStore store, TestDispatcher dispatcher,
+      ApplicationId appId, long submitTime, long startTime) throws Exception {
+    storeApp(store, appId, submitTime, startTime);
+    waitNotify(dispatcher);
+  }
+
+  private void storeAppWithAttempts(RMStateStore store,
+      TestDispatcher dispatcher, long submitTime, long startTime,
+      AMRMTokenSecretManager appTokenMgr,
+      ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
+      ApplicationAttemptId attemptId, ApplicationAttemptId... attemptIds)
+      throws Exception {
+    ApplicationId appId = attemptId.getApplicationId();
+    storeApp(store, dispatcher, appId, submitTime, startTime);
+    storeAttempt(store, dispatcher, attemptId.toString(), appTokenMgr,
+        clientToAMTokenMgr, true);
+    for (ApplicationAttemptId attempt : attemptIds) {
+      storeAttempt(store, dispatcher, attempt.toString(), appTokenMgr,
+          clientToAMTokenMgr, true);
+    }
+  }
+
+  private static void removeApps(RMStateStore store,
+      Map<ApplicationId, ApplicationAttemptId[]> appWithAttempts) {
+    for (Map.Entry<ApplicationId, ApplicationAttemptId[]> entry :
+        appWithAttempts.entrySet()) {
+      RMApp mockApp = createMockAppForRemove(entry.getKey(), entry.getValue());
+      store.removeApplication(mockApp);
+    }
+  }
+
+  private static void verifyAppPathPath(RMStateStore store, ApplicationId appId,
+        int splitIndex) throws Exception {
+    String appIdStr = appId.toString();
+    String appParent = appIdStr.substring(0, appIdStr.length() - splitIndex);
+    String appPath = appIdStr.substring(appIdStr.length() - splitIndex);
+    String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
+        ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT,
+        ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, String.valueOf(splitIndex),
+        appParent, appPath);
+    assertTrue("Application with id " + appIdStr + " does not exist as per " +
+        "split in state store.", ((ZKRMStateStore)store).exists(path));
+  }
+
+  private static void verifyAppInHierarchicalPath(RMStateStore store,
+      String appId, int splitIdx) throws Exception {
+    String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
+        ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
+    if (splitIdx != 0) {
+      path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
+          String.valueOf(splitIdx), appId.substring(0, appId.length() -
+          splitIdx), appId.substring(appId.length() - splitIdx));
+    } else {
+      path = createPath(path, appId);
+    }
+    assertTrue(appId + " should exist in path " + path,
+        ((ZKRMStateStore)store).exists(createPath(path)));
+  }
+
+  private static void assertHierarchicalPaths(RMStateStore store,
+      Map<Integer, Integer> pathToApps) throws Exception {
+    ZKRMStateStore zkStore = (ZKRMStateStore) store;
+    String appRoot = createPath(zkStore.znodeWorkingPath,
+        ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
+    for (Map.Entry<Integer, Integer> entry : pathToApps.entrySet()) {
+      // Key 0 maps to the app root itself; others to the per-split node.
+      String path = entry.getKey() == 0 ? appRoot : createPath(appRoot,
+          ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
+          String.valueOf(entry.getKey()));
+      assertEquals("Number of children for path " + path,
+          (int) entry.getValue(), zkStore.getChildren(path).size());
+    }
+  }
+
+  // Test to verify storing of apps and app attempts in ZK state store with app
+  // node split index configured more than 0.
+  @Test
+  public void testAppNodeSplit() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    long submitTime = System.currentTimeMillis();
+    long startTime = submitTime + 1234;
+    Configuration conf = new YarnConfiguration();
+
+    // Get store with app node split config set as 1.
+    RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    // Create RM Context and app token manager.
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+    AMRMTokenSecretManager appTokenMgr =
+        spy(new AMRMTokenSecretManager(conf, rmContext));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+        new ClientToAMTokenSecretManagerInRM();
+
+    // Store app1.
+    ApplicationId appId1 = ApplicationId.newInstance(1352994193343L, 1);
+    ApplicationAttemptId attemptId1 =
+        ApplicationAttemptId.newInstance(appId1, 1);
+    ApplicationAttemptId attemptId2 =
+        ApplicationAttemptId.newInstance(appId1, 2);
+    storeAppWithAttempts(store, dispatcher, submitTime, startTime,
+        appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
+
+    // Store app2 with app id application_1352994193343_120213.
+    ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213);
+    storeApp(store, appId21, submitTime, startTime);
+    waitNotify(dispatcher);
+
+    // Store another app which will be removed.
+    ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2);
+    ApplicationAttemptId attemptIdRemoved =
+        ApplicationAttemptId.newInstance(appIdRemoved, 1);
+    storeAppWithAttempts(store, dispatcher, submitTime, startTime,
+        null, null, attemptIdRemoved);
+    // Remove the app.
+    RMApp mockRemovedApp =
+        createMockAppForRemove(appIdRemoved, attemptIdRemoved);
+    store.removeApplication(mockRemovedApp);
+    // Close state store
+    store.close();
+
+    // Load state store
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    // Check if application_1352994193343_120213 (i.e. app2) exists in state
+    // store as per split index.
+    verifyAppPathPath(store, appId21, 1);
+
+    // Verify loaded apps and attempts based on the operations we did before
+    // reloading the state store.
+    verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
+        Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
+        -1000), Lists.newArrayList((FinalApplicationStatus) null, null));
+
+    // Update app state for app1.
+    finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
+        startTime, 100, 1234, false);
+
+    // Test updating app/attempt for app whose initial state is not saved
+    ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+    ApplicationAttemptId dummyAttemptId =
+        ApplicationAttemptId.newInstance(dummyAppId, 6);
+    finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
+        startTime, 111, 1234, true);
+    // Close the store
+    store.close();
+
+    // Check updated application state.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    store.setRMDispatcher(dispatcher);
+    RMState newRMState = store.loadState();
+    verifyLoadedApp(newRMState, dummyAppId, submitTime, startTime, 1234, true,
+        Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
+        Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
+    verifyLoadedApp(newRMState, appId1, submitTime, startTime, 1234, true,
+        Lists.newArrayList(attemptId1, attemptId2),
+        Lists.newArrayList(-1000, 100), Lists.newArrayList(null,
+        FinalApplicationStatus.SUCCEEDED));
+
+    // assert store is in expected state after everything is cleaned
+    assertTrue("Store is not in expected state", zkTester.isFinalStateValid());
+    store.close();
+  }
+
+  // Test to verify storing, loading and removing of apps and app attempts in
+  // ZK state store while the app node split index changes across restarts.
+  @Test
+  public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    long submitTime = System.currentTimeMillis();
+    long startTime = submitTime + 1234;
+    Configuration conf = new YarnConfiguration();
+
+    // Create store with app node split set as 1.
+    RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(store);
+    AMRMTokenSecretManager appTokenMgr =
+        spy(new AMRMTokenSecretManager(conf, rmContext));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+        new ClientToAMTokenSecretManagerInRM();
+
+    // Store app1 with 2 attempts.
+    ApplicationId appId1 = ApplicationId.newInstance(1442994194053L, 1);
+    ApplicationAttemptId attemptId1 =
+        ApplicationAttemptId.newInstance(appId1, 1);
+    ApplicationAttemptId attemptId2 =
+        ApplicationAttemptId.newInstance(appId1, 2);
+    storeAppWithAttempts(store, dispatcher, submitTime, startTime,
+        appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
+
+    // Store a second app (appId11) and its single associated attempt.
+    ApplicationId appId11 = ApplicationId.newInstance(1442994194053L, 2);
+    ApplicationAttemptId attemptId11 =
+        ApplicationAttemptId.newInstance(appId11, 1);
+    storeAppWithAttempts(store, dispatcher, attemptId11, submitTime, startTime);
+    // Close state store.
+    store.close();
+
+    // Load state store with app node split config of 2.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(2));
+    store.setRMDispatcher(dispatcher);
+    RMState state = store.loadState();
+    ApplicationId appId21 = ApplicationId.newInstance(1442994194053L, 120213);
+    storeApp(store, dispatcher, appId21, submitTime, startTime);
+
+    // Check if app is loaded correctly despite change in split index.
+    verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
+        Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
+        -1000), Lists.newArrayList((FinalApplicationStatus) null, null));
+
+    // Finish app/attempt state
+    finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
+        startTime, 100, 1234, false);
+
+    // Test updating app/attempt for app whose initial state is not saved
+    ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+    ApplicationAttemptId dummyAttemptId =
+        ApplicationAttemptId.newInstance(dummyAppId, 6);
+    finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
+        startTime, 111, 1234, true);
+    // Close the store
+    store.close();
+
+    // Load state store this time with split index of 0.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(0));
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    assertEquals("Number of Apps loaded should be 4.", 4,
+        state.getApplicationState().size());
+    verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
+        Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
+        100), Lists.newArrayList(null, FinalApplicationStatus.SUCCEEDED));
+    // Remove attempt1
+    store.removeApplicationAttempt(attemptId1);
+    ApplicationId appId31 = ApplicationId.newInstance(1442994195071L, 45);
+    storeApp(store, dispatcher, appId31, submitTime, startTime);
+    // Close state store.
+    store.close();
+
+    // Load state store with split index of 3.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    assertEquals("Number of apps loaded should be 5.", 5,
+        state.getApplicationState().size());
+    verifyLoadedApp(state, dummyAppId, submitTime, startTime, 1234, true,
+        Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
+        Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
+    verifyLoadedApp(state, appId31, submitTime, startTime, 0, false, null);
+    verifyLoadedApp(state, appId21, submitTime, startTime, 0, false, null);
+    verifyLoadedApp(state, appId11, submitTime, startTime, 0, false,
+        Lists.newArrayList(attemptId11), Lists.newArrayList(-1000),
+        Lists.newArrayList((FinalApplicationStatus) null));
+    verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
+        Lists.newArrayList(attemptId2), Lists.newArrayList(100),
+        Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
+
+    // Store another app.
+    ApplicationId appId41 = ApplicationId.newInstance(1442994195087L, 1);
+    storeApp(store, dispatcher, appId41, submitTime, startTime);
+    // Check how many apps exist in each of the hierarchy based paths. Nothing
+    // should exist under the "HIERARCHIES/4" path as the app split index was
+    // never set to 4 in the steps above.
+    assertHierarchicalPaths(store, ImmutableMap.of(0, 2, 1, 1, 2, 2,
+        3, 1, 4, 0));
+    verifyAppInHierarchicalPath(store, "application_1442994195087_0001", 3);
+
+    // Store one more app with a single attempt.
+    ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7);
+    storeApp(store, appId71, submitTime, startTime);
+    waitNotify(dispatcher);
+    ApplicationAttemptId attemptId71 =
+        ApplicationAttemptId.newInstance(appId71, 1);
+    storeAttempt(store, attemptId71,
+        ContainerId.newContainerId(attemptId71, 1).toString(), null, null,
+        dispatcher);
+    // Remove applications.
+    removeApps(store, ImmutableMap.of(appId11, new ApplicationAttemptId[]
+        {attemptId11}, appId71, new ApplicationAttemptId[] {attemptId71},
+        appId41, new ApplicationAttemptId[0], appId31,
+        new ApplicationAttemptId[0], appId21, new ApplicationAttemptId[0]));
+    removeApps(store, ImmutableMap.of(dummyAppId,
+        new ApplicationAttemptId[] {dummyAttemptId}, appId1,
+        new ApplicationAttemptId[] {attemptId1, attemptId2}));
+    store.close();
+
+    // Load state store with split index of 3 again. As all apps have been
+    // removed nothing should be loaded back.
+    store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
+    store.setRMDispatcher(dispatcher);
+    state = store.loadState();
+    assertEquals("Number of apps loaded should be 0.", 0,
+        state.getApplicationState().size());
+    // Close the state store.
+    store.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org