You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/06/08 23:51:11 UTC

hadoop git commit: YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator. Contributed by Karthik Kambatla

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0e80d5198 -> 960b8f19c


YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator. Contributed by Karthik Kambatla


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/960b8f19
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/960b8f19
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/960b8f19

Branch: refs/heads/trunk
Commit: 960b8f19ca98dbcfdd30f2f1f275b8718d2e872f
Parents: 0e80d51
Author: Jian He <ji...@apache.org>
Authored: Mon Jun 8 14:50:58 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Jun 8 14:50:58 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   2 +-
 .../hadoop-yarn-server-resourcemanager/pom.xml  |   8 +
 .../recovery/ZKRMStateStore.java                | 770 ++++++-------------
 .../recovery/RMStateStoreTestBase.java          |   3 +-
 .../recovery/TestZKRMStateStore.java            |  83 +-
 .../recovery/TestZKRMStateStorePerf.java        |  12 +-
 .../TestZKRMStateStoreZKClientConnections.java  | 181 +----
 8 files changed, 336 insertions(+), 726 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f393cad..86494cc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -294,6 +294,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-1462. AHS API and other AHS changes to handle tags for completed MR jobs. (xgong)
 
+    YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator. 
+    (Karthik Kambatla via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 72855cc..3ea1558 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -413,7 +413,7 @@ public class YarnConfiguration extends Configuration {
 
   public static final String RM_ZK_RETRY_INTERVAL_MS =
       RM_ZK_PREFIX + "retry-interval-ms";
-  public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
+  public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
 
   public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
   public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 76d280a..4960f95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -175,6 +175,14 @@
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 15ac971..bca5348 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -22,22 +22,25 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.charset.Charset;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,14 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
@@ -80,7 +76,37 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
+ * {@link RMStateStore} implementation backed by ZooKeeper.
+ *
+ * The znode structure is as follows:
+ * ROOT_DIR_PATH
+ * |--- VERSION_INFO
+ * |--- EPOCH_NODE
+ * |--- RM_ZK_FENCING_LOCK
+ * |--- RM_APP_ROOT
+ * |     |----- (#ApplicationId1)
+ * |     |        |----- (#ApplicationAttemptIds)
+ * |     |
+ * |     |----- (#ApplicationId2)
+ * |     |       |----- (#ApplicationAttemptIds)
+ * |     ....
+ * |
+ * |--- RM_DT_SECRET_MANAGER_ROOT
+ *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
+ *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+ *        |       |----- Token_1
+ *        |       |----- Token_2
+ *        |       ....
+ *        |
+ *        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
+ *        |      |----- Key_1
+ *        |      |----- Key_2
+ *                ....
+ * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
+ *        |----- currentMasterKey
+ *        |----- nextMasterKey
+ *
+ * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved
  * separately. The currentMasterkey and nextMasterkey have been stored.
  * Also, AMRMToken has been removed from ApplicationAttemptState.
  */
@@ -100,46 +126,14 @@ public class ZKRMStateStore extends RMStateStore {
       "RMDTSequentialNumber";
   private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
       "RMDTMasterKeysRoot";
-  private int numRetries;
 
   private String zkHostPort = null;
+  private int numRetries;
   private int zkSessionTimeout;
-
   @VisibleForTesting
-  long zkRetryInterval;
-  private List<ACL> zkAcl;
-  private List<ZKUtil.ZKAuthInfo> zkAuths;
+  int zkRetryInterval;
 
-  /**
-   *
-   * ROOT_DIR_PATH
-   * |--- VERSION_INFO
-   * |--- EPOCH_NODE
-   * |--- RM_ZK_FENCING_LOCK
-   * |--- RM_APP_ROOT
-   * |     |----- (#ApplicationId1)
-   * |     |        |----- (#ApplicationAttemptIds)
-   * |     |
-   * |     |----- (#ApplicationId2)
-   * |     |       |----- (#ApplicationAttemptIds)
-   * |     ....
-   * |
-   * |--- RM_DT_SECRET_MANAGER_ROOT
-   *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
-   *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
-   *        |       |----- Token_1
-   *        |       |----- Token_2
-   *        |       ....
-   *        |
-   *        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
-   *        |      |----- Key_1
-   *        |      |----- Key_2
-   *                ....
-   * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
-   *        |----- currentMasterKey
-   *        |----- nextMasterKey
-   *
-   */
+  /** Znode paths */
   private String zkRootNodePath;
   private String rmAppRoot;
   private String rmDTSecretManagerRoot;
@@ -147,37 +141,29 @@ public class ZKRMStateStore extends RMStateStore {
   private String delegationTokensRootPath;
   private String dtSequenceNumberPath;
   private String amrmTokenSecretManagerRoot;
-
   @VisibleForTesting
   protected String znodeWorkingPath;
 
-  @VisibleForTesting
-  protected ZooKeeper zkClient;
-
-  /* activeZkClient is not used to do actual operations,
-   * it is only used to verify client session for watched events and
-   * it gets activated into zkClient on connection event.
-   */
-  @VisibleForTesting
-  ZooKeeper activeZkClient;
-
   /** Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
+  private boolean useDefaultFencingScheme = false;
   private String fencingNodePath;
-  private Op createFencingNodePathOp;
-  private Op deleteFencingNodePathOp;
   private Thread verifyActiveStatusThread;
-  private String zkRootNodeUsername;
-  private final String zkRootNodePassword = Long.toString(random.nextLong());
 
+  /** ACL and auth info */
+  private List<ACL> zkAcl;
+  private List<ZKUtil.ZKAuthInfo> zkAuths;
   @VisibleForTesting
   List<ACL> zkRootNodeAcl;
-  private boolean useDefaultFencingScheme = false;
+  private String zkRootNodeUsername;
+  private final String zkRootNodePassword = Long.toString(random.nextLong());
   public static final int CREATE_DELETE_PERMS =
       ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
   private final String zkRootNodeAuthScheme =
       new DigestAuthenticationProvider().getScheme();
 
+  @VisibleForTesting
+  protected CuratorFramework curatorFramework;
   /**
    * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
    * ZooKeeper access, construct the {@link ACL}s for the store's root node.
@@ -192,7 +178,7 @@ public class ZKRMStateStore extends RMStateStore {
   @Unstable
   protected List<ACL> constructZkRootNodeACL(
       Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
-    List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
+    List<ACL> zkRootNodeAcl = new ArrayList<>();
     for (ACL acl : sourceACLs) {
       zkRootNodeAcl.add(new ACL(
           ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
@@ -231,7 +217,7 @@ public class ZKRMStateStore extends RMStateStore {
       zkRetryInterval = zkSessionTimeout / numRetries;
     } else {
       zkRetryInterval =
-          conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+          conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
               YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
     }
 
@@ -243,9 +229,6 @@ public class ZKRMStateStore extends RMStateStore {
 
     /* Initialize fencing related paths, acls, and ops */
     fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
-    createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
-        CreateMode.PERSISTENT);
-    deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
     if (HAUtil.isHAEnabled(conf)) {
       String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
           (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@@ -283,42 +266,23 @@ public class ZKRMStateStore extends RMStateStore {
 
     // ensure root dirs exist
     createRootDirRecursively(znodeWorkingPath);
-    createRootDir(zkRootNodePath);
+    create(zkRootNodePath);
     if (HAUtil.isHAEnabled(getConfig())){
       fence();
       verifyActiveStatusThread = new VerifyActiveStatusThread();
       verifyActiveStatusThread.start();
     }
-    createRootDir(rmAppRoot);
-    createRootDir(rmDTSecretManagerRoot);
-    createRootDir(dtMasterKeysRootPath);
-    createRootDir(delegationTokensRootPath);
-    createRootDir(dtSequenceNumberPath);
-    createRootDir(amrmTokenSecretManagerRoot);
-  }
-
-  protected void createRootDir(final String rootPath) throws Exception {
-    // For root dirs, we shouldn't use the doMulti helper methods
-    new ZKAction<String>() {
-      @Override
-      public String run() throws KeeperException, InterruptedException {
-        try {
-          return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
-        } catch (KeeperException ke) {
-          if (ke.code() == Code.NODEEXISTS) {
-            LOG.debug(rootPath + "znode already exists!");
-            return null;
-          } else {
-            throw ke;
-          }
-        }
-      }
-    }.runWithRetries();
+    create(rmAppRoot);
+    create(rmDTSecretManagerRoot);
+    create(dtMasterKeysRootPath);
+    create(delegationTokensRootPath);
+    create(dtSequenceNumberPath);
+    create(amrmTokenSecretManagerRoot);
   }
 
   private void logRootNodeAcls(String prefix) throws Exception {
     Stat getStat = new Stat();
-    List<ACL> getAcls = getACLWithRetries(zkRootNodePath, getStat);
+    List<ACL> getAcls = getACL(zkRootNodePath);
 
     StringBuilder builder = new StringBuilder();
     builder.append(prefix);
@@ -334,51 +298,21 @@ public class ZKRMStateStore extends RMStateStore {
       logRootNodeAcls("Before fencing\n");
     }
 
-    new ZKAction<Void>() {
-      @Override
-      public Void run() throws KeeperException, InterruptedException {
-        zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
-        return null;
-      }
-    }.runWithRetries();
-
-    // delete fencingnodepath
-    new ZKAction<Void>() {
-      @Override
-      public Void run() throws KeeperException, InterruptedException {
-        try {
-          zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
-        } catch (KeeperException.NoNodeException nne) {
-          LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
-        }
-        return null;
-      }
-    }.runWithRetries();
+    curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
+    delete(fencingNodePath);
 
     if (LOG.isTraceEnabled()) {
       logRootNodeAcls("After fencing\n");
     }
   }
 
-  private synchronized void closeZkClients() throws IOException {
-    zkClient = null;
-    if (activeZkClient != null) {
-      try {
-        activeZkClient.close();
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while closing ZK", e);
-      }
-      activeZkClient = null;
-    }
-  }
-
   @Override
   protected synchronized void closeInternal() throws Exception {
     if (verifyActiveStatusThread != null) {
       verifyActiveStatusThread.interrupt();
       verifyActiveStatusThread.join(1000);
     }
-    closeZkClients();
+    curatorFramework.close();
   }
 
   @Override
@@ -391,10 +325,10 @@ public class ZKRMStateStore extends RMStateStore {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
     byte[] data =
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
-    if (existsWithRetries(versionNodePath, false) != null) {
-      setDataWithRetries(versionNodePath, data, -1);
+    if (exists(versionNodePath)) {
+      safeSetData(versionNodePath, data, -1);
     } else {
-      createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
+      safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
     }
   }
 
@@ -402,11 +336,9 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized Version loadVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
 
-    if (existsWithRetries(versionNodePath, false) != null) {
-      byte[] data = getDataWithRetries(versionNodePath, false);
-      Version version =
-          new VersionPBImpl(VersionProto.parseFrom(data));
-      return version;
+    if (exists(versionNodePath)) {
+      byte[] data = getData(versionNodePath);
+      return new VersionPBImpl(VersionProto.parseFrom(data));
     }
     return null;
   }
@@ -415,20 +347,20 @@ public class ZKRMStateStore extends RMStateStore {
   public synchronized long getAndIncrementEpoch() throws Exception {
     String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
     long currentEpoch = 0;
-    if (existsWithRetries(epochNodePath, false) != null) {
+    if (exists(epochNodePath)) {
       // load current epoch
-      byte[] data = getDataWithRetries(epochNodePath, false);
+      byte[] data = getData(epochNodePath);
       Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
       currentEpoch = epoch.getEpoch();
       // increment epoch and store it
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      setDataWithRetries(epochNodePath, storeData, -1);
+      safeSetData(epochNodePath, storeData, -1);
     } else {
       // initialize epoch node with 1 for the next time.
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+      safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
     }
     return currentEpoch;
   }
@@ -447,7 +379,7 @@ public class ZKRMStateStore extends RMStateStore {
 
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
-    byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false);
+    byte[] data = getData(amrmTokenSecretManagerRoot);
     if (data == null) {
       LOG.warn("There is no data saved");
       return;
@@ -458,7 +390,6 @@ public class ZKRMStateStore extends RMStateStore {
     rmState.amrmTokenSecretManagerState =
         AMRMTokenSecretManagerState.newInstance(
           stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
-
   }
 
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
@@ -470,10 +401,10 @@ public class ZKRMStateStore extends RMStateStore {
 
   private void loadRMDelegationKeyState(RMState rmState) throws Exception {
     List<String> childNodes =
-        getChildrenWithRetries(dtMasterKeysRootPath, false);
+        getChildren(dtMasterKeysRootPath);
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
-      byte[] childData = getDataWithRetries(childNodePath, false);
+      byte[] childData = getData(childNodePath);
 
       if (childData == null) {
         LOG.warn("Content of " + childNodePath + " is broken.");
@@ -500,7 +431,7 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private void loadRMSequentialNumberState(RMState rmState) throws Exception {
-    byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+    byte[] seqData = getData(dtSequenceNumberPath);
     if (seqData != null) {
       ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
       DataInputStream seqIn = new DataInputStream(seqIs);
@@ -515,11 +446,11 @@ public class ZKRMStateStore extends RMStateStore {
 
   private void loadRMDelegationTokenState(RMState rmState) throws Exception {
     List<String> childNodes =
-        getChildrenWithRetries(delegationTokensRootPath, false);
+        getChildren(delegationTokensRootPath);
     for (String childNodeName : childNodes) {
       String childNodePath =
           getNodePath(delegationTokensRootPath, childNodeName);
-      byte[] childData = getDataWithRetries(childNodePath, false);
+      byte[] childData = getData(childNodePath);
 
       if (childData == null) {
         LOG.warn("Content of " + childNodePath + " is broken.");
@@ -551,10 +482,10 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private synchronized void loadRMAppState(RMState rmState) throws Exception {
-    List<String> childNodes = getChildrenWithRetries(rmAppRoot, false);
+    List<String> childNodes = getChildren(rmAppRoot);
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(rmAppRoot, childNodeName);
-      byte[] childData = getDataWithRetries(childNodePath, false);
+      byte[] childData = getData(childNodePath);
       if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
         // application
         if (LOG.isDebugEnabled()) {
@@ -581,11 +512,11 @@ public class ZKRMStateStore extends RMStateStore {
       ApplicationId appId)
       throws Exception {
     String appPath = getNodePath(rmAppRoot, appId.toString());
-    List<String> attempts = getChildrenWithRetries(appPath, false);
+    List<String> attempts = getChildren(appPath);
     for (String attemptIDStr : attempts) {
       if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
         String attemptPath = getNodePath(appPath, attemptIDStr);
-        byte[] attemptData = getDataWithRetries(attemptPath, false);
+        byte[] attemptData = getData(attemptPath);
 
         ApplicationAttemptStateDataPBImpl attemptState =
             new ApplicationAttemptStateDataPBImpl(
@@ -606,8 +537,8 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
-    createWithRetries(nodeCreatePath, appStateData, zkAcl,
-      CreateMode.PERSISTENT);
+    safeCreate(nodeCreatePath, appStateData, zkAcl,
+        CreateMode.PERSISTENT);
 
   }
 
@@ -622,11 +553,11 @@ public class ZKRMStateStore extends RMStateStore {
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
 
-    if (existsWithRetries(nodeUpdatePath, false) != null) {
-      setDataWithRetries(nodeUpdatePath, appStateData, -1);
+    if (exists(nodeUpdatePath)) {
+      safeSetData(nodeUpdatePath, appStateData, -1);
     } else {
-      createWithRetries(nodeUpdatePath, appStateData, zkAcl,
-        CreateMode.PERSISTENT);
+      safeCreate(nodeUpdatePath, appStateData, zkAcl,
+          CreateMode.PERSISTENT);
       LOG.debug(appId + " znode didn't exist. Created a new znode to"
           + " update the application state.");
     }
@@ -646,8 +577,8 @@ public class ZKRMStateStore extends RMStateStore {
           + nodeCreatePath);
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
-      CreateMode.PERSISTENT);
+    safeCreate(nodeCreatePath, attemptStateData, zkAcl,
+        CreateMode.PERSISTENT);
   }
 
   @Override
@@ -665,11 +596,11 @@ public class ZKRMStateStore extends RMStateStore {
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
 
-    if (existsWithRetries(nodeUpdatePath, false) != null) {
-      setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
+    if (exists(nodeUpdatePath)) {
+      safeSetData(nodeUpdatePath, attemptStateData, -1);
     } else {
-      createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
-        CreateMode.PERSISTENT);
+      safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
+          CreateMode.PERSISTENT);
       LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
           + " update the application attempt state.");
     }
@@ -682,28 +613,26 @@ public class ZKRMStateStore extends RMStateStore {
     String appId = appState.getApplicationSubmissionContext().getApplicationId()
         .toString();
     String appIdRemovePath = getNodePath(rmAppRoot, appId);
-    ArrayList<Op> opList = new ArrayList<Op>();
-
-    for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
-      opList.add(Op.delete(attemptRemovePath, -1));
-    }
-    opList.add(Op.delete(appIdRemovePath, -1));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
           + " and its attempts.");
     }
-    doDeleteMultiWithRetries(opList);
+
+    for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
+      safeDelete(attemptRemovePath);
+    }
+    safeDelete(appIdRemovePath);
   }
 
   @Override
   protected synchronized void storeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
       throws Exception {
-    ArrayList<Op> opList = new ArrayList<Op>();
-    addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
-    doStoreMultiWithRetries(opList);
+    SafeTransaction trx = new SafeTransaction();
+    addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
+    trx.commit();
   }
 
   @Override
@@ -716,35 +645,29 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Removing RMDelegationToken_"
           + rmDTIdentifier.getSequenceNumber());
     }
-    if (existsWithRetries(nodeRemovePath, false) != null) {
-      ArrayList<Op> opList = new ArrayList<Op>();
-      opList.add(Op.delete(nodeRemovePath, -1));
-      doDeleteMultiWithRetries(opList);
-    } else {
-      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
-    }
+    safeDelete(nodeRemovePath);
   }
 
   @Override
   protected synchronized void updateRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
       throws Exception {
-    ArrayList<Op> opList = new ArrayList<Op>();
+    SafeTransaction trx = new SafeTransaction();
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
-    if (existsWithRetries(nodeRemovePath, false) == null) {
+    if (exists(nodeRemovePath)) {
+      // in case znode exists
+      addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
+    } else {
       // in case znode doesn't exist
-      addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
+      addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
       LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
-    } else {
-      // in case znode exists
-      addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
     }
-    doStoreMultiWithRetries(opList);
+    trx.commit();
   }
 
-  private void addStoreOrUpdateOps(ArrayList<Op> opList,
+  private void addStoreOrUpdateOps(SafeTransaction trx,
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
       boolean isUpdate) throws Exception {
     // store RM delegation token
@@ -762,18 +685,18 @@ public class ZKRMStateStore extends RMStateStore {
       }
 
       if (isUpdate) {
-        opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
+        trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
       } else {
-        opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
-            CreateMode.PERSISTENT));
+        trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
+            CreateMode.PERSISTENT);
         // Update Sequence number only while storing DT
         seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
         if (LOG.isDebugEnabled()) {
           LOG.debug((isUpdate ? "Storing " : "Updating ") +
-                    dtSequenceNumberPath + ". SequenceNumber: "
-                    + rmDTIdentifier.getSequenceNumber());
+              dtSequenceNumberPath + ". SequenceNumber: "
+              + rmDTIdentifier.getSequenceNumber());
         }
-        opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+        trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
       }
     } finally {
       seqOs.close();
@@ -793,7 +716,7 @@ public class ZKRMStateStore extends RMStateStore {
     }
     delegationKey.write(fsOut);
     try {
-      createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
+      safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
           CreateMode.PERSISTENT);
     } finally {
       os.close();
@@ -809,243 +732,174 @@ public class ZKRMStateStore extends RMStateStore {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
-    if (existsWithRetries(nodeRemovePath, false) != null) {
-      doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
-    } else {
-      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
-    }
+    safeDelete(nodeRemovePath);
   }
 
   @Override
   public synchronized void deleteStore() throws Exception {
-    if (existsWithRetries(zkRootNodePath, false) != null) {
-      deleteWithRetries(zkRootNodePath, false);
-    }
+    delete(zkRootNodePath);
   }
 
   @Override
   public synchronized void removeApplication(ApplicationId removeAppId)
       throws Exception {
     String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
-    if (existsWithRetries(appIdRemovePath, false) != null) {
-      deleteWithRetries(appIdRemovePath, false);
-    }
-  }
-
-  // ZK related code
-  /**
-   * Watcher implementation which forward events to the ZKRMStateStore This
-   * hides the ZK methods of the store from its public interface
-   */
-  private final class ForwardingWatcher implements Watcher {
-    private ZooKeeper watchedZkClient;
-
-    public ForwardingWatcher(ZooKeeper client) {
-      this.watchedZkClient = client;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      try {
-        ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
-      } catch (Throwable t) {
-        LOG.error("Failed to process watcher event " + event + ": "
-            + StringUtils.stringifyException(t));
-      }
-    }
+    delete(appIdRemovePath);
   }
 
   @VisibleForTesting
-  @Private
-  @Unstable
-  public synchronized void processWatchEvent(ZooKeeper zk,
-      WatchedEvent event) throws Exception {
-    // only process watcher event from current ZooKeeper Client session.
-    if (zk != activeZkClient) {
-      LOG.info("Ignore watcher event type: " + event.getType() +
-          " with state:" + event.getState() + " for path:" +
-          event.getPath() + " from old session");
-      return;
-    }
-
-    Event.EventType eventType = event.getType();
-    LOG.info("Watcher event type: " + eventType + " with state:"
-        + event.getState() + " for path:" + event.getPath() + " for " + this);
-
-    if (eventType == Event.EventType.None) {
-
-      // the connection state has changed
-      switch (event.getState()) {
-        case SyncConnected:
-          LOG.info("ZKRMStateStore Session connected");
-          if (zkClient == null) {
-            // the SyncConnected must be from the client that sent Disconnected
-            zkClient = activeZkClient;
-            ZKRMStateStore.this.notifyAll();
-            LOG.info("ZKRMStateStore Session restored");
-          }
-          break;
-        case Disconnected:
-          LOG.info("ZKRMStateStore Session disconnected");
-          zkClient = null;
-          break;
-        case Expired:
-          // the connection got terminated because of session timeout
-          // call listener to reconnect
-          LOG.info("ZKRMStateStore Session expired");
-          createConnection();
-          break;
-        default:
-          LOG.error("Unexpected Zookeeper" +
-              " watch event state: " + event.getState());
-          break;
-      }
-    }
-  }
-
-  @VisibleForTesting
-  @Private
-  @Unstable
   String getNodePath(String root, String nodeName) {
     return (root + "/" + nodeName);
   }
 
-  /**
-   * Helper method that creates fencing node, executes the passed operations,
-   * and deletes the fencing node.
-   */
-  private synchronized void doStoreMultiWithRetries(
-      final List<Op> opList) throws Exception {
-    final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
-    execOpList.add(createFencingNodePathOp);
-    execOpList.addAll(opList);
-    execOpList.add(deleteFencingNodePathOp);
-    new ZKAction<Void>() {
-      @Override
-      public Void run() throws KeeperException, InterruptedException {
-        zkClient.multi(execOpList);
-        return null;
-      }
-    }.runWithRetries();
+  @Override
+  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
+      throws Exception {
+    AMRMTokenSecretManagerState data =
+        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+    byte[] stateData = data.getProto().toByteArray();
+    safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
   }
 
   /**
-   * Helper method that creates fencing node, executes the passed operation,
-   * and deletes the fencing node.
+   * Utility function to ensure that the configured base znode exists.
+   * This recursively creates the znode as well as all of its parents.
    */
-  private void doStoreMultiWithRetries(final Op op) throws Exception {
-    doStoreMultiWithRetries(Collections.singletonList(op));
+  private void createRootDirRecursively(String path) throws Exception {
+    String pathParts[] = path.split("/");
+    Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
+        "Invalid path: %s", path);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 1; i < pathParts.length; i++) {
+      sb.append("/").append(pathParts[i]);
+      create(sb.toString());
+    }
   }
 
-  /**
-   * Helper method that creates fencing node, executes the passed
-   * delete related operations and deletes the fencing node.
+  /*
+   * ZK operations using curator
    */
-  private synchronized void doDeleteMultiWithRetries(
-      final List<Op> opList) throws Exception {
-    final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
-    execOpList.add(createFencingNodePathOp);
-    execOpList.addAll(opList);
-    execOpList.add(deleteFencingNodePathOp);
-    new ZKAction<Void>() {
-      @Override
-      public Void run() throws KeeperException, InterruptedException {
-        setHasDeleteNodeOp(true);
-        zkClient.multi(execOpList);
-        return null;
-      }
-    }.runWithRetries();
-  }
+  private void createConnection() throws Exception {
+    // Curator connection
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+    builder = builder.connectString(zkHostPort)
+        .connectionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
+
+    // Set up authorization based on fencing scheme
+    List<AuthInfo> authInfos = new ArrayList<>();
+    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+    }
+    if (useDefaultFencingScheme) {
+      byte[] defaultFencingAuth =
+          (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
+              Charset.forName("UTF-8"));
+      authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
+    }
+    builder = builder.authorization(authInfos);
 
-  private void doDeleteMultiWithRetries(final Op op) throws Exception {
-    doDeleteMultiWithRetries(Collections.singletonList(op));
+    // Connect to ZK
+    curatorFramework = builder.build();
+    curatorFramework.start();
   }
 
   @VisibleForTesting
-  @Private
-  @Unstable
-  public void createWithRetries(
-      final String path, final byte[] data, final List<ACL> acl,
-      final CreateMode mode) throws Exception {
-    doStoreMultiWithRetries(Op.create(path, data, acl, mode));
+  byte[] getData(final String path) throws Exception {
+    return curatorFramework.getData().forPath(path);
   }
 
-  @VisibleForTesting
-  @Private
-  @Unstable
-  public void setDataWithRetries(final String path, final byte[] data,
-                                 final int version) throws Exception {
-    doStoreMultiWithRetries(Op.setData(path, data, version));
+  private List<ACL> getACL(final String path) throws Exception {
+    return curatorFramework.getACL().forPath(path);
+  }
+
+  private List<String> getChildren(final String path) throws Exception {
+    return curatorFramework.getChildren().forPath(path);
+  }
+
+  private boolean exists(final String path) throws Exception {
+    return curatorFramework.checkExists().forPath(path) != null;
   }
 
   @VisibleForTesting
-  @Private
-  @Unstable
-  public byte[] getDataWithRetries(final String path, final boolean watch)
-      throws Exception {
-    return new ZKAction<byte[]>() {
-      @Override
-      public byte[] run() throws KeeperException, InterruptedException {
-        return zkClient.getData(path, watch, null);
-      }
-    }.runWithRetries();
+  void create(final String path) throws Exception {
+    if (!exists(path)) {
+      curatorFramework.create()
+          .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+          .forPath(path, null);
+    }
   }
 
-  private List<ACL> getACLWithRetries(
-      final String path, final Stat stat) throws Exception {
-    return new ZKAction<List<ACL>>() {
-      @Override
-      public List<ACL> run() throws KeeperException, InterruptedException {
-        return zkClient.getACL(path, stat);
-      }
-    }.runWithRetries();
+  @VisibleForTesting
+  void delete(final String path) throws Exception {
+    if (exists(path)) {
+      curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
+    }
   }
 
-  private List<String> getChildrenWithRetries(
-      final String path, final boolean watch) throws Exception {
-    return new ZKAction<List<String>>() {
-      @Override
-      List<String> run() throws KeeperException, InterruptedException {
-        return zkClient.getChildren(path, watch);
-      }
-    }.runWithRetries();
+  private void safeCreate(String path, byte[] data, List<ACL> acl,
+      CreateMode mode) throws Exception {
+    if (!exists(path)) {
+      SafeTransaction transaction = new SafeTransaction();
+      transaction.create(path, data, acl, mode);
+      transaction.commit();
+    }
   }
 
-  private Stat existsWithRetries(
-      final String path, final boolean watch) throws Exception {
-    return new ZKAction<Stat>() {
-      @Override
-      Stat run() throws KeeperException, InterruptedException {
-        return zkClient.exists(path, watch);
-      }
-    }.runWithRetries();
+  private void safeDelete(final String path) throws Exception {
+    if (exists(path)) {
+      SafeTransaction transaction = new SafeTransaction();
+      transaction.delete(path);
+      transaction.commit();
+    }
   }
 
-  private void deleteWithRetries(
-      final String path, final boolean watch) throws Exception {
-    new ZKAction<Void>() {
-      @Override
-      Void run() throws KeeperException, InterruptedException {
-        recursiveDeleteWithRetriesHelper(path, watch);
-        return null;
-      }
-    }.runWithRetries();
+  private void safeSetData(String path, byte[] data, int version)
+      throws Exception {
+    SafeTransaction transaction = new SafeTransaction();
+    transaction.setData(path, data, version);
+    transaction.commit();
   }
 
   /**
-   * Helper method that deletes znodes recursively
+   * Use curator transactions to ensure zk-operations are performed in an all
+   * or nothing fashion. This is equivalent to using ZooKeeper#multi.
+   *
+   * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We'll
+   * have to rewrite this inner class when we adopt that.
    */
-  private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
-          throws KeeperException, InterruptedException {
-    List<String> children = zkClient.getChildren(path, watch);
-    for (String child : children) {
-      recursiveDeleteWithRetriesHelper(path + "/" + child, false);
+  private class SafeTransaction {
+    private CuratorTransactionFinal transactionFinal;
+
+    SafeTransaction() throws Exception {
+      CuratorTransaction transaction = curatorFramework.inTransaction();
+      transactionFinal =
+          transaction.create()
+              .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+              .forPath(fencingNodePath, new byte[0]).and();
     }
 
-    try {
-      zkClient.delete(path, -1);
-    } catch (KeeperException.NoNodeException nne) {
-      LOG.info("Node " + path + " doesn't exist to delete");
+    public void commit() throws Exception {
+      transactionFinal = transactionFinal.delete()
+          .forPath(fencingNodePath).and();
+      transactionFinal.commit();
+    }
+
+    public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
+        throws Exception {
+      transactionFinal = transactionFinal.create()
+          .withMode(mode).withACL(acl).forPath(path, data).and();
+    }
+
+    public void delete(String path) throws Exception {
+      transactionFinal = transactionFinal.delete().forPath(path).and();
+    }
+
+    public void setData(String path, byte[] data, int version)
+        throws Exception {
+      transactionFinal = transactionFinal.setData()
+          .withVersion(version).forPath(path, data).and();
     }
   }
 
@@ -1054,8 +908,6 @@ public class ZKRMStateStore extends RMStateStore {
    * this RM continues to be the Active.
    */
   private class VerifyActiveStatusThread extends Thread {
-    private List<Op> emptyOpList = new ArrayList<Op>();
-
     VerifyActiveStatusThread() {
       super(VerifyActiveStatusThread.class.getName());
     }
@@ -1063,10 +915,11 @@ public class ZKRMStateStore extends RMStateStore {
     public void run() {
       try {
         while (true) {
-          if(isFencedState()) { 
+          if(isFencedState()) {
             break;
           }
-          doStoreMultiWithRetries(emptyOpList);
+          // Create and delete fencing node
+          new SafeTransaction().commit();
           Thread.sleep(zkSessionTimeout);
         }
       } catch (InterruptedException ie) {
@@ -1077,143 +930,4 @@ public class ZKRMStateStore extends RMStateStore {
       }
     }
   }
-
-  private abstract class ZKAction<T> {
-    private boolean hasDeleteNodeOp = false;
-    void setHasDeleteNodeOp(boolean hasDeleteOp) {
-      this.hasDeleteNodeOp = hasDeleteOp;
-    }
-    // run() expects synchronization on ZKRMStateStore.this
-    abstract T run() throws KeeperException, InterruptedException;
-
-    T runWithCheck() throws Exception {
-      long startTime = System.currentTimeMillis();
-      synchronized (ZKRMStateStore.this) {
-        while (zkClient == null) {
-          ZKRMStateStore.this.wait(zkSessionTimeout);
-          if (zkClient != null) {
-            break;
-          }
-          if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
-            throw new IOException("Wait for ZKClient creation timed out");
-          }
-        }
-        return run();
-      }
-    }
-
-    private boolean shouldRetry(Code code) {
-      switch (code) {
-        case CONNECTIONLOSS:
-        case OPERATIONTIMEOUT:
-        case SESSIONEXPIRED:
-        case SESSIONMOVED:
-          return true;
-        default:
-          break;
-      }
-      return false;
-    }
-
-    T runWithRetries() throws Exception {
-      int retry = 0;
-      while (true) {
-        try {
-          return runWithCheck();
-        } catch (KeeperException.NoAuthException nae) {
-          if (HAUtil.isHAEnabled(getConfig())) {
-            // NoAuthException possibly means that this store is fenced due to
-            // another RM becoming active. Even if not,
-            // it is safer to assume we have been fenced
-            throw new StoreFencedException();
-          }
-        } catch (KeeperException ke) {
-          if (ke.code() == Code.NODEEXISTS) {
-            LOG.info("znode already exists!");
-            return null;
-          }
-          if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
-            LOG.info("znode has already been deleted!");
-            return null;
-          }
-
-          LOG.info("Exception while executing a ZK operation.", ke);
-          if (shouldRetry(ke.code()) && ++retry < numRetries) {
-            LOG.info("Retrying operation on ZK. Retry no. " + retry);
-            Thread.sleep(zkRetryInterval);
-            createConnection();
-            continue;
-          }
-          LOG.info("Maxed out ZK retries. Giving up!");
-          throw ke;
-        }
-      }
-    }
-  }
-
-  private synchronized void createConnection()
-      throws IOException, InterruptedException {
-    closeZkClients();
-    for (int retries = 0; retries < numRetries && zkClient == null;
-        retries++) {
-      try {
-        activeZkClient = getNewZooKeeper();
-        zkClient = activeZkClient;
-        for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
-          zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
-        }
-        if (useDefaultFencingScheme) {
-          zkClient.addAuthInfo(zkRootNodeAuthScheme,
-              (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8")));
-        }
-      } catch (IOException ioe) {
-        // Retry in case of network failures
-        LOG.info("Failed to connect to the ZooKeeper on attempt - " +
-            (retries + 1));
-        ioe.printStackTrace();
-      }
-    }
-    if (zkClient == null) {
-      LOG.error("Unable to connect to Zookeeper");
-      throw new YarnRuntimeException("Unable to connect to Zookeeper");
-    }
-    ZKRMStateStore.this.notifyAll();
-    LOG.info("Created new ZK connection");
-  }
-
-  // protected to mock for testing
-  @VisibleForTesting
-  @Private
-  @Unstable
-  protected synchronized ZooKeeper getNewZooKeeper()
-      throws IOException, InterruptedException {
-    ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
-    zk.register(new ForwardingWatcher(zk));
-    return zk;
-  }
-
-  @Override
-  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
-      AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
-      throws Exception {
-    AMRMTokenSecretManagerState data =
-        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
-    byte[] stateData = data.getProto().toByteArray();
-    setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
-  }
-
-  /**
-   * Utility function to ensure that the configured base znode exists.
-   * This recursively creates the znode as well as all of its parents.
-   */
-  private void createRootDirRecursively(String path) throws Exception {
-    String pathParts[] = path.split("/");
-    Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
-        "Invalid path: %s", path);
-    StringBuilder sb = new StringBuilder();
-    for (int i = 1; i < pathParts.length; i++) {
-      sb.append("/").append(pathParts[i]);
-      createRootDir(sb.toString());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 4d0e560..9e0d22b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -40,7 +40,6 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -76,7 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
-public class RMStateStoreTestBase extends ClientBaseWithFixes{
+public class RMStateStoreTestBase {
 
   public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 333455c..34a4492 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -18,18 +18,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.crypto.SecretKey;
-
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -61,22 +54,49 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import javax.crypto.SecretKey;
+
 public class TestZKRMStateStore extends RMStateStoreTestBase {
 
   public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
   private static final int ZK_TIMEOUT_MS = 1000;
+  private TestingServer curatorTestingServer;
+  private CuratorFramework curatorFramework;
+
+  @Before
+  public void setupCuratorServer() throws Exception {
+    curatorTestingServer = new TestingServer();
+    curatorTestingServer.start();
+    curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(curatorTestingServer.getConnectString())
+        .retryPolicy(new RetryNTimes(100, 100))
+        .build();
+    curatorFramework.start();
+  }
+
+  @After
+  public void cleanupCuratorServer() throws IOException {
+    curatorFramework.close();
+    curatorTestingServer.stop();
+  }
 
   class TestZKRMStateStoreTester implements RMStateStoreHelper {
 
-    ZooKeeper client;
     TestZKRMStateStoreInternal store;
     String workingZnode;
 
+
     class TestZKRMStateStoreInternal extends ZKRMStateStore {
 
       public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
@@ -86,11 +106,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
         assertTrue(znodeWorkingPath.equals(workingZnode));
       }
 
-      @Override
-      public ZooKeeper getNewZooKeeper() throws IOException {
-        return client;
-      }
-
       public String getVersionNode() {
         return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
       }
@@ -109,7 +124,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
        * @throws Exception
        */
       public void testRetryingCreateRootDir() throws Exception {
-        createRootDir(znodeWorkingPath);
+        create(znodeWorkingPath);
       }
 
     }
@@ -117,23 +132,24 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     public RMStateStore getRMStateStore() throws Exception {
       YarnConfiguration conf = new YarnConfiguration();
       workingZnode = "/jira/issue/3077/rmstore";
-      conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+          curatorTestingServer.getConnectString());
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
-      this.client = createClient();
       this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
       return this.store;
     }
 
     @Override
     public boolean isFinalStateValid() throws Exception {
-      List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
-      return nodes.size() == 1;
+      return 1 ==
+          curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
     }
 
     @Override
     public void writeVersion(Version version) throws Exception {
-      client.setData(store.getVersionNode(), ((VersionPBImpl) version)
-        .getProto().toByteArray(), -1);
+      curatorFramework.setData().withVersion(-1)
+          .forPath(store.getVersionNode(),
+              ((VersionPBImpl) version).getProto().toByteArray());
     }
 
     @Override
@@ -142,10 +158,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     }
 
     public boolean appExists(RMApp app) throws Exception {
-      Stat node =
-          client.exists(store.getAppNode(app.getApplicationId().toString()),
-            false);
-      return node !=null;
+      return null != curatorFramework.checkExists()
+          .forPath(store.getAppNode(app.getApplicationId().toString()));
     }
   }
 
@@ -178,9 +192,9 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       public RMStateStore getRMStateStore() throws Exception {
         YarnConfiguration conf = new YarnConfiguration();
         workingZnode = "/jira/issue/3077/rmstore";
-        conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+        conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+            curatorTestingServer.getConnectString());
         conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
-        this.client = createClient();
         this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
           Version storedVersion = null;
 
@@ -217,7 +231,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
-    conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
     conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
     conf.set(YarnConfiguration.RM_HA_ID, rmId);
     conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
index 654b357..e270404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -25,6 +25,8 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import javax.crypto.SecretKey;
+
+import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -73,10 +75,11 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
   private ZKRMStateStore store;
   private AMRMTokenSecretManager appTokenMgr;
   private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
+  private TestingServer curatorTestingServer;
 
   @Before
   public void setUpZKServer() throws Exception {
-    super.setUp();
+    curatorTestingServer = new TestingServer();
   }
 
   @After
@@ -87,7 +90,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
     if (appTokenMgr != null) {
       appTokenMgr.stop();
     }
-    super.tearDown();
+    curatorTestingServer.stop();
   }
 
   private void initStore(String hostPort) {
@@ -95,7 +98,8 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
     RMContext rmContext = mock(RMContext.class);
 
     conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+        optHostPort.or(curatorTestingServer.getConnectString()));
     conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
 
     store = new ZKRMStateStore();
@@ -140,7 +144,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
 
     if (launchLocalZK) {
       try {
-        setUp();
+        setUpZKServer();
       } catch (Exception e) {
         System.err.println("failed to setup. : " + e.getMessage());
         return -1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 62dc5ef..d188450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -20,39 +20,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
 import org.apache.hadoop.util.ZKUtil;
 
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TestZKRMStateStoreZKClientConnections extends
-    ClientBaseWithFixes {
-
-  private static final int ZK_OP_WAIT_TIME = 3000;
-  private static final int ZK_TIMEOUT_MS = 1000;
+public class TestZKRMStateStoreZKClientConnections {
   private Log LOG =
       LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
 
+  private static final int ZK_TIMEOUT_MS = 1000;
   private static final String DIGEST_USER_PASS="test-user:test-password";
   private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
   private static final String DIGEST_USER_HASH;
@@ -66,14 +59,22 @@ public class TestZKRMStateStoreZKClientConnections extends
   }
   private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
 
+  private TestingServer testingServer;
+
+  @Before
+  public void setupZKServer() throws Exception {
+    testingServer = new TestingServer();
+    testingServer.start();
+  }
+
+  @After
+  public void cleanupZKServer() throws Exception {
+    testingServer.stop();
+  }
 
   class TestZKClient {
 
     ZKRMStateStore store;
-    boolean forExpire = false;
-    TestForwardingWatcher oldWatcher;
-    TestForwardingWatcher watcher;
-    CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
     protected class TestZKRMStateStore extends ZKRMStateStore {
 
@@ -83,51 +84,12 @@ public class TestZKRMStateStoreZKClientConnections extends
         start();
         assertTrue(znodeWorkingPath.equals(workingZnode));
       }
-
-      @Override
-      public ZooKeeper getNewZooKeeper()
-          throws IOException, InterruptedException {
-        oldWatcher = watcher;
-        watcher = new TestForwardingWatcher();
-        return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
-      }
-
-      @Override
-      public synchronized void processWatchEvent(ZooKeeper zk,
-          WatchedEvent event) throws Exception {
-
-        if (forExpire) {
-          // a hack... couldn't find a way to trigger expired event.
-          WatchedEvent expriredEvent = new WatchedEvent(
-              Watcher.Event.EventType.None,
-              Watcher.Event.KeeperState.Expired, null);
-          super.processWatchEvent(zk, expriredEvent);
-          forExpire = false;
-          syncBarrier.await();
-        } else {
-          super.processWatchEvent(zk, event);
-        }
-      }
-    }
-
-    private class TestForwardingWatcher extends
-        ClientBaseWithFixes.CountdownWatcher {
-      public void process(WatchedEvent event) {
-        super.process(event);
-        try {
-          if (store != null) {
-            store.processWatchEvent(client, event);
-          }
-        } catch (Throwable t) {
-          LOG.error("Failed to process watcher event " + event + ": "
-              + StringUtils.stringifyException(t));
-        }
-      }
     }
 
     public RMStateStore getRMStateStore(Configuration conf) throws Exception {
       String workingZnode = "/Test";
-      conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+          testingServer.getConnectString());
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
       this.store = new TestZKRMStateStore(conf, workingZnode);
       return this.store;
@@ -147,12 +109,12 @@ public class TestZKRMStateStoreZKClientConnections extends
     store.setRMDispatcher(dispatcher);
     final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
 
-    stopServer();
+    testingServer.stop();
     Thread clientThread = new Thread() {
       @Override
       public void run() {
         try {
-          store.getDataWithRetries(path, true);
+          store.getData(path);
         } catch (Exception e) {
           e.printStackTrace();
           assertionFailedInThread.set(true);
@@ -160,114 +122,19 @@ public class TestZKRMStateStoreZKClientConnections extends
       }
     };
     Thread.sleep(2000);
-    startServer();
+    testingServer.start();
     clientThread.join();
     Assert.assertFalse(assertionFailedInThread.get());
   }
 
   @Test(timeout = 20000)
-  public void testZKClientDisconnectAndReconnect()
-      throws Exception {
-
-    TestZKClient zkClientTester = new TestZKClient();
-    String path = "/test";
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
-    ZKRMStateStore store =
-        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
-    TestDispatcher dispatcher = new TestDispatcher();
-    store.setRMDispatcher(dispatcher);
-
-    // trigger watch
-    store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
-    store.getDataWithRetries(path, true);
-    store.setDataWithRetries(path, "newBytes".getBytes(), 0);
-
-    stopServer();
-    zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
-    try {
-      store.getDataWithRetries(path, true);
-      fail("Expected ZKClient time out exception");
-    } catch (Exception e) {
-      assertTrue(e.getMessage().contains(
-          "Wait for ZKClient creation timed out"));
-    }
-
-    // ZKRMStateStore Session restored
-    startServer();
-    zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
-    byte[] ret = null;
-    try {
-      ret = store.getDataWithRetries(path, true);
-    } catch (Exception e) {
-      String error = "ZKRMStateStore Session restore failed";
-      LOG.error(error, e);
-      fail(error);
-    }
-    assertEquals("newBytes", new String(ret));
-  }
-
-  @Test(timeout = 20000)
-  public void testZKSessionTimeout() throws Exception {
-
-    TestZKClient zkClientTester = new TestZKClient();
-    String path = "/test";
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
-    ZKRMStateStore store =
-        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
-    TestDispatcher dispatcher = new TestDispatcher();
-    store.setRMDispatcher(dispatcher);
-
-    // a hack to trigger expired event
-    zkClientTester.forExpire = true;
-
-    // trigger watch
-    store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
-    store.getDataWithRetries(path, true);
-    store.setDataWithRetries(path, "bytes".getBytes(), 0);
-
-    zkClientTester.syncBarrier.await();
-    // after this point, expired event has already been processed.
-
-    try {
-      byte[] ret = store.getDataWithRetries(path, false);
-      assertEquals("bytes", new String(ret));
-    } catch (Exception e) {
-      String error = "New session creation failed";
-      LOG.error(error, e);
-      fail(error);
-    }
-
-    // send Disconnected event from old client session to ZKRMStateStore
-    // check the current client session is not affected.
-    Assert.assertTrue(zkClientTester.oldWatcher != null);
-    WatchedEvent disconnectedEvent = new WatchedEvent(
-        Watcher.Event.EventType.None,
-        Watcher.Event.KeeperState.Disconnected, null);
-    zkClientTester.oldWatcher.process(disconnectedEvent);
-    Assert.assertTrue(store.zkClient != null);
-
-    zkClientTester.watcher.process(disconnectedEvent);
-    Assert.assertTrue(store.zkClient == null);
-    WatchedEvent connectedEvent = new WatchedEvent(
-        Watcher.Event.EventType.None,
-        Watcher.Event.KeeperState.SyncConnected, null);
-    zkClientTester.watcher.process(connectedEvent);
-    Assert.assertTrue(store.zkClient != null);
-    Assert.assertTrue(store.zkClient == store.activeZkClient);
-  }
-
-  @Test(timeout = 20000)
   public void testSetZKAcl() {
     TestZKClient zkClientTester = new TestZKClient();
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
     try {
-      zkClientTester.store.zkClient.delete(zkClientTester.store
-          .znodeWorkingPath, -1);
+      zkClientTester.store.delete(zkClientTester.store
+          .znodeWorkingPath);
       fail("Shouldn't be able to delete path");
     } catch (Exception e) {/* expected behavior */
     }