Posted to commits@hbase.apache.org by st...@apache.org on 2020/01/29 00:53:32 UTC

[hbase] 08/09: HBASE-23647: Make MasterRegistry the default impl. (#1039)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch HBASE-18095/client-locate-meta-no-zookeeper
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit a9add3a1d51b3e34b7e464cce0a2f475fcf711c3
Author: Bharath Vissapragada <bh...@apache.org>
AuthorDate: Mon Jan 27 12:36:09 2020 -0800

    HBASE-23647: Make MasterRegistry the default impl. (#1039)
    
    Signed-off-by: Stack <st...@apache.org>
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hbase/client/ConnectionRegistryFactory.java    |   2 +-
 .../apache/hadoop/hbase/client/MasterRegistry.java |   4 +-
 .../replication/ReplicationPeerConfigUtil.java     |   1 -
 .../apache/hadoop/hbase/security/SecurityInfo.java |   2 +
 .../apache/hadoop/hbase/HBaseConfiguration.java    |  13 ++-
 .../java/org/apache/hadoop/hbase/HConstants.java   |   4 +
 .../org/apache/hadoop/hbase/LocalHBaseCluster.java |  43 +++++----
 .../hadoop/hbase/master/ActiveMasterManager.java   |  10 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  11 ++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  13 ++-
 .../hadoop/hbase/security/HBasePolicyProvider.java |   2 +
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |  31 ++++++-
 .../org/apache/hadoop/hbase/MiniClusterRule.java   |   7 ++
 .../org/apache/hadoop/hbase/MiniHBaseCluster.java  |  20 ++--
 .../hadoop/hbase/StartMiniClusterOption.java       |  28 +++++-
 .../hbase/client/AbstractTestRegionLocator.java    |   3 +-
 .../hbase/client/RegionReplicaTestHelper.java      |  24 +++--
 .../hadoop/hbase/client/TestAsyncAdminBase.java    |   5 +-
 .../hbase/client/TestAsyncAdminMasterSwitch.java   |   2 -
 .../client/TestAsyncAdminWithRegionReplicas.java   |   2 +-
 .../hbase/client/TestAsyncMetaRegionLocator.java   |   5 +-
 .../client/TestAsyncTableUseMetaReplicas.java      |   2 +-
 .../hadoop/hbase/client/TestClientTimeouts.java    |  18 +++-
 .../apache/hadoop/hbase/client/TestConnection.java |   3 +
 .../client/TestFromClientSideWithCoprocessor.java  |   1 +
 .../hbase/client/TestMetaRegionLocationCache.java  |   6 +-
 .../hadoop/hbase/client/TestMetaWithReplicas.java  |   5 +-
 .../hbase/client/TestReplicaWithCluster.java       |   7 +-
 .../hadoop/hbase/client/TestReplicasClient.java    |   7 +-
 .../hbase/client/TestZKConnectionRegistry.java     |   2 +-
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java       |  30 +++++-
 .../hadoop/hbase/master/AlwaysStandByHMaster.java  | 101 +++++++++++++++++++++
 .../hbase/master/TestAlwaysStandByHMaster.java     |  67 ++++++++++++++
 .../TestMasterOperationsForRegionReplicas.java     |  16 +++-
 .../hadoop/hbase/master/TestMasterShutdown.java    |  14 ++-
 .../hbase/master/TestMigrateNamespaceTable.java    |   9 +-
 .../assignment/TestRegionMoveAndAbandon.java       |   1 +
 .../hbase/master/assignment/TestRegionSplit.java   |   1 +
 .../hbase/namespace/TestNamespaceAuditor.java      |  23 ++---
 .../regionserver/TestRSKilledWhenInitializing.java |   5 +-
 .../regionserver/TestRegionServerNoMaster.java     |   4 +-
 .../TestSplitTransactionOnCluster.java             |   8 +-
 .../hbase/replication/TestReplicationBase.java     |  27 ++++--
 .../TestReplicationDisableInactivePeer.java        |   4 +-
 .../replication/TestReplicationDroppedTables.java  |  31 +++----
 .../TestReplicationStatusAfterLagging.java         |   2 +-
 ...licationStatusBothNormalAndRecoveryLagging.java |   2 +-
 ...ationStatusSourceStartedTargetStoppedNewOp.java |   2 +-
 ...ationStatusSourceStartedTargetStoppedNoOps.java |   2 +-
 ...atusSourceStartedTargetStoppedWithRecovery.java |   2 +-
 .../replication/TestReplicationSyncUpTool.java     |  18 ++--
 .../replication/TestReplicationSyncUpToolBase.java |  31 ++++++-
 ...estReplicationSyncUpToolWithBulkLoadedData.java |   8 +-
 ...stRegionReplicaReplicationEndpointNoMaster.java |   5 +-
 .../hbase/security/token/SecureTestCluster.java    |   2 +-
 .../token/TestDelegationTokenWithEncryption.java   |   8 +-
 .../token/TestGenerateDelegationToken.java         |   8 +-
 .../security/token/TestTokenAuthentication.java    |   5 +
 58 files changed, 543 insertions(+), 176 deletions(-)
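
The client-facing effect of this change is that a connection now bootstraps from the master
end points (MasterRegistry) instead of the ZooKeeper quorum, so the master address list becomes
the key client setting. A minimal client sketch follows; the class name and host:port pairs are
illustrative, only the HConstants key comes from HBase itself:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class MasterRegistryClientSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder end points; list every master (active and standby) of the cluster.
        conf.set(HConstants.MASTER_ADDRS_KEY,
            "master1.example.com:16000,master2.example.com:16000");
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
          // Cluster id, active master and meta locations are now fetched over master RPC.
        }
      }
    }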

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 9308443..5688dea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -36,7 +36,7 @@ final class ConnectionRegistryFactory {
    */
   static ConnectionRegistry getRegistry(Configuration conf) {
     Class<? extends ConnectionRegistry> clazz = conf.getClass(
-        CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
+        CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class,
         ConnectionRegistry.class);
     return ReflectionUtils.newInstance(clazz, conf);
   }
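
The one-line change above is the heart of the patch: ConnectionRegistryFactory now hands out a
MasterRegistry unless the client overrides the registry key. A minimal opt-out sketch, assuming
only the HConstants keys that appear later in this patch (the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ZkRegistryOptOutSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Pin this client back to the ZooKeeper-based registry.
        conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
            HConstants.ZK_CONNECTION_REGISTRY_CLASS);
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
          // Region and meta locations are resolved through ZooKeeper, as before this patch.
        }
      }
    }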
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 5680847..9207538 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -90,6 +90,7 @@ public class MasterRegistry implements ConnectionRegistry {
     } else {
       finalConf = conf;
     }
+    finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT));
     rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     masterServers = new HashSet<>();
@@ -146,12 +147,13 @@ public class MasterRegistry implements ConnectionRegistry {
       if (rpcResult == null) {
         future.completeExceptionally(
             new MasterRegistryFetchException(masterServers, hrc.getFailed()));
+        return;
       }
       if (!isValidResp.test(rpcResult)) {
         // Rpc returned ok, but result was malformed.
         future.completeExceptionally(new IOException(
             String.format("Invalid result for request %s. Will be retried", debug)));
-
+        return;
       }
       future.complete(transformResult.apply(rpcResult));
     };
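
The two added return statements matter because a CompletableFuture can only be completed once:
without them the callback falls through after completeExceptionally() and the real failure is
masked. A stripped-down sketch of the intended control flow (names are placeholders, not the
actual MasterRegistry code):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;
    import java.util.function.Predicate;

    final class RpcCallbackSketch {
      static <T, R> void onRpcDone(T rpcResult, Predicate<T> isValidResp,
          Function<T, R> transformResult, CompletableFuture<R> future) {
        if (rpcResult == null) {
          future.completeExceptionally(new IOException("RPC failed"));
          return; // stop: falling through would dereference a null result
        }
        if (!isValidResp.test(rpcResult)) {
          future.completeExceptionally(new IOException("Malformed RPC result"));
          return; // stop: complete() on an already-failed future is silently ignored
        }
        future.complete(transformResult.apply(rpcResult));
      }
    }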
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index fefeea6..f569e47 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -606,7 +606,6 @@ public final class ReplicationPeerConfigUtil {
       compound.addStringMap(peerConfig.getConfiguration());
       return compound;
     }
-
     return otherConf;
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index d96b676..e5b4de2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -49,6 +49,8 @@ public class SecurityInfo {
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
     infos.put(MasterProtos.HbckService.getDescriptor().getName(),
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
+    infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
+        new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
     // NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
     // new Service will not be found when all is Kerberized!!!!
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 0f39e8b..67de5fb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -250,7 +250,7 @@ public class HBaseConfiguration extends Configuration {
    * @return the merged configuration with override properties and cluster key applied
    */
   public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
-                                                String overridePrefix) throws IOException {
+      String overridePrefix) throws IOException {
     Configuration clusterConf = HBaseConfiguration.create(baseConf);
     if (clusterKey != null && !clusterKey.isEmpty()) {
       applyClusterKeyToConf(clusterConf, clusterKey);
@@ -268,14 +268,21 @@ public class HBaseConfiguration extends Configuration {
    * used to communicate with distant clusters
    * @param conf configuration object to configure
   * @param key string that contains the 3 required configurations
-   * @throws IOException
    */
   private static void applyClusterKeyToConf(Configuration conf, String key)
-      throws IOException{
+      throws IOException {
     ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
     conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
     conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
+    // Without the right registry, the above configs are useless. Also, we don't use setClass()
+    // here because the ConnectionRegistry* classes are not resolvable from this module.
+    // This will be broken if ZkConnectionRegistry class gets renamed or moved. Is there a better
+    // way?
+    LOG.info("Overriding client registry implementation to {}",
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
   }
 
   /**
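
A cluster key only describes a ZooKeeper ensemble (quorum, client port, znode parent), so
applyClusterKeyToConf() now also pins the derived configuration to ZKConnectionRegistry;
otherwise the default MasterRegistry would ignore those settings. A usage sketch, assuming the
two-argument createClusterConf() overload and an illustrative cluster key:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ClusterKeySketch {
      public static void main(String[] args) throws IOException {
        // "zk1,zk2,zk3:2181:/hbase" is a placeholder cluster key for a remote peer.
        Configuration peerConf = HBaseConfiguration.createClusterConf(
            HBaseConfiguration.create(), "zk1,zk2,zk3:2181:/hbase");
        // peerConf now carries the peer's ZooKeeper settings and is forced onto
        // ZKConnectionRegistry, independent of the registry used for local connections.
      }
    }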
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a7acc8e..c2354a9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -183,6 +183,10 @@ public final class HConstants {
 
   public static final String MASTER_ADDRS_DEFAULT =  "localhost:" + DEFAULT_MASTER_PORT;
 
+  /** Full class name of the Zookeeper based connection registry implementation */
+  public static final String ZK_CONNECTION_REGISTRY_CLASS =
+      "org.apache.hadoop.hbase.client.ZKConnectionRegistry";
+
   /** Configuration to enable hedged reads on master registry **/
   public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
       "hbase.client.master_registry.enable_hedged_reads";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index 1fdd464..b67fffa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -91,7 +91,7 @@ public class LocalHBaseCluster {
    */
   public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
   throws IOException {
-    this(conf, 1, noRegionServers, getMasterImplementation(conf),
+    this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
         getRegionServerImplementation(conf));
   }
 
@@ -106,7 +106,7 @@ public class LocalHBaseCluster {
   public LocalHBaseCluster(final Configuration conf, final int noMasters,
       final int noRegionServers)
   throws IOException {
-    this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
+    this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
         getRegionServerImplementation(conf));
   }
 
@@ -122,6 +122,12 @@ public class LocalHBaseCluster {
        HMaster.class);
   }
 
+  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
+      final Class<? extends HMaster> masterClass,
+      final Class<? extends HRegionServer> regionServerClass) throws IOException {
+    this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
+  }
+
   /**
    * Constructor.
    * @param conf Configuration to use.  Post construction has the master's
@@ -134,9 +140,9 @@ public class LocalHBaseCluster {
    */
   @SuppressWarnings("unchecked")
   public LocalHBaseCluster(final Configuration conf, final int noMasters,
-    final int noRegionServers, final Class<? extends HMaster> masterClass,
-    final Class<? extends HRegionServer> regionServerClass)
-  throws IOException {
+      final int noAlwaysStandByMasters, final int noRegionServers,
+      final Class<? extends HMaster> masterClass,
+      final Class<? extends HRegionServer> regionServerClass) throws IOException {
     this.conf = conf;
 
     // When active, if a port selection is default then we switch to random
@@ -170,24 +176,22 @@ public class LocalHBaseCluster {
     this.masterClass = (Class<? extends HMaster>)
       conf.getClass(HConstants.MASTER_IMPL, masterClass);
     // Start the HMasters.
-    for (int i = 0; i < noMasters; i++) {
+    int i;
+    for (i = 0; i < noMasters; i++) {
       addMaster(new Configuration(conf), i);
     }
-
-    // Populate the master address host ports in the config. This is needed if a master based
-    // registry is configured for client metadata services (HBASE-18095)
-    List<String> masterHostPorts = new ArrayList<>();
-    getMasters().forEach(masterThread ->
-        masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
-    conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
-
+    for (int j = 0; j < noAlwaysStandByMasters; j++) {
+      Configuration c = new Configuration(conf);
+      c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
+      addMaster(c, i + j);
+    }
     // Start the HRegionServers.
     this.regionServerClass =
       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
        regionServerClass);
 
-    for (int i = 0; i < noRegionServers; i++) {
-      addRegionServer(new Configuration(conf), i);
+    for (int j = 0; j < noRegionServers; j++) {
+      addRegionServer(new Configuration(conf), j);
     }
   }
 
@@ -233,8 +237,13 @@ public class LocalHBaseCluster {
     // its Connection instance rather than share (see HBASE_INSTANCES down in
     // the guts of ConnectionManager.
     JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
-        (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
+        (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
     this.masterThreads.add(mt);
+    // Refresh the master address config.
+    List<String> masterHostPorts = new ArrayList<>();
+    getMasters().forEach(masterThread ->
+        masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
+    conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
     return mt;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
index 99cab62..606741b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
@@ -56,15 +56,15 @@ public class ActiveMasterManager extends ZKListener {
   final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
   final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
 
-  // This server's information.
-  private final ServerName sn;
-  private int infoPort;
-  private final Server master;
+  // This server's information. Package-private for child implementations.
+  int infoPort;
+  final ServerName sn;
+  final Server master;
 
   // Active master's server name. Invalidated anytime active master changes (based on ZK
   // notifications) and lazily fetched on-demand.
   // ServerName is immutable, so we don't need heavy synchronization around it.
-  private volatile ServerName activeMasterServerName;
+  volatile ServerName activeMasterServerName;
 
   /**
    * @param watcher ZK watcher
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d8f71a9..9527d04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -575,7 +575,7 @@ public class HMaster extends HRegionServer implements MasterServices {
       // Some unit tests don't need a cluster, so no zookeeper at all
       if (!conf.getBoolean("hbase.testing.nocluster", false)) {
         this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
-        this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
+        this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
       } else {
         this.metaRegionLocationCache = null;
         this.activeMasterManager = null;
@@ -589,6 +589,15 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
   }
 
+  /**
+   * Protected so that custom implementations in tests can override the default
+   * ActiveMasterManager implementation.
+   */
+  protected ActiveMasterManager createActiveMasterManager(
+      ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
+    return new ActiveMasterManager(zk, sn, server);
+  }
+
   @Override
   protected String getUseThisHostnameInstead(Configuration conf) {
     return conf.get(MASTER_HOSTNAME_KEY);
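
The new protected factory method is the hook that the AlwaysStandByHMaster test class added
later in this patch builds on. A minimal sketch of such an override; the subclass here is
hypothetical and only mirrors the default behaviour:

    package org.apache.hadoop.hbase.master;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

    public class CustomManagerHMaster extends HMaster {
      public CustomManagerHMaster(Configuration conf) throws Exception {
        super(conf);
      }

      @Override
      protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
          Server server) {
        // A test would return an ActiveMasterManager subclass with the behaviour under test;
        // returning the stock manager here simply reproduces the default.
        return new ActiveMasterManager(zk, sn, server);
      }
    }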
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 524fcd9..b20bfe7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -789,8 +789,17 @@ public class HRegionServer extends HasThread implements
     return true;
   }
 
-  private Configuration unsetClientZookeeperQuorum() {
+  private Configuration cleanupConfiguration() {
     Configuration conf = this.conf;
+    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
+    // - Decouples RS and master life cycles. RegionServers can continue to be up independent of
+    //   masters' availability.
+    // - Configuration management for region servers (cluster internal) is much simpler when adding
+    //   new masters or removing existing masters, since only clients' config needs to be updated.
+    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
+    //   other internal connections too.
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
     if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
       // Use server ZK cluster for server-issued connections, so we clone
       // the conf and unset the client ZK related properties
@@ -824,7 +833,7 @@ public class HRegionServer extends HasThread implements
    */
   protected final synchronized void setupClusterConnection() throws IOException {
     if (asyncClusterConnection == null) {
-      Configuration conf = unsetClientZookeeperQuorum();
+      Configuration conf = cleanupConfiguration();
       InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
       User user = userProvider.getCurrent();
       asyncClusterConnection =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
index 72e36a8..b7ab7f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
@@ -40,6 +40,8 @@ public class HBasePolicyProvider extends PolicyProvider {
     new Service("security.client.protocol.acl", AdminService.BlockingInterface.class),
     new Service("security.client.protocol.acl",
       MasterProtos.HbckService.BlockingInterface.class),
+    new Service("security.client.protocol.acl",
+      MasterProtos.ClientMetaService.BlockingInterface.class),
     new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
     new Service("security.masterregion.protocol.acl",
       RegionServerStatusService.BlockingInterface.class)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index d2f7487..589e4a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1114,8 +1114,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     Configuration c = new Configuration(this.conf);
     TraceUtil.initTracer(c);
     this.hbaseCluster =
-        new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
-            option.getRsPorts(), option.getMasterClass(), option.getRsClass());
+        new MiniHBaseCluster(c, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
+            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
+            option.getRsClass());
     // Populate the master address configuration from mini cluster configuration.
     conf.set(HConstants.MASTER_ADDRS_KEY,
         c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
@@ -1231,6 +1232,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     StartMiniClusterOption option =
         StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
     restartHBaseCluster(option);
+    invalidateConnection();
   }
 
   public void restartHBaseCluster(StartMiniClusterOption option)
@@ -1244,8 +1246,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
       this.asyncConnection = null;
     }
     this.hbaseCluster =
-        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
-            option.getRsPorts(), option.getMasterClass(), option.getRsClass());
+        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
+            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
+            option.getRsClass());
     // Don't leave here till we've done a successful scan of the hbase:meta
     Connection conn = ConnectionFactory.createConnection(this.conf);
     Table t = conn.getTable(TableName.META_TABLE_NAME);
@@ -3020,6 +3023,26 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
   }
 
   /**
+   * Resets the connections so that the next time getConnection() is called, a new connection is
+   * created. This is needed in cases where the entire cluster / all the masters are shut down and
+   * the connection is not valid anymore.
+   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
+   *   written, not all start() stop() calls go through this class. Most tests directly operate on
+   *   the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
+   *   maintain the connection state automatically. Cleaning this is a much bigger refactor.
+   */
+  public void invalidateConnection() throws IOException {
+    closeConnection();
+    // Update the master addresses if they changed.
+    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
+    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
+    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
+        masterConfigBefore, masterConfAfter);
+    conf.set(HConstants.MASTER_ADDRS_KEY,
+        getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
+  }
+
+  /**
    * Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it
    * thread-safe).
    * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
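
A sketch of how a test would combine the new invalidateConnection() helper with a full cluster
restart so that the cached connection and master address config are refreshed; the test class
and assertions are illustrative:

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.StartMiniClusterOption;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Table;
    import org.junit.Test;

    public class TestRestartWithMasterRegistrySketch {
      private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

      @Test
      public void testClientAfterClusterRestart() throws Exception {
        TEST_UTIL.startMiniCluster();
        TEST_UTIL.shutdownMiniHBaseCluster();
        TEST_UTIL.restartHBaseCluster(
            StartMiniClusterOption.builder().numMasters(1).numRegionServers(1).build());
        // Master ports may change across the restart, so drop the cached connection and
        // pick up the refreshed master addresses before calling getConnection() again.
        TEST_UTIL.invalidateConnection();
        try (Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
          // assertions against the restarted cluster would go here
        }
        TEST_UTIL.shutdownMiniCluster();
      }
    }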
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java
index 6ac4838..89fbded4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniClusterRule.java
@@ -66,6 +66,13 @@ public class MiniClusterRule extends ExternalResource {
   }
 
   /**
+   * @return the underlying instance of {@link HBaseTestingUtility}
+   */
+  public HBaseTestingUtility getTestingUtility() {
+    return testingUtility;
+  }
+
+  /**
    * Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
    * to {@link AsyncConnection#close() close()} the connection when finished.
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 84e6f8f..53c590b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -88,7 +88,7 @@ public class MiniHBaseCluster extends HBaseCluster {
          Class<? extends HMaster> masterClass,
          Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
-    this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass);
+    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
   }
 
   /**
@@ -99,9 +99,8 @@ public class MiniHBaseCluster extends HBaseCluster {
    * @throws IOException
    * @throws InterruptedException
    */
-  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
-         List<Integer> rsPorts,
-         Class<? extends HMaster> masterClass,
+  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
+         int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
          Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
     super(conf);
@@ -109,7 +108,8 @@ public class MiniHBaseCluster extends HBaseCluster {
     // Hadoop 2
     CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
 
-    init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass);
+    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
+        regionserverClass);
     this.initialClusterStatus = getClusterMetrics();
   }
 
@@ -225,9 +225,9 @@ public class MiniHBaseCluster extends HBaseCluster {
     }
   }
 
-  private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts,
-                 Class<? extends HMaster> masterClass,
-                 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
+      final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws IOException, InterruptedException {
     try {
       if (masterClass == null){
@@ -238,7 +238,7 @@ public class MiniHBaseCluster extends HBaseCluster {
       }
 
       // start up a LocalHBaseCluster
-      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
+      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
           masterClass, regionserverClass);
 
       // manually add the regionservers as other users
@@ -553,6 +553,8 @@ public class MiniHBaseCluster extends HBaseCluster {
     } catch (InterruptedException ie) {
       throw new IOException("Interrupted adding master to cluster", ie);
     }
+    conf.set(HConstants.MASTER_ADDRS_KEY,
+        hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
     return t;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
index ad70c95..7a9bd68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
@@ -46,6 +46,14 @@ public final class StartMiniClusterOption {
    * can find the active/primary master with {@link MiniHBaseCluster#getMaster()}.
    */
   private final int numMasters;
+
+  /**
+   * Number of masters that always remain standby. These masters never transition to active
+   * even if an active master does not exist. These are needed for testing scenarios where there are
+   * no active masters in the cluster but the cluster connection (backed by master registry) should
+   * still work.
+   */
+  private final int numAlwaysStandByMasters;
   /**
    * The class to use as HMaster, or null for default.
    */
@@ -99,11 +107,12 @@ public final class StartMiniClusterOption {
   /**
    * Private constructor. Use {@link Builder#build()}.
    */
-  private StartMiniClusterOption(int numMasters, Class<? extends HMaster> masterClass,
-      int numRegionServers, List<Integer> rsPorts,
+  private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
+      Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes,
       String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
     this.numMasters = numMasters;
+    this.numAlwaysStandByMasters = numAlwaysStandByMasters;
     this.masterClass = masterClass;
     this.numRegionServers = numRegionServers;
     this.rsPorts = rsPorts;
@@ -119,6 +128,10 @@ public final class StartMiniClusterOption {
     return numMasters;
   }
 
+  public int getNumAlwaysStandByMasters() {
+    return numAlwaysStandByMasters;
+  }
+
   public Class<? extends HMaster> getMasterClass() {
     return masterClass;
   }
@@ -179,6 +192,7 @@ public final class StartMiniClusterOption {
    */
   public static final class Builder {
     private int numMasters = 1;
+    private int numAlwaysStandByMasters = 0;
     private Class<? extends HMaster> masterClass = null;
     private int numRegionServers = 1;
     private List<Integer> rsPorts = null;
@@ -196,8 +210,9 @@ public final class StartMiniClusterOption {
       if (dataNodeHosts != null && dataNodeHosts.length != 0) {
         numDataNodes = dataNodeHosts.length;
       }
-      return new StartMiniClusterOption(numMasters, masterClass, numRegionServers, rsPorts, rsClass,
-          numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir);
+      return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
+          numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
+          createRootDir, createWALDir);
     }
 
     public Builder numMasters(int numMasters) {
@@ -205,6 +220,11 @@ public final class StartMiniClusterOption {
       return this;
     }
 
+    public Builder numAlwaysStandByMasters(int numAlwaysStandByMasters) {
+      this.numAlwaysStandByMasters = numAlwaysStandByMasters;
+      return this;
+    }
+
     public Builder masterClass(Class<? extends HMaster> masterClass) {
       this.masterClass = masterClass;
       return this;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index f96daf6..89f287b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -60,8 +60,7 @@ public abstract class AbstractTestRegionLocator {
     UTIL.waitTableAvailable(TABLE_NAME);
     try (ConnectionRegistry registry =
              ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(),
-        registry, REGION_REPLICATION);
+      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
     }
     UTIL.getAdmin().balancerSwitch(false, true);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index 73924a3..8e562bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
@@ -43,24 +43,32 @@ public final class RegionReplicaTestHelper {
   }
 
   // waits for all replicas to have region location
-  static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf,
-      ConnectionRegistry registry, int regionReplication) throws IOException {
+  static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util,
+      ConnectionRegistry registry) {
+    Configuration conf = util.getConfiguration();
+    int regionReplicaCount = util.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
+        HConstants.DEFAULT_META_REPLICA_NUM);
     Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
       new ExplainingPredicate<IOException>() {
         @Override
-        public String explainFailure() throws IOException {
+        public String explainFailure() {
           return "Not all meta replicas get assigned";
         }
 
         @Override
-        public boolean evaluate() throws IOException {
+        public boolean evaluate() {
           try {
             RegionLocations locs = registry.getMetaRegionLocations().get();
-            if (locs.size() < regionReplication) {
+            if (locs.size() < regionReplicaCount) {
               return false;
             }
-            for (int i = 0; i < regionReplication; i++) {
-              if (locs.getRegionLocation(i) == null) {
+            for (int i = 0; i < regionReplicaCount; i++) {
+              HRegionLocation loc = locs.getRegionLocation(i);
+              // Wait until the replica is served by a region server. There could be delay between
+              // the replica being available to the connection and region server opening it.
+              Optional<ServerName> rsCarryingReplica =
+                  getRSCarryingReplica(util, loc.getRegion().getTable(), i);
+              if (!rsCarryingReplica.isPresent()) {
                 return false;
               }
             }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
index a02ee90..6090d64 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
@@ -83,7 +84,9 @@ public abstract class TestAsyncAdminBase {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
-    TEST_UTIL.startMiniCluster(2);
+    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).
+        numMasters(2).build();
+    TEST_UTIL.startMiniCluster(option);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java
index 159dce9..ce91e54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminMasterSwitch.java
@@ -48,8 +48,6 @@ public class TestAsyncAdminMasterSwitch extends TestAsyncAdminBase {
     assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(),
       admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join()
         .getServersName().size());
-    // stop the old master, and start a new one
-    TEST_UTIL.getMiniHBaseCluster().startMaster();
     TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join();
     assertTrue(TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000));
     // make sure that we could still call master
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index 6d7d368..50111f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -56,7 +56,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
     try (ConnectionRegistry registry =
              ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
       RegionReplicaTestHelper
-        .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3);
+        .waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 609a129..34683e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -51,12 +51,11 @@ public class TestAsyncMetaRegionLocator {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setBoolean(BaseLoadBalancer.TABLES_ON_MASTER, false);
     TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.startMiniCluster(3);
     REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    RegionReplicaTestHelper
-      .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index ed6c66f..6eadba9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -93,7 +93,7 @@ public class TestAsyncTableUseMetaReplicas {
       FailPrimaryMetaScanCp.class.getName());
     UTIL.startMiniCluster(3);
     try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3);
+      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
     }
     try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index bd1f7cc..a7991c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -45,7 +45,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -100,9 +100,9 @@ public class TestClientTimeouts {
           connection = ConnectionFactory.createConnection(conf);
           admin = connection.getAdmin();
           admin.balancerSwitch(false, false);
-        } catch (MasterNotRunningException ex) {
+        } catch (MasterRegistryFetchException ex) {
           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
-          // a MasterNotRunningException. It's a bug if we get other exceptions.
+          // a MasterRegistryFetchException. It's a bug if we get other exceptions.
           lastFailed = true;
         } finally {
           if (admin != null) {
@@ -146,6 +146,14 @@ public class TestClientTimeouts {
         throws UnknownHostException {
       return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
     }
+
+    @Override
+    public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
+        throws UnknownHostException {
+      Preconditions.checkArgument(sns != null && sns.size() == 1);
+      return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout);
+    }
+
   }
 
   private static AtomicInteger invokations = new AtomicInteger();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
index 010b883..b92a4d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
@@ -315,6 +315,8 @@ public class TestConnection {
     assertTrue(c1.getConfiguration() == c2.getConfiguration());
   }
 
+  /*
+  ====> With MasterRegistry, connections cannot outlast the masters' lifetime.
   @Test
   public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
     Configuration config = new Configuration(TEST_UTIL.getConfiguration());
@@ -337,6 +339,7 @@ public class TestConnection {
     table.close();
     connection.close();
   }
+   */
 
   @Test
   public void testLocateRegionsWithRegionReplicas() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
index 8845f9a..d78976e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
@@ -43,6 +43,7 @@ public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
   @Parameterized.Parameters
   public static Collection parameters() {
     return Arrays.asList(new Object[][] {
+        { MasterRegistry.class, 1},
         { ZKConnectionRegistry.class, 1}
     });
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index 1205b05..abaf092 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -60,8 +60,7 @@ public class TestMetaRegionLocationCache {
     TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.startMiniCluster(3);
     REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(
-        TEST_UTIL.getConfiguration(), REGISTRY, 3);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
   }
 
@@ -123,8 +122,7 @@ public class TestMetaRegionLocationCache {
     for (HRegionLocation location: currentMetaLocs) {
       RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
     }
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(
-        TEST_UTIL.getConfiguration(), REGISTRY, 3);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
     for (JVMClusterUtil.MasterThread masterThread:
         TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
       verifyCachedMetaLocations(masterThread.getMaster());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
index 55fc289..809aee8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -85,7 +86,9 @@ public class TestMetaWithReplicas {
     TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.getConfiguration().setInt(
         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000);
-    TEST_UTIL.startMiniCluster(REGIONSERVERS_COUNT);
+    StartMiniClusterOption option = StartMiniClusterOption.builder().
+        numAlwaysStandByMasters(1).numMasters(1).numRegionServers(REGIONSERVERS_COUNT).build();
+    TEST_UTIL.startMiniCluster(option);
     AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
     Set<ServerName> sns = new HashSet<ServerName>();
     ServerName hbaseMetaServerName =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index acf250c..149f8aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -588,11 +588,12 @@ public class TestReplicaWithCluster {
   }
 
   @Test
-  public void testReplicaGetWithRpcClientImpl() throws IOException {
+  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
     HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
-    HTU.getConfiguration().set("hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.RpcClientImpl");
+    HTU.getConfiguration().set(
+        "hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithRpcClientImpl");
+    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithAsyncRpcClientImpl");
     hdt.setRegionReplication(NB_SERVERS);
     hdt.addCoprocessor(SlowMeCopro.class.getName());
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index cacc39a..3e507d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -21,7 +21,6 @@ import com.codahale.metrics.Counter;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
-import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -104,7 +104,6 @@ public class TestReplicasClient {
         new AtomicReference<>(new CountDownLatch(0));
     private static final AtomicReference<CountDownLatch> secondaryCdl =
         new AtomicReference<>(new CountDownLatch(0));
-    Random r = new Random();
     public SlowMeCopro() {
     }
 
@@ -192,7 +191,9 @@ public class TestReplicasClient {
         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
     HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
     HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
-    HTU.startMiniCluster(NB_SERVERS);
+    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1).
+        numAlwaysStandByMasters(1).numMasters(1).build();
+    HTU.startMiniCluster(option);
 
     // Create table then get the single region for our new table.
     HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index cb45ee5..d8a228e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -86,7 +86,7 @@ public class TestZKConnectionRegistry {
     assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
       REGISTRY.getActiveMaster().get());
     RegionReplicaTestHelper
-      .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
+      .waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
     RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
     assertEquals(3, locs.getRegionLocations().length);
     IntStream.range(0, 3).forEach(i -> {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index 7cce8e8..27c1235 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -24,12 +24,15 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
@@ -48,6 +52,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 
 @Category(MediumTests.class)
 public class TestRpcClientLeaks {
@@ -63,6 +69,9 @@ public class TestRpcClientLeaks {
 
   public static class MyRpcClientImpl extends BlockingRpcClient {
 
+    // Exceptions are thrown only when this is set to true.
+    private static boolean throwException = false;
+
     public MyRpcClientImpl(Configuration conf) {
       super(conf);
     }
@@ -78,12 +87,26 @@ public class TestRpcClientLeaks {
         @Override
         protected synchronized void setupConnection() throws IOException {
           super.setupConnection();
-          SAVED_SOCKETS.add(socket);
-          throw new IOException(
-            "Sample exception for verifying socket closure in case of exceptions.");
+          if (throwException) {
+            SAVED_SOCKETS.add(socket);
+            throw new IOException(
+                "Sample exception for verifying socket closure in case of exceptions.");
+          }
         }
       };
     }
+
+    // Keep the master registry path happy: it requests a hedged channel even for a single master.
+    @Override
+    public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
+        throws UnknownHostException {
+      Preconditions.checkState(sns != null && sns.size() == 1);
+      return super.createRpcChannel((ServerName)sns.toArray()[0], user, rpcTimeout);
+    }
+
+    public static void enableThrowExceptions() {
+      throwException = true;
+    }
   }
 
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -110,6 +133,7 @@ public class TestRpcClientLeaks {
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     try (Connection connection = ConnectionFactory.createConnection(conf);
       Table table = connection.getTable(TableName.valueOf(name.getMethodName()))) {
+      MyRpcClientImpl.enableThrowExceptions();
       table.get(new Get(Bytes.toBytes("asd")));
       fail("Should fail because the injected error");
     } catch (RetriesExhaustedException e) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
new file mode 100644
index 0000000..41a008a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of HMaster that always runs as a standby and never transitions to active.
+ */
+public class AlwaysStandByHMaster extends HMaster {
+  /**
+   * An implementation of ActiveMasterManager that never transitions its master to the active
+   * state. It always remains a standby master. With the master registry implementation
+   * (HBASE-18095), at least one master, active or standby, is expected to be running at any
+   * point in time, since masters serve as the gateway for client connections.
+   *
+   * With this implementation, tests can simulate the scenario of having no active master while
+   * client connections to the cluster still succeed.
+   */
+  private static class AlwaysStandByMasterManager extends ActiveMasterManager {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(AlwaysStandByMasterManager.class);
+
+    AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
+      super(watcher, sn, master);
+    }
+
+    /**
+     * An implementation that never transitions to an active master.
+     */
+    boolean blockUntilBecomingActiveMaster(int checkInterval, MonitoredTask startupStatus) {
+      while (!(master.isAborted() || master.isStopped())) {
+        startupStatus.setStatus("Forever looping to stay as a standby master.");
+        try {
+          activeMasterServerName = null;
+          try {
+            if (MasterAddressTracker.getMasterAddress(watcher) != null) {
+              clusterHasActiveMaster.set(true);
+            }
+            Threads.sleepWithoutInterrupt(100);
+          } catch (IOException e) {
+            // pass, we will get notified when some other active master creates the znode.
+          }
+        } catch (KeeperException e) {
+          master.abort("Received an unexpected KeeperException, aborting", e);
+          return false;
+        }
+        synchronized (this.clusterHasActiveMaster) {
+          while (clusterHasActiveMaster.get() && !master.isStopped()) {
+            try {
+              clusterHasActiveMaster.wait(checkInterval);
+            } catch (InterruptedException e) {
+              // We expect to be interrupted when a master dies,
+              //  will fall out if so
+              LOG.debug("Interrupted waiting for master to die", e);
+            }
+          }
+          if (clusterShutDown.get()) {
+            this.master.stop(
+                "Cluster went down before this master became active");
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  public AlwaysStandByHMaster(Configuration conf) throws IOException {
+    super(conf);
+  }
+
+  protected ActiveMasterManager createActiveMasterManager(
+      ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
+    return new AlwaysStandByMasterManager(zk, sn, server);
+  }
+}
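
    For context, a condensed usage sketch (not part of the patch) of what this class enables. It
    reuses only the StartMiniClusterOption builder methods and MiniHBaseCluster calls that appear
    elsewhere in this diff; TEST_UTIL stands in for an illustrative HBaseTestingUtility.

        // Start one regular master plus one AlwaysStandByHMaster so that client connections,
        // which now bootstrap through the master registry, keep a live endpoint even when the
        // active master goes away.
        StartMiniClusterOption option = StartMiniClusterOption.builder()
            .numMasters(1)
            .numAlwaysStandByMasters(1)
            .numRegionServers(1)
            .build();
        TEST_UTIL.startMiniCluster(option);
        // Killing the only active master leaves the standby answering registry RPCs; the cluster
        // becomes active again only once a new master is started.
        TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join();
        TEST_UTIL.getMiniHBaseCluster().startMaster();
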
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java
new file mode 100644
index 0000000..a49ae50
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAlwaysStandByHMaster.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniClusterRule;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class, MasterTests.class})
+public class TestAlwaysStandByHMaster {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestAlwaysStandByHMaster.class);
+
+  private static final StartMiniClusterOption OPTION = StartMiniClusterOption.builder().
+      numAlwaysStandByMasters(1).numMasters(1).numRegionServers(1).build();
+
+  @ClassRule
+  public static final MiniClusterRule miniClusterRule = new MiniClusterRule(OPTION);
+
+  /**
+   * Tests that the AlwaysStandByHMaster does not transition to active state even if no active
+   * master exists.
+   */
+  @Test public void testAlwaysStandBy() throws Exception {
+    HBaseTestingUtility testUtil = miniClusterRule.getTestingUtility();
+    // Make sure there is an active master.
+    assertNotNull(testUtil.getMiniHBaseCluster().getMaster());
+    assertEquals(2, testUtil.getMiniHBaseCluster().getMasterThreads().size());
+    // Kill the only active master.
+    testUtil.getMiniHBaseCluster().stopMaster(0).join();
+    // Wait for 5s to make sure the always standby doesn't transition to active state.
+    assertFalse(testUtil.getMiniHBaseCluster().waitForActiveAndReadyMaster(5000));
+    // Add a new master.
+    HMaster newActive = testUtil.getMiniHBaseCluster().startMaster().getMaster();
+    assertTrue(testUtil.getMiniHBaseCluster().waitForActiveAndReadyMaster(5000));
+    // Newly added master should be the active.
+    assertEquals(newActive.getServerName(),
+        testUtil.getMiniHBaseCluster().getMaster().getServerName());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
index 5fdecae..952cc0c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -86,6 +87,8 @@ public class TestMasterOperationsForRegionReplicas {
   private static Connection CONNECTION = null;
   private static Admin ADMIN;
   private static int numSlaves = 2;
+  private final static StartMiniClusterOption option = StartMiniClusterOption.builder().
+      numRegionServers(numSlaves).numMasters(1).numAlwaysStandByMasters(1).build();
   private static Configuration conf;
 
   @Rule
@@ -95,16 +98,21 @@ public class TestMasterOperationsForRegionReplicas {
   public static void setupBeforeClass() throws Exception {
     conf = TEST_UTIL.getConfiguration();
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    TEST_UTIL.startMiniCluster(numSlaves);
+    TEST_UTIL.startMiniCluster(option);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
-    CONNECTION = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
-    ADMIN = CONNECTION.getAdmin();
+    resetConnections();
     while (ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
       .size() < numSlaves) {
       Thread.sleep(100);
     }
   }
 
+  private static void resetConnections() throws IOException {
+    IOUtils.closeQuietly(ADMIN, CONNECTION);
+    CONNECTION = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    ADMIN = CONNECTION.getAdmin();
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     Closeables.close(ADMIN, true);
@@ -199,6 +207,7 @@ public class TestMasterOperationsForRegionReplicas {
       TEST_UTIL.startMiniHBaseCluster(option);
       TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
       TEST_UTIL.waitUntilNoRegionsInTransition();
+      resetConnections();
       validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
         ADMIN.getConnection());
 
@@ -208,6 +217,7 @@ public class TestMasterOperationsForRegionReplicas {
       TEST_UTIL.startMiniHBaseCluster();
       TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
       TEST_UTIL.waitUntilNoRegionsInTransition();
+      resetConnections();
       validateSingleRegionServerAssignment(ADMIN.getConnection(), numRegions, numReplica);
       for (int i = 1; i < numSlaves; i++) { // restore the cluster
         TEST_UTIL.getMiniHBaseCluster().startRegionServer();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java
index 5ce7886..6bbf2c0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterShutdown.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -132,12 +133,19 @@ public class TestMasterShutdown {
     util.startMiniDFSCluster(3);
     util.startMiniZKCluster();
     util.createRootDir();
-    final LocalHBaseCluster cluster =
-        new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS, HMaster.class,
-            MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+    final LocalHBaseCluster cluster = new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS,
+        HMaster.class, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
     final int MASTER_INDEX = 0;
     final MasterThread master = cluster.getMasters().get(MASTER_INDEX);
     master.start();
+    // Switching to the master registry exposed a race in the master bootstrap that can result in
+    // a lost shutdown command (HBASE-8422). The race exists because the server manager in HMaster
+    // is not yet initialized by the time the shutdown() RPC (below) is made to the master. The
+    // reason it did not show up earlier is that connection creation with the ZK registry is slow
+    // enough that the server manager is initialized by then, masking the problem. For now, a
+    // wait() is used here to work around the issue; the proper fix is a little delicate and needs
+    // to be done separately.
+    Waiter.waitFor(conf, 5000, () -> master.getMaster().getServerManager() != null);
     LOG.info("Called master start on " + master.getName());
     Thread shutdownThread = new Thread("Shutdown-Thread") {
       @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMigrateNamespaceTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMigrateNamespaceTable.java
index d114317..b4d378f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMigrateNamespaceTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMigrateNamespaceTable.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -55,11 +56,13 @@ public class TestMigrateNamespaceTable {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    UTIL.startMiniCluster(1);
+    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(1).
+        numAlwaysStandByMasters(1).numRegionServers(1).build();
+    UTIL.startMiniCluster(option);
   }
 
   @AfterClass
-  public static void tearDow() throws Exception {
+  public static void tearDown() throws Exception {
     UTIL.shutdownMiniCluster();
   }
 
@@ -82,6 +85,7 @@ public class TestMigrateNamespaceTable {
     masterThread.getMaster().stop("For testing");
     masterThread.join();
     UTIL.getMiniHBaseCluster().startMaster();
+
     // 5 + default and system('hbase')
     assertEquals(7, UTIL.getAdmin().listNamespaceDescriptors().length);
     for (int i = 0; i < 5; i++) {
@@ -95,7 +99,6 @@ public class TestMigrateNamespaceTable {
     masterThread = UTIL.getMiniHBaseCluster().getMasterThread();
     masterThread.getMaster().stop("For testing");
     masterThread.join();
-
     UTIL.getMiniHBaseCluster().startMaster();
 
     // make sure that we could still restart the cluster after disabling the namespace table.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionMoveAndAbandon.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionMoveAndAbandon.java
index bb95a6f..61e55fa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionMoveAndAbandon.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionMoveAndAbandon.java
@@ -116,6 +116,7 @@ public class TestRegionMoveAndAbandon {
     // Start up everything again
     LOG.info("Starting cluster");
     UTIL.getMiniHBaseCluster().startMaster();
+    UTIL.invalidateConnection();
     UTIL.ensureSomeRegionServersAvailable(2);
 
     UTIL.waitFor(30_000, new Waiter.Predicate<Exception>() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionSplit.java
index f723af8..5156ce9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionSplit.java
@@ -141,6 +141,7 @@ public class TestRegionSplit {
     JVMClusterUtil.MasterThread t = UTIL.getHBaseCluster().startMaster();
     Thread.sleep(500);
 
+    UTIL.invalidateConnection();
     // enable table
     UTIL.getAdmin().enableTable(tableName);
     Thread.sleep(500);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
index c961ad3..be95714 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
@@ -117,7 +117,8 @@ public class TestNamespaceAuditor {
     conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class,
       RegionServerObserver.class);
-    UTIL.startMiniCluster();
+    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(2).build();
+    UTIL.startMiniCluster(option);
     waitForQuotaInitialize(UTIL);
     ADMIN = UTIL.getAdmin();
   }
@@ -483,17 +484,6 @@ public class TestNamespaceAuditor {
     return getQuotaManager().getState(namespace);
   }
 
-  byte[] getSplitKey(byte[] startKey, byte[] endKey) {
-    String skey = Bytes.toString(startKey);
-    int key;
-    if (StringUtils.isBlank(skey)) {
-      key = Integer.parseInt(Bytes.toString(endKey))/2 ;
-    } else {
-      key = (int) (Integer.parseInt(skey) * 1.5);
-    }
-    return Bytes.toBytes("" + key);
-  }
-
   public static class CustomObserver implements RegionCoprocessor, RegionObserver {
     volatile CountDownLatch postCompact;
 
@@ -546,11 +536,11 @@ public class TestNamespaceAuditor {
     UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
-       return (getNamespaceState(nsp1).getTables().size() == 2);
+        return (getNamespaceState(nsp1).getTables().size() == 2);
       }
     });
     NamespaceTableAndRegionInfo before = getNamespaceState(nsp1);
-    restartMaster();
+    killActiveMaster();
     NamespaceTableAndRegionInfo after = getNamespaceState(nsp1);
     assertEquals("Expected: " + before.getTables() + " Found: " + after.getTables(), before
         .getTables().size(), after.getTables().size());
@@ -570,10 +560,9 @@ public class TestNamespaceAuditor {
     });
   }
 
-  private void restartMaster() throws Exception {
+  private void killActiveMaster() throws Exception {
     UTIL.getHBaseCluster().getMaster(0).stop("Stopping to start again");
     UTIL.getHBaseCluster().waitOnMaster(0);
-    UTIL.getHBaseCluster().startMaster();
     waitForQuotaInitialize(UTIL);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 16ce590..647feec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -95,9 +95,8 @@ public class TestRSKilledWhenInitializing {
     TEST_UTIL.startMiniDFSCluster(3);
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.createRootDir();
-    final LocalHBaseCluster cluster =
-        new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS, HMaster.class,
-            RegisterAndDieRegionServer.class);
+    final LocalHBaseCluster cluster = new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS,
+        HMaster.class, RegisterAndDieRegionServer.class);
     final MasterThread master = startMaster(cluster.getMasters().get(0));
     try {
       // Master is up waiting on RegionServers to check in. Now start RegionServers.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index f0c7f2f..4882cad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -146,7 +146,9 @@ public class TestRegionServerNoMaster {
   @AfterClass
   public static void afterClass() throws Exception {
     HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
-    table.close();
+    if (table != null) {
+      table.close();
+    }
     HTU.shutdownMiniCluster();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 65bd4f5..d9f5e83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -142,7 +143,8 @@ public class TestSplitTransactionOnCluster {
   @BeforeClass public static void before() throws Exception {
     TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000);
     StartMiniClusterOption option = StartMiniClusterOption.builder()
-        .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
+        .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).
+            numDataNodes(NB_SERVERS).build();
     TESTING_UTIL.startMiniCluster(option);
   }
 
@@ -814,6 +816,10 @@ public class TestSplitTransactionOnCluster {
     cluster.waitOnMaster(0);
     HMaster master = cluster.startMaster().getMaster();
     cluster.waitForActiveAndReadyMaster();
+    // reset the connections
+    IOUtils.closeQuietly(admin);
+    TESTING_UTIL.invalidateConnection();
+    admin = TESTING_UTIL.getAdmin();
     return master;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 94150b8..6065454 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,8 +76,8 @@ public class TestReplicationBase {
 
   protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
   protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
-  protected static final Configuration CONF1 = UTIL1.getConfiguration();
-  protected static final Configuration CONF2 = UTIL2.getConfiguration();
+  protected static Configuration CONF1 = UTIL1.getConfiguration();
+  protected static Configuration CONF2 = UTIL2.getConfiguration();
 
   protected static final int NUM_SLAVES1 = 2;
   protected static final int NUM_SLAVES2 = 4;
@@ -209,13 +210,27 @@ public class TestReplicationBase {
     conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
   }
 
-  protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves)
+  static void restartSourceCluster(int numSlaves)
       throws Exception {
-    util.shutdownMiniHBaseCluster();
-    util.restartHBaseCluster(numSlaves);
+    IOUtils.closeQuietly(hbaseAdmin, htable1);
+    UTIL1.shutdownMiniHBaseCluster();
+    UTIL1.restartHBaseCluster(numSlaves);
+    // Invalidate the cached connection state.
+    CONF1 = UTIL1.getConfiguration();
+    hbaseAdmin = UTIL1.getAdmin();
+    Connection connection1 = UTIL1.getConnection();
+    htable1 = connection1.getTable(tableName);
+  }
+
+  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
+    IOUtils.closeQuietly(htable2);
+    UTIL2.restartHBaseCluster(numSlaves);
+    // Invalidate the cached connection state
+    CONF2 = UTIL2.getConfiguration();
+    htable2 = UTIL2.getConnection().getTable(tableName);
   }
 
-  protected static void startClusters() throws Exception {
+  private static void startClusters() throws Exception {
     UTIL1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
     LOG.info("Setup first Zk");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
index 4a5cfc1..4ea0bcf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -66,8 +65,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
 
     // disable and start the peer
     hbaseAdmin.disableReplicationPeer("2");
-    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
-    UTIL2.startMiniHBaseCluster(option);
+    restartTargetHBaseCluster(2);
     Get get = new Get(rowkey);
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
index 1d391d3..2d039b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -121,20 +121,13 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
   @Test
   public void testEditsDroppedWithDroppedTableNS() throws Exception {
     // also try with a namespace
-    Connection connection1 = ConnectionFactory.createConnection(CONF1);
-    try (Admin admin1 = connection1.getAdmin()) {
-      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
-    }
-    Connection connection2 = ConnectionFactory.createConnection(CONF2);
-    try (Admin admin2 = connection2.getAdmin()) {
-      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
-    }
-    testEditsBehindDroppedTable(true, "NS:test_dropped");
-    try (Admin admin1 = connection1.getAdmin()) {
-      admin1.deleteNamespace("NS");
-    }
-    try (Admin admin2 = connection2.getAdmin()) {
-      admin2.deleteNamespace("NS");
+    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
+    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
+    try {
+      testEditsBehindDroppedTable(true, "NS:test_dropped");
+    } finally {
+      UTIL1.getAdmin().deleteNamespace("NS");
+      UTIL2.getAdmin().deleteNamespace("NS");
     }
   }
 
@@ -148,8 +141,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
 
     // make sure we have a single region server only, so that all
     // edits for all tables go there
-    UTIL1.shutdownMiniHBaseCluster();
-    UTIL1.startMiniHBaseCluster();
+    restartSourceCluster(1);
 
     TableName tablename = TableName.valueOf(tName);
     byte[] familyName = Bytes.toBytes("fam");
@@ -161,8 +153,8 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
                 .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
             .build();
 
-    Connection connection1 = ConnectionFactory.createConnection(CONF1);
-    Connection connection2 = ConnectionFactory.createConnection(CONF2);
+    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
+    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
     try (Admin admin1 = connection1.getAdmin()) {
       admin1.createTable(table);
     }
@@ -223,8 +215,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
 
     // make sure we have a single region server only, so that all
     // edits for all tables go there
-    UTIL1.shutdownMiniHBaseCluster();
-    UTIL1.startMiniHBaseCluster();
+    restartSourceCluster(1);
 
     TableName tablename = TableName.valueOf("testdroppedtimed");
     byte[] familyName = Bytes.toBytes("fam");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
index edeaf9d..79520d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
@@ -45,7 +45,7 @@ public class TestReplicationStatusAfterLagging extends TestReplicationBase {
   @Test
   public void testReplicationStatusAfterLagging() throws Exception {
     UTIL2.shutdownMiniHBaseCluster();
-    restartHBaseCluster(UTIL1, 1);
+    restartSourceCluster(1);
     // add some values to cluster 1
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
index 16d3822..26f836c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
@@ -52,7 +52,7 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
       htable1.put(p);
     }
     Thread.sleep(10000);
-    restartHBaseCluster(UTIL1, 1);
+    restartSourceCluster(1);
     Admin hbaseAdmin = UTIL1.getAdmin();
     ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
     Thread.sleep(10000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
index 6deb095..243e245 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
@@ -46,7 +46,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
   @Test
   public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
     UTIL2.shutdownMiniHBaseCluster();
-    restartHBaseCluster(UTIL1, 1);
+    restartSourceCluster(1);
     Admin hbaseAdmin = UTIL1.getAdmin();
     // add some values to source cluster
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
index 01f49f4..24c5051 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
@@ -43,7 +43,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe
   @Test
   public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
     UTIL2.shutdownMiniHBaseCluster();
-    restartHBaseCluster(UTIL1, 1);
+    restartSourceCluster(1);
     Admin hbaseAdmin = UTIL1.getAdmin();
     ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
     Thread.sleep(10000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
index fde87bc..0f3450e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
@@ -54,7 +54,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
       htable1.put(p);
     }
     Thread.sleep(10000);
-    restartHBaseCluster(UTIL1, 1);
+    restartSourceCluster(1);
     Admin hbaseAdmin = UTIL1.getAdmin();
     ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
     Thread.sleep(10000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index c3bbca9..fa6109a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -144,7 +144,7 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
 
   private void mimicSyncUpAfterDelete() throws Exception {
     LOG.debug("mimicSyncUpAfterDelete");
-    UTIL2.shutdownMiniHBaseCluster();
+    shutDownTargetHBaseCluster();
 
     List<Delete> list = new ArrayList<>();
     // delete half of the rows
@@ -170,8 +170,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
       rowCount_ht2Source);
 
-    UTIL1.shutdownMiniHBaseCluster();
-    UTIL2.restartHBaseCluster(1);
+    shutDownSourceHBaseCluster();
+    restartTargetHBaseCluster(1);
 
     Thread.sleep(SLEEP_TIME);
 
@@ -189,7 +189,7 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
       if (i == NB_RETRIES - 1) {
         if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
           // syncUP still failed. Let's look at the source in case anything wrong there
-          UTIL1.restartHBaseCluster(1);
+          restartSourceHBaseCluster(1);
           rowCount_ht1Source = countRows(ht1Source);
           LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
           rowCount_ht2Source = countRows(ht2Source);
@@ -213,8 +213,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
 
   private void mimicSyncUpAfterPut() throws Exception {
     LOG.debug("mimicSyncUpAfterPut");
-    UTIL1.restartHBaseCluster(1);
-    UTIL2.shutdownMiniHBaseCluster();
+    restartSourceHBaseCluster(1);
+    shutDownTargetHBaseCluster();
 
     Put p;
     // another 100 + 1 row to t1_syncup
@@ -244,8 +244,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     int rowCount_ht2Source = countRows(ht2Source);
     assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
 
-    UTIL1.shutdownMiniHBaseCluster();
-    UTIL2.restartHBaseCluster(1);
+    shutDownSourceHBaseCluster();
+    restartTargetHBaseCluster(1);
 
     Thread.sleep(SLEEP_TIME);
 
@@ -265,7 +265,7 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
       if (i == NB_RETRIES - 1) {
         if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
           // syncUP still failed. Let's look at the source in case anything wrong there
-          UTIL1.restartHBaseCluster(1);
+          restartSourceHBaseCluster(1);
           rowCount_ht1Source = countRows(ht1Source);
           LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
           rowCount_ht2Source = countRows(ht2Source);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
index bf3941d..ee5276d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
@@ -108,7 +109,7 @@ public abstract class TestReplicationSyncUpToolBase {
     UTIL1.shutdownMiniCluster();
   }
 
-  protected final void setupReplication() throws Exception {
+  final void setupReplication() throws Exception {
     Admin admin1 = UTIL1.getAdmin();
     admin1.createTable(t1SyncupSource);
     admin1.createTable(t2SyncupSource);
@@ -135,7 +136,33 @@ public abstract class TestReplicationSyncUpToolBase {
     admin1.addReplicationPeer("1", rpc);
   }
 
-  protected final void syncUp(HBaseTestingUtility util) throws Exception {
+  final void syncUp(HBaseTestingUtility util) throws Exception {
     ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
   }
+
+  // Utilities that manage shutdown / restart of the source / sink clusters. They take care of
+  // invalidating stale connections after shutdowns / restarts.
+  final void shutDownSourceHBaseCluster() throws Exception {
+    IOUtils.closeQuietly(ht1Source, ht2Source);
+    UTIL1.shutdownMiniHBaseCluster();
+  }
+
+  final void shutDownTargetHBaseCluster() throws Exception {
+    IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1);
+    UTIL2.shutdownMiniHBaseCluster();
+  }
+
+  final void restartSourceHBaseCluster(int numServers) throws Exception {
+    IOUtils.closeQuietly(ht1Source, ht2Source);
+    UTIL1.restartHBaseCluster(numServers);
+    ht1Source = UTIL1.getConnection().getTable(TN1);
+    ht2Source = UTIL1.getConnection().getTable(TN2);
+  }
+
+  final void restartTargetHBaseCluster(int numServers) throws Exception {
+    IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1);
+    UTIL2.restartHBaseCluster(numServers);
+    ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1);
+    ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 6247c22..5c4fc91 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -112,7 +112,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
   private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
       throws Exception {
     LOG.debug("mimicSyncUpAfterBulkLoad");
-    UTIL2.shutdownMiniHBaseCluster();
+    shutDownTargetHBaseCluster();
 
     loadAndReplicateHFiles(false, randomHFileRangeListIterator);
 
@@ -124,8 +124,8 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
       rowCount_ht2Source);
 
-    UTIL1.shutdownMiniHBaseCluster();
-    UTIL2.restartHBaseCluster(1);
+    shutDownSourceHBaseCluster();
+    restartTargetHBaseCluster(1);
 
     Thread.sleep(SLEEP_TIME);
 
@@ -146,7 +146,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
       if (i == NB_RETRIES - 1) {
         if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
           // syncUP still failed. Let's look at the source in case anything wrong there
-          UTIL1.restartHBaseCluster(1);
+          restartSourceHBaseCluster(1);
           rowCount_ht1Source = countRows(ht1Source);
           LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
           rowCount_ht2Source = countRows(ht2Source);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 953b0c9..af41e05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
@@ -114,7 +115,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     }
     HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
       walCoprocs);
-    HTU.startMiniCluster(NB_SERVERS);
+    StartMiniClusterOption option = StartMiniClusterOption.builder().numAlwaysStandByMasters(1).
+        numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
+    HTU.startMiniCluster(option);
 
     // Create table then get the single region for our new table.
     HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java
index 2263bde..964ef14 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java
@@ -80,8 +80,8 @@ public class SecureTestCluster {
   public static void tearDown() throws Exception {
     if (CLUSTER != null) {
       CLUSTER.shutdown();
+      CLUSTER.join();
     }
-    CLUSTER.join();
     if (KDC != null) {
       KDC.stop();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestDelegationTokenWithEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestDelegationTokenWithEncryption.java
index 9353576..0a13df0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestDelegationTokenWithEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestDelegationTokenWithEncryption.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.security.token;
 
 import static org.junit.Assert.assertArrayEquals;
-
 import java.util.Arrays;
 import java.util.Collection;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.ipc.NettyRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -73,8 +71,10 @@ public class TestDelegationTokenWithEncryption extends SecureTestCluster {
   }
 
   @Parameters(name = "{index}: rpcClientImpl={0}")
-  public static Collection<Object[]> parameters() {
-    return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
+  public static Collection<Object> parameters() {
+    // The client connection supports only non-blocking RPCs (a master registry restriction),
+    // hence we test only NettyRpcClient.
+    return Arrays.asList(
       new Object[] { NettyRpcClient.class.getName() });
   }
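
    A hedged illustration of how the parameterized rpcClientImpl value is typically wired into the
    client configuration before connections are created; the conf key comes from RpcClientFactory
    (already imported above) and is not shown in this hunk, and TEST_UTIL is illustrative.

        // Plug the chosen RPC client implementation into the client configuration; with the
        // master registry as the default, only the non-blocking NettyRpcClient is exercised here.
        TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
            rpcClientImpl);
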
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java
index ee5f18f..bb6208d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java
@@ -21,7 +21,6 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.util.Arrays;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.NettyRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@@ -76,8 +74,10 @@ public class TestGenerateDelegationToken extends SecureTestCluster {
   }
 
   @Parameters(name = "{index}: rpcClientImpl={0}")
-  public static Collection<Object[]> parameters() {
-    return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
+  public static Collection<Object> parameters() {
+    // The client connection supports only non-blocking RPCs (a master registry restriction),
+    // hence we test only NettyRpcClient.
+    return Arrays.asList(
       new Object[] { NettyRpcClient.class.getName() });
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index a2981fb..d0fbd3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
@@ -382,6 +383,10 @@ public class TestTokenAuthentication {
   @Before
   public void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
+    // Override the connection registry so that the connection created below can go through
+    // without having to spin up a mini cluster.
+    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
     TEST_UTIL.startMiniZKCluster();
     // register token type for protocol
     SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
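
    For reference, a minimal sketch of the fallback applied in the hunk above, under the
    assumption that only a mini ZK cluster (and no HMaster) is running: with MasterRegistry now
    the default connection registry, such tests must point clients back at the ZooKeeper-based
    registry before creating connections.

        Configuration conf = TEST_UTIL.getConfiguration();
        // Without a live master there is no master registry endpoint to bootstrap from, so fall
        // back to the ZK connection registry for the client connection created later in setUp().
        conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
            HConstants.ZK_CONNECTION_REGISTRY_CLASS);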