You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/01/20 23:48:42 UTC

hadoop git commit: YARN-4559. Make leader elector and zk store share the same curator client. Contributed by Jian He

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2ec438e8f -> 890a2ebd1


YARN-4559. Make leader elector and zk store share the same curator
client. Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/890a2ebd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/890a2ebd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/890a2ebd

Branch: refs/heads/trunk
Commit: 890a2ebd1af51d24ccbbc5d1d65d17b24ad8ab9b
Parents: 2ec438e
Author: Xuan <xg...@apache.org>
Authored: Wed Jan 20 14:48:10 2016 -0800
Committer: Xuan <xg...@apache.org>
Committed: Wed Jan 20 14:48:10 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |   5 +
 .../resourcemanager/LeaderElectorService.java   |  23 +---
 .../server/resourcemanager/ResourceManager.java |  98 +++++++++++++---
 .../resourcemanager/recovery/RMStateStore.java  |   2 +-
 .../recovery/ZKRMStateStore.java                | 111 +++++--------------
 .../recovery/TestZKRMStateStore.java            |   1 +
 .../recovery/TestZKRMStateStorePerf.java        |   2 +
 .../TestZKRMStateStoreZKClientConnections.java  |  22 +---
 9 files changed, 128 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 29ee57b..ee78d22 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -98,6 +98,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it. 
     (kasha)
 
+    YARN-4559. Make leader elector and zk store share the same curator client.
+    (Jian He via xgong)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6fb3945..c12377b 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -272,6 +272,11 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
   <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
+    <Field name="resourceManager"/>
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
+  <Match>
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
     <Field name="renewalTimer" />
     <Bug code="IS"/>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
index 3766676..8c1a6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
@@ -19,14 +19,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.service.AbstractService;
@@ -44,35 +41,23 @@ public class LeaderElectorService extends AbstractService implements
   private RMContext rmContext;
   private String latchPath;
   private String rmId;
+  private ResourceManager rm;
 
-  public LeaderElectorService(RMContext rmContext) {
+  public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
     super(LeaderElectorService.class.getName());
     this.rmContext = rmContext;
-
+    this.rm = rm;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
-    Preconditions.checkNotNull(zkHostPort,
-        YarnConfiguration.RM_ZK_ADDRESS + " is not set");
-
     rmId = HAUtil.getRMHAId(conf);
     String clusterId = YarnConfiguration.getClusterId(conf);
-
-    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
-        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-    int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
-        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
-
     String zkBasePath = conf.get(
         YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
         YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
     latchPath = zkBasePath + "/" + clusterId;
-
-    curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
-        .retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
-    curator.start();
+    curator = rm.getCurator();
     initAndStartLeaderLatch();
     super.serviceInit(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 33431f8..40d627e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -28,7 +32,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.http.lib.StaticUserWebFilter;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
-import org.apache.hadoop.security.*;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.AbstractService;
@@ -40,6 +48,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -58,8 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -78,7 +87,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -96,12 +107,15 @@ import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -158,6 +172,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected ResourceTrackerService resourceTracker;
   private JvmPauseMonitor pauseMonitor;
   private boolean curatorEnabled = false;
+  private CuratorFramework curator;
+  private final String zkRootNodePassword =
+      Long.toString(new SecureRandom().nextLong());
+  private boolean recoveryEnabled;
 
   @VisibleForTesting
   protected String webAppAddress;
@@ -232,7 +250,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
       curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
           YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
       if (curatorEnabled) {
-        LeaderElectorService elector = new LeaderElectorService(rmContext);
+        this.curator = createAndStartCurator(conf);
+        LeaderElectorService elector = new LeaderElectorService(rmContext, this);
         addService(elector);
         rmContext.setLeaderElectorService(elector);
       }
@@ -276,7 +295,58 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
     super.serviceInit(this.conf);
   }
-  
+
+  public CuratorFramework createAndStartCurator(Configuration conf)
+      throws Exception {
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    if (zkHostPort == null) {
+      throw new YarnRuntimeException(
+          YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
+    }
+    int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
+        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+    // set up zk auths
+    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+    List<AuthInfo> authInfos = new ArrayList<>();
+    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+    }
+
+    if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
+        YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
+      String zkRootNodeUsername = HAUtil
+          .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
+              YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
+      byte[] defaultFencingAuth =
+          (zkRootNodeUsername + ":" + zkRootNodePassword)
+              .getBytes(Charset.forName("UTF-8"));
+      authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
+          defaultFencingAuth));
+    }
+
+    CuratorFramework client =  CuratorFrameworkFactory.builder()
+        .connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
+        .authorization(authInfos).build();
+    client.start();
+    return client;
+  }
+
+  public CuratorFramework getCurator() {
+    return this.curator;
+  }
+
+  public String getZkRootNodePassword() {
+    return this.zkRootNodePassword;
+  }
+
+
   protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
       Configuration conf) {
     return new QueueACLsManager(scheduler, conf);
@@ -412,7 +482,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private ApplicationMasterLauncher applicationMasterLauncher;
     private ContainerAllocationExpirer containerAllocationExpirer;
     private ResourceManager rm;
-    private boolean recoveryEnabled;
     private RMActiveServiceContext activeServiceContext;
 
     RMActiveServices(ResourceManager rm) {
@@ -453,29 +522,26 @@ public class ResourceManager extends CompositeService implements Recoverable {
         rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
       }
 
-      boolean isRecoveryEnabled = conf.getBoolean(
-          YarnConfiguration.RECOVERY_ENABLED,
+      recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
 
       RMStateStore rmStore = null;
-      if (isRecoveryEnabled) {
-        recoveryEnabled = true;
+      if (recoveryEnabled) {
         rmStore = RMStateStoreFactory.getStore(conf);
         boolean isWorkPreservingRecoveryEnabled =
             conf.getBoolean(
               YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
               YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
         rmContext
-          .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
+            .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
       } else {
-        recoveryEnabled = false;
         rmStore = new NullRMStateStore();
       }
 
       try {
+        rmStore.setResourceManager(rm);
         rmStore.init(conf);
         rmStore.setRMDispatcher(rmDispatcher);
-        rmStore.setResourceManager(rm);
       } catch (Exception e) {
         // the Exception from stateStore.init() needs to be handled for
         // HA and we need to give up master status if we got fenced
@@ -1130,6 +1196,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
       configurationProvider.close();
     }
     super.serviceStop();
+    if (curator != null) {
+      curator.close();
+    }
     transitionToStandby(false);
     rmContext.setHAServiceState(HAServiceState.STOPPING);
   }
@@ -1177,7 +1246,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   public ClientRMService getClientRMService() {
     return this.clientRM;
   }
-  
+
   /**
    * return the scheduler.
    * @return the scheduler for the Resource Manager.
@@ -1348,5 +1417,4 @@ public class ResourceManager extends CompositeService implements Recoverable {
     out.println("                            "
         + "[-remove-application-from-state-store <appId>]" + "\n");
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index ae17aaa..159c11c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -95,7 +95,7 @@ public abstract class RMStateStore extends AbstractService {
       "ReservationSystemRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
-  private ResourceManager resourceManager;
+  protected ResourceManager resourceManager;
   private final ReadLock readLock;
   private final WriteLock writeLock;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ddb8a0b..51e5829 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -18,26 +18,13 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.charset.Charset;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
 import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +64,15 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
 /**
  * {@link RMStateStore} implementation backed by ZooKeeper.
@@ -140,12 +135,6 @@ public class ZKRMStateStore extends RMStateStore {
   private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
       "RMDTMasterKeysRoot";
 
-  private String zkHostPort = null;
-  private int numRetries;
-  private int zkSessionTimeout;
-  @VisibleForTesting
-  int zkRetryInterval;
-
   /** Znode paths */
   private String zkRootNodePath;
   private String rmAppRoot;
@@ -160,17 +149,15 @@ public class ZKRMStateStore extends RMStateStore {
 
   /** Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
-  private boolean useDefaultFencingScheme = false;
   private String fencingNodePath;
   private Thread verifyActiveStatusThread;
+  private int zkSessionTimeout;
 
   /** ACL and auth info */
   private List<ACL> zkAcl;
-  private List<ZKUtil.ZKAuthInfo> zkAuths;
   @VisibleForTesting
   List<ACL> zkRootNodeAcl;
   private String zkRootNodeUsername;
-  private final String zkRootNodePassword = Long.toString(random.nextLong());
   public static final int CREATE_DELETE_PERMS =
       ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
   private final String zkRootNodeAuthScheme =
@@ -204,45 +191,25 @@ public class ZKRMStateStore extends RMStateStore {
         YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
     Id rmId = new Id(zkRootNodeAuthScheme,
         DigestAuthenticationProvider.generateDigest(
-            zkRootNodeUsername + ":" + zkRootNodePassword));
+            zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
     zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
     return zkRootNodeAcl;
   }
 
   @Override
   public synchronized void initInternal(Configuration conf) throws Exception {
-    zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
-    if (zkHostPort == null) {
-      throw new YarnRuntimeException("No server address specified for " +
-          "zookeeper state store for Resource Manager recovery. " +
-          YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
-    }
-    numRetries =
-        conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
-            YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+
+    /* Initialize fencing related paths, acls, and ops */
     znodeWorkingPath =
         conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
             YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
-    zkSessionTimeout =
-        conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
-            YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-
-    if (HAUtil.isHAEnabled(conf)) {
-      zkRetryInterval = zkSessionTimeout / numRetries;
-    } else {
-      zkRetryInterval =
-          conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
-              YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
-    }
-
-    zkAcl = RMZKUtils.getZKAcls(conf);
-    zkAuths = RMZKUtils.getZKAuths(conf);
-
     zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
+    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
     rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
+    zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
 
-    /* Initialize fencing related paths, acls, and ops */
-    fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
+    zkAcl = RMZKUtils.getZKAcls(conf);
     if (HAUtil.isHAEnabled(conf)) {
       String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
           (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@@ -256,7 +223,6 @@ public class ZKRMStateStore extends RMStateStore {
           throw bafe;
         }
       } else {
-        useDefaultFencingScheme = true;
         zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
       }
     }
@@ -272,19 +238,22 @@ public class ZKRMStateStore extends RMStateStore {
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
+    curatorFramework = resourceManager.getCurator();
+    if (curatorFramework == null) {
+      curatorFramework = resourceManager.createAndStartCurator(conf);
+    }
   }
 
   @Override
   public synchronized void startInternal() throws Exception {
-    // createConnection for future API calls
-    createConnection();
 
     // ensure root dirs exist
     createRootDirRecursively(znodeWorkingPath);
     create(zkRootNodePath);
     setRootNodeAcls();
     delete(fencingNodePath);
-    if (HAUtil.isHAEnabled(getConfig())) {
+    if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
+        .isAutomaticFailoverEnabled(getConfig())) {
       verifyActiveStatusThread = new VerifyActiveStatusThread();
       verifyActiveStatusThread.start();
     }
@@ -332,7 +301,9 @@ public class ZKRMStateStore extends RMStateStore {
       verifyActiveStatusThread.interrupt();
       verifyActiveStatusThread.join(1000);
     }
-    IOUtils.closeStream(curatorFramework);
+    if (!HAUtil.isHAEnabled(getConfig())) {
+      IOUtils.closeStream(curatorFramework);
+    }
   }
 
   @Override
@@ -909,34 +880,6 @@ public class ZKRMStateStore extends RMStateStore {
     }
   }
 
-  /*
-   * ZK operations using curator
-   */
-  private void createConnection() throws Exception {
-    // Curator connection
-    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
-    builder = builder.connectString(zkHostPort)
-        .connectionTimeoutMs(zkSessionTimeout)
-        .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
-
-    // Set up authorization based on fencing scheme
-    List<AuthInfo> authInfos = new ArrayList<>();
-    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
-      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
-    }
-    if (useDefaultFencingScheme) {
-      byte[] defaultFencingAuth =
-          (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
-              Charset.forName("UTF-8"));
-      authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
-    }
-    builder = builder.authorization(authInfos);
-
-    // Connect to ZK
-    curatorFramework = builder.build();
-    curatorFramework.start();
-  }
-
   @VisibleForTesting
   byte[] getData(final String path) throws Exception {
     return curatorFramework.getData().forPath(path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 406fcf6..7df31cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -105,6 +105,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
 
       public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
           throws Exception {
+        setResourceManager(new ResourceManager());
         init(conf);
         start();
         assertTrue(znodeWorkingPath.equals(workingZnode));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
index e270404..4b0b06a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -103,6 +104,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
     conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
 
     store = new ZKRMStateStore();
+    store.setResourceManager(new ResourceManager());
     store.init(conf);
     store.start();
     when(rmContext.getStateStore()).thenReturn(store);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/890a2ebd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index d188450..6b19be3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
 import org.apache.hadoop.util.ZKUtil;
 
@@ -80,6 +81,7 @@ public class TestZKRMStateStoreZKClientConnections {
 
       public TestZKRMStateStore(Configuration conf, String workingZnode)
           throws Exception {
+        setResourceManager(new ResourceManager());
         init(conf);
         start();
         assertTrue(znodeWorkingPath.equals(workingZnode));
@@ -168,24 +170,4 @@ public class TestZKRMStateStoreZKClientConnections {
 
     zkClientTester.getRMStateStore(conf);
   }
-
-  @Test
-  public void testZKRetryInterval() throws Exception {
-    TestZKClient zkClientTester = new TestZKClient();
-    YarnConfiguration conf = new YarnConfiguration();
-
-    ZKRMStateStore store =
-        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
-    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
-        store.zkRetryInterval);
-    store.stop();
-
-    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
-    store =
-        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
-    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
-            YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
-        store.zkRetryInterval);
-    store.stop();
-  }
 }