You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/08/04 16:16:41 UTC

[hbase] branch branch-2 updated: HBASE-26150 Let region server also carry ClientMetaService (#3550)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 620806e  HBASE-26150 Let region server also carry ClientMetaService (#3550)
620806e is described below

commit 620806e3fb27947fa3b36f24921060e328f4b39d
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Aug 4 23:44:10 2021 +0800

    HBASE-26150 Let region server also carry ClientMetaService (#3550)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../client/AbstractRpcBasedConnectionRegistry.java | 272 ++++++++++++++++++++
 .../hbase/client/ConnectionRegistryFactory.java    |   4 +-
 .../hbase/client/MasterAddressRefresher.java       | 126 ---------
 .../apache/hadoop/hbase/client/MasterRegistry.java | 283 +++------------------
 .../hbase/client/RegistryEndpointsRefresher.java   | 137 ++++++++++
 .../hadoop/hbase/client/RpcConnectionRegistry.java |  99 +++++++
 .../apache/hadoop/hbase/security/SecurityInfo.java |   9 +-
 ...s.java => TestRpcBasedRegistryHedgedReads.java} |  66 +++--
 .../src/main/protobuf/Master.proto                 |  62 -----
 .../src/main/protobuf/Registry.proto               | 108 ++++++++
 .../{master => }/MetaRegionLocationCache.java      |  14 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  12 +-
 .../hadoop/hbase/master/MasterRpcServices.java     |  57 +++--
 .../hadoop/hbase/regionserver/HRegionServer.java   |  22 ++
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 106 ++++++--
 .../hadoop/hbase/security/HBasePolicyProvider.java |  14 +-
 .../hadoop/hbase/client/FromClientSideBase.java    |   2 +-
 .../hbase/client/TestMasterAddressRefresher.java   | 113 --------
 .../hadoop/hbase/client/TestMasterRegistry.java    |  16 +-
 .../hbase/client/TestMetaRegionLocationCache.java  |  19 +-
 .../client/TestRegistryEndpointsRefresher.java     | 114 +++++++++
 .../hbase/client/TestRpcConnectionRegistry.java    | 103 ++++++++
 .../hbase/client/TestScannersFromClientSide.java   |   2 +-
 .../hbase/master/TestClientMetaServiceRPCs.java    |  16 +-
 .../zookeeper/RegionServerAddressTracker.java      |  78 ++++++
 .../zookeeper/TestRegionServerAddressTracker.java  | 121 +++++++++
 26 files changed, 1304 insertions(+), 671 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
new file mode 100644
index 0000000..6a2919e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
+
+/**
+ * Base class for rpc based connection registry implementation.
+ * <p/>
+ * The implementation needs a bootstrap node list in configuration, and then it will use the methods
+ * in {@link ClientMetaService} to refresh the connection registry end points.
+ * <p/>
+ * It also supports hedged reads, the default fan out value is 2.
+ * <p/>
+ * For the actual configuration names, see javadoc of sub classes.
+ */
+@InterfaceAudience.Private
+abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {
+
+  /** Default value for the fan out of hedged requests. **/
+  public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;
+
+  private final int hedgedReadFanOut;
+
+  // Configured list of end points to probe the meta information from.
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
+
+  // RPC client used to talk to the masters.
+  private final RpcClient rpcClient;
+  private final RpcControllerFactory rpcControllerFactory;
+  private final int rpcTimeoutMs;
+
+  private final RegistryEndpointsRefresher registryEndpointRefresher;
+
+  protected AbstractRpcBasedConnectionRegistry(Configuration conf,
+    String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
+    String minRefreshIntervalSecsConfigName) throws IOException {
+    this.hedgedReadFanOut =
+      Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
+    // this through the master registry...
+    // This is a problem as we will use the cluster id to determine the authentication method
+    rpcClient = RpcClientFactory.createClient(conf, null);
+    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+    populateStubs(getBootstrapNodes(conf));
+    registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName,
+      minRefreshIntervalSecsConfigName, this::refreshStubs);
+    registryEndpointRefresher.start();
+  }
+
+  protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
+
+  protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
+
+  private void refreshStubs() throws IOException {
+    populateStubs(FutureUtils.get(fetchEndpoints()));
+  }
+
+  private void populateStubs(Set<ServerName> addrs) throws IOException {
+    Preconditions.checkNotNull(addrs);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(addrs.size());
+    User user = User.getCurrent();
+    for (ServerName masterAddr : addrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
+    }
+    addr2Stub = builder.build();
+  }
+
+  /**
+   * For describing the actual asynchronous rpc call.
+   * <p/>
+   * Typically, you can use lambda expression to implement this interface as
+   *
+   * <pre>
+   * (c, s, d) -> s.xxx(c, your request here, d)
+   * </pre>
+   */
+  @FunctionalInterface
+  protected interface Callable<T> {
+    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
+  }
+
+  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
+    Callable<T> callable) {
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    CompletableFuture<T> future = new CompletableFuture<>();
+    callable.call(controller, stub, resp -> {
+      if (controller.failed()) {
+        IOException failureReason = controller.getFailed();
+        future.completeExceptionally(failureReason);
+        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
+          // RPC has failed, trigger a refresh of end points. We can have some spurious
+          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
+          registryEndpointRefresher.refreshNow();
+        }
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
+  }
+
+  private IOException badResponse(String debug) {
+    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
+  }
+
+  /**
+   * send requests concurrently to hedgedReadsFanout end points. If any of the request is succeeded,
+   * we will complete the future and quit. If all the requests in one round are failed, we will
+   * start another round to send requests concurrently tohedgedReadsFanout end points. If all end
+   * points have been tried and all of them are failed, we will fail the future.
+   */
+  private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
+    List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
+    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
+    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
+    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
+    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+      addListener(call(stubs.get(i), callable), (r, e) -> {
+        // a simple check to skip all the later operations earlier
+        if (future.isDone()) {
+          return;
+        }
+        if (e == null && !isValidResp.test(r)) {
+          e = badResponse(debug);
+        }
+        if (e != null) {
+          // make sure when remaining reaches 0 we have all exceptions in the errors queue
+          errors.add(e);
+          if (remaining.decrementAndGet() == 0) {
+            if (endIndexExclusive == stubs.size()) {
+              // we are done, complete the future with exception
+              RetriesExhaustedException ex =
+                new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
+              future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
+            } else {
+              groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
+                errors);
+            }
+          }
+        } else {
+          // do not need to decrement the counter any more as we have already finished the future.
+          future.complete(r);
+        }
+      });
+    }
+  }
+
+  protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
+    Predicate<T> isValidResp, String debug) {
+    ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
+    Set<ServerName> servers = addr2StubRef.keySet();
+    List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
+    Collections.shuffle(stubs, ThreadLocalRandom.current());
+    CompletableFuture<T> future = new CompletableFuture<>();
+    groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
+      new ConcurrentLinkedQueue<>());
+    return future;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  Set<ServerName> getParsedServers() {
+    return addr2Stub.keySet();
+  }
+
+  /**
+   * Simple helper to transform the result of getMetaRegionLocations() rpc.
+   */
+  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
+    List<HRegionLocation> regionLocations = new ArrayList<>();
+    resp.getMetaLocationsList()
+      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
+    return new RegionLocations(regionLocations);
+  }
+
+  @Override
+  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
+    return this
+      .<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
+        GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
+        "getMetaLocationsCount")
+      .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations);
+  }
+
+  @Override
+  public CompletableFuture<String> getClusterId() {
+    return this
+      .<GetClusterIdResponse> call(
+        (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
+        GetClusterIdResponse::hasClusterId, "getClusterId()")
+      .thenApply(GetClusterIdResponse::getClusterId);
+  }
+
+  @Override
+  public CompletableFuture<ServerName> getActiveMaster() {
+    return this
+      .<GetActiveMasterResponse> call(
+        (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
+        GetActiveMasterResponse::hasServerName, "getActiveMaster()")
+      .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName()));
+  }
+
+  @Override
+  public void close() {
+    if (registryEndpointRefresher != null) {
+      registryEndpointRefresher.stop();
+    }
+    if (rpcClient != null) {
+      rpcClient.close();
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 9308443..e9af7e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -35,8 +35,8 @@ final class ConnectionRegistryFactory {
    * @return The connection registry implementation to use.
    */
   static ConnectionRegistry getRegistry(Configuration conf) {
-    Class<? extends ConnectionRegistry> clazz = conf.getClass(
-        CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
+    Class<? extends ConnectionRegistry> clazz =
+      conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
         ConnectionRegistry.class);
     return ReflectionUtils.newInstance(clazz, conf);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
deleted file mode 100644
index 3cbb9f7..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-
-/**
- * Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
- * uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
- * By default the refresh happens periodically (configured via
- * {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
- * {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two
- * should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
- */
-@InterfaceAudience.Private
-public class MasterAddressRefresher implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
-  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
-      "hbase.client.master_registry.refresh_interval_secs";
-  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
-  public static final String MIN_SECS_BETWEEN_REFRESHES =
-      "hbase.client.master_registry.min_secs_between_refreshes";
-  private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
-
-  private final ExecutorService pool;
-  private final MasterRegistry registry;
-  private final long periodicRefreshMs;
-  private final long timeBetweenRefreshesMs;
-  private final Object refreshMasters = new Object();
-
-  @Override
-  public void close() {
-    pool.shutdownNow();
-  }
-
-  /**
-   * Thread that refreshes the master end points until it is interrupted via {@link #close()}.
-   * Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
-   */
-  private class RefreshThread implements Runnable {
-    @Override
-    public void run() {
-      long lastRpcTs = 0;
-      while (!Thread.interrupted()) {
-        try {
-          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
-          // have duplicate refreshes because once the thread is past the wait(), notify()s are
-          // ignored until the thread is back to the waiting state.
-          synchronized (refreshMasters) {
-            refreshMasters.wait(periodicRefreshMs);
-          }
-          long currentTs = EnvironmentEdgeManager.currentTime();
-          if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
-            continue;
-          }
-          lastRpcTs = currentTs;
-          LOG.debug("Attempting to refresh master address end points.");
-          Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
-          registry.populateMasterStubs(newMasters);
-          LOG.debug("Finished refreshing master end points. {}", newMasters);
-        } catch (InterruptedException e) {
-          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
-          break;
-        } catch (ExecutionException | IOException e) {
-          LOG.debug("Error populating latest list of masters.", e);
-        }
-      }
-      LOG.info("Master end point refresher loop exited.");
-    }
-  }
-
-  MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
-    pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
-    periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
-        PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
-    timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
-        MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
-    Preconditions.checkArgument(periodicRefreshMs > 0);
-    Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
-    this.registry = registry;
-    pool.submit(new RefreshThread());
-  }
-
-  /**
-   * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
-   * See class comment for details.
-   */
-  void refreshNow() {
-    synchronized (refreshMasters) {
-      refreshMasters.notify();
-    }
-  }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 1a46579..76477aa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -19,50 +19,27 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
 import static org.apache.hadoop.hbase.util.DNS.getHostname;
-import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
-import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
 import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Strings;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
 
 /**
  * Master based registry implementation. Makes RPCs to the configured master addresses from config
@@ -70,40 +47,31 @@ import org.apache.yetus.audience.InterfaceAudience;
  * <p/>
  * It supports hedged reads, set the fan out of the requests batch by
  * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
- * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
+ * it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
  * <p/>
  * TODO: Handle changes to the configuration dynamically without having to restart the client.
  */
 @InterfaceAudience.Private
-public class MasterRegistry implements ConnectionRegistry {
+public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
 
   /** Configuration key that controls the fan out of requests **/
   public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
     "hbase.client.master_registry.hedged.fanout";
 
-  /** Default value for the fan out of hedged requests. **/
-  public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
-
-  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+  public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
+    "hbase.client.master_registry.refresh_interval_secs";
 
-  private final int hedgedReadFanOut;
+  public static final String MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES =
+    "hbase.client.master_registry.min_secs_between_refreshes";
 
-  // Configured list of masters to probe the meta information from.
-  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
-
-  // RPC client used to talk to the masters.
-  private final RpcClient rpcClient;
-  private final RpcControllerFactory rpcControllerFactory;
-  private final int rpcTimeoutMs;
-
-  protected final MasterAddressRefresher masterAddressRefresher;
+  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
 
   /**
    * Parses the list of master addresses from the provided configuration. Supported format is comma
    * separated host[:port] values. If no port number if specified, default master port is assumed.
    * @param conf Configuration to parse from.
    */
-  private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
+  public static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
     Set<ServerName> masterAddrs = new HashSet<>();
     String configuredMasters = getMasterAddr(conf);
     for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
@@ -116,31 +84,18 @@ public class MasterRegistry implements ConnectionRegistry {
   }
 
   MasterRegistry(Configuration conf) throws IOException {
-    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
-      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
-      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
-    // this through the master registry...
-    // This is a problem as we will use the cluster id to determine the authentication method
-    rpcClient = RpcClientFactory.createClient(conf, null);
-    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
-    // by fetching the end points from this list.
-    populateMasterStubs(parseMasterAddrs(conf));
-    masterAddressRefresher = new MasterAddressRefresher(conf, this);
+    super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+      MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
   }
 
-  void populateMasterStubs(Set<ServerName> masters) throws IOException {
-    Preconditions.checkNotNull(masters);
-    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
-        ImmutableMap.builderWithExpectedSize(masters.size());
-    User user = User.getCurrent();
-    for (ServerName masterAddr : masters) {
-      builder.put(masterAddr,
-          ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
-    }
-    masterAddr2Stub = builder.build();
+  @Override
+  protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
+    return parseMasterAddrs(conf);
+  }
+
+  @Override
+  protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+    return getMasters();
   }
 
   /**
@@ -158,186 +113,18 @@ public class MasterRegistry implements ConnectionRegistry {
     return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * For describing the actual asynchronous rpc call.
-   * <p/>
-   * Typically, you can use lambda expression to implement this interface as
-   *
-   * <pre>
-   * (c, s, d) -> s.xxx(c, your request here, d)
-   * </pre>
-   */
-  @FunctionalInterface
-  private interface Callable<T> {
-    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
-  }
-
-  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
-    Callable<T> callable) {
-    HBaseRpcController controller = rpcControllerFactory.newController();
-    CompletableFuture<T> future = new CompletableFuture<>();
-    callable.call(controller, stub, resp -> {
-      if (controller.failed()) {
-        IOException failureReason = controller.getFailed();
-        future.completeExceptionally(failureReason);
-        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
-          // RPC has failed, trigger a refresh of master end points. We can have some spurious
-          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
-          masterAddressRefresher.refreshNow();
-        }
-      } else {
-        future.complete(resp);
-      }
-    });
-    return future;
-  }
-
-  private IOException badResponse(String debug) {
-    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
-  }
-
-  /**
-   * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
-   * will complete the future and quit. If all the requests in one round are failed, we will start
-   * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have
-   * been tried and all of them are failed, we will fail the future.
-   */
-  private <T extends Message> void groupCall(CompletableFuture<T> future,
-      Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
-      int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
-      ConcurrentLinkedQueue<Throwable> errors) {
-    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
-    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
-    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
-      addListener(call(masterStubs.get(i), callable), (r, e) -> {
-        // a simple check to skip all the later operations earlier
-        if (future.isDone()) {
-          return;
-        }
-        if (e == null && !isValidResp.test(r)) {
-          e = badResponse(debug);
-        }
-        if (e != null) {
-          // make sure when remaining reaches 0 we have all exceptions in the errors queue
-          errors.add(e);
-          if (remaining.decrementAndGet() == 0) {
-            if (endIndexExclusive == masterStubs.size()) {
-              // we are done, complete the future with exception
-              RetriesExhaustedException ex = new RetriesExhaustedException("masters",
-                masterStubs.size(), new ArrayList<>(errors));
-              future.completeExceptionally(
-                new MasterRegistryFetchException(masterServers, ex));
-            } else {
-              groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
-                  isValidResp, debug, errors);
-            }
-          }
-        } else {
-          // do not need to decrement the counter any more as we have already finished the future.
-          future.complete(r);
-        }
-      });
-    }
-  }
-
-  private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
-    Predicate<T> isValidResp, String debug) {
-    ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
-    Set<ServerName> masterServers = masterAddr2StubRef.keySet();
-    List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
-    Collections.shuffle(masterStubs, ThreadLocalRandom.current());
-    CompletableFuture<T> future = new CompletableFuture<>();
-    groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
-        new ConcurrentLinkedQueue<>());
-    return future;
-  }
-
-  /**
-   * Simple helper to transform the result of getMetaRegionLocations() rpc.
-   */
-  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
-    List<HRegionLocation> regionLocations = new ArrayList<>();
-    resp.getMetaLocationsList()
-      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
-    return new RegionLocations(regionLocations);
-  }
-
-  @Override
-  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-    return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
-      GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
-      "getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
+  private static Set<ServerName> transformServerNames(GetMastersResponse resp) {
+    return resp.getMasterServersList().stream()
+      .map(s -> ProtobufUtil.toServerName(s.getServerName())).collect(Collectors.toSet());
   }
 
-  @Override
-  public CompletableFuture<String> getClusterId() {
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/(.*/MasterRegistry.java|src/test/.*)")
+  CompletableFuture<Set<ServerName>> getMasters() {
     return this
-      .<GetClusterIdResponse> call(
-        (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
-        GetClusterIdResponse::hasClusterId, "getClusterId()")
-      .thenApply(GetClusterIdResponse::getClusterId);
-  }
-
-  private static boolean hasActiveMaster(GetMastersResponse resp) {
-    List<GetMastersResponseEntry> activeMasters =
-        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
-        Collectors.toList());
-    return activeMasters.size() == 1;
-  }
-
-  private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
-    List<GetMastersResponseEntry> activeMasters =
-        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
-            Collectors.toList());
-    if (activeMasters.size() != 1) {
-      throw new IOException(String.format("Incorrect number of active masters encountered." +
-          " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
-    }
-    return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
-  }
-
-  @Override
-  public CompletableFuture<ServerName> getActiveMaster() {
-    CompletableFuture<ServerName> future = new CompletableFuture<>();
-    addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
-      MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
-        if (ex != null) {
-          future.completeExceptionally(ex);
-        }
-        ServerName result = null;
-        try {
-          result = filterActiveMaster((GetMastersResponse)resp);
-        } catch (IOException e) {
-          future.completeExceptionally(e);
-        }
-        future.complete(result);
-      });
-    return future;
-  }
-
-  private static List<ServerName> transformServerNames(GetMastersResponse resp) {
-    return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
-        s.getServerName())).collect(Collectors.toList());
-  }
-
-  CompletableFuture<List<ServerName>> getMasters() {
-    return this
-        .<GetMastersResponse> call((c, s, d) -> s.getMasters(
-            c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
-            "getMasters()").thenApply(MasterRegistry::transformServerNames);
-  }
-
-  Set<ServerName> getParsedMasterServers() {
-    return masterAddr2Stub.keySet();
-  }
-
-  @Override
-  public void close() {
-    if (masterAddressRefresher != null) {
-      masterAddressRefresher.close();
-    }
-    if (rpcClient != null) {
-      rpcClient.close();
-    }
+      .<GetMastersResponse> call(
+        (c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
+        r -> r.getMasterServersCount() != 0, "getMasters()")
+      .thenApply(MasterRegistry::transformServerNames);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
new file mode 100644
index 0000000..7eb81b0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * Thread safe utility that keeps registry end points used by {@link ConnectionRegistry} up to date.
+ * By default the refresh happens periodically (configured via {@code intervalSecsConfigName}). The
+ * refresh can also be triggered on demand via {@link #refreshNow()}. To prevent a flood of
+ * on-demand refreshes we expect that any attempts two should be spaced at least
+ * {@code minIntervalSecsConfigName} seconds apart.
+ */
+@InterfaceAudience.Private
+class RegistryEndpointsRefresher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
+
+  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
+    "hbase.client.rpc_registry.refresh_interval_secs";
+  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
+
+  public static final String MIN_SECS_BETWEEN_REFRESHES =
+    "hbase.client.rpc_registry.min_secs_between_refreshes";
+  private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
+
+  private final Thread thread;
+  private final Refresher refresher;
+  private final long periodicRefreshMs;
+  private final long minTimeBetweenRefreshesMs;
+
+  private boolean refreshNow = false;
+  private boolean stopped = false;
+
+  public void start() {
+    thread.start();
+  }
+
+  public synchronized void stop() {
+    stopped = true;
+    notifyAll();
+  }
+
+  // The main loop for the refresh thread.
+  private void mainLoop() {
+    long lastRefreshTime = EnvironmentEdgeManager.currentTime();
+    for (;;) {
+      synchronized (this) {
+        for (;;) {
+          if (stopped) {
+            LOG.info("Registry end points refresher loop exited.");
+            return;
+          }
+          // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
+          // otherwise wait until periodicRefreshMs elapsed
+          long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
+            (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
+          if (waitTime <= 0) {
+            break;
+          }
+          try {
+            wait(waitTime);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted during wait", e);
+            Thread.currentThread().interrupt();
+            continue;
+          }
+        }
+        // we are going to refresh, reset this flag
+        refreshNow = false;
+      }
+      LOG.debug("Attempting to refresh registry end points");
+      try {
+        refresher.refresh();
+      } catch (IOException e) {
+        LOG.warn("Error refresh registry end points", e);
+      }
+      // We do not think it is a big deal to fail one time, so no matter what is refresh result, we
+      // just update this refresh time and wait for the next round. If later this becomes critical,
+      // could change to only update this value when we have done a successful refreshing.
+      lastRefreshTime = EnvironmentEdgeManager.currentTime();
+      LOG.debug("Finished refreshing registry end points");
+    }
+  }
+
+  @FunctionalInterface
+  public interface Refresher {
+
+    void refresh() throws IOException;
+  }
+
+  RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName,
+    String minIntervalSecsConfigName, Refresher refresher) {
+    periodicRefreshMs = TimeUnit.SECONDS
+      .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
+    minTimeBetweenRefreshesMs = TimeUnit.SECONDS
+      .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
+    Preconditions.checkArgument(periodicRefreshMs > 0);
+    Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
+    thread = new Thread(this::mainLoop);
+    thread.setName("Registry-endpoints-refresh-end-points");
+    thread.setDaemon(true);
+    this.refresher = refresher;
+  }
+
+  /**
+   * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
+   * See class comment for details.
+   */
+  synchronized void refreshNow() {
+    refreshNow = true;
+    notifyAll();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
new file mode 100644
index 0000000..0096bfc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
+
+/**
+ * Rpc based connection registry. It will make use of the {@link ClientMetaService} to get registry
+ * information.
+ * <p/>
+ * It needs bootstrap node list when start up, and then it will use {@link ClientMetaService} to
+ * refresh the bootstrap node list periodically.
+ * <p/>
+ * Usually, you could set masters as the bootstrap nodes,as they will also implement the
+ * {@link ClientMetaService}, and then, we will switch to use region servers after refreshing the
+ * bootstrap nodes.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
+
+  /** Configuration key that controls the fan out of requests **/
+  public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";
+
+  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
+    "hbase.client.bootstrap.refresh_interval_secs";
+
+  public static final String MIN_SECS_BETWEEN_REFRESHES =
+    "hbase.client.bootstrap.min_secs_between_refreshes";
+
+  public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";
+
+  private static final char ADDRS_CONF_SEPARATOR = ',';
+
+  RpcConnectionRegistry(Configuration conf) throws IOException {
+    super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
+  }
+
+  @Override
+  protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
+    // try get bootstrap nodes config first
+    String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
+    if (!StringUtils.isBlank(configuredBootstrapNodes)) {
+      return Splitter.on(ADDRS_CONF_SEPARATOR).trimResults().splitToStream(configuredBootstrapNodes)
+        .map(addr -> ServerName.valueOf(addr, ServerName.NON_STARTCODE))
+        .collect(Collectors.toSet());
+    } else {
+      // otherwise, just use master addresses
+      return MasterRegistry.parseMasterAddrs(conf);
+    }
+  }
+
+  private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
+    return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
+      .collect(Collectors.toSet());
+  }
+
+  private CompletableFuture<Set<ServerName>> getBootstrapNodes() {
+    return this
+      .<GetBootstrapNodesResponse> call(
+        (c, s, d) -> s.getBootstrapNodes(c, GetBootstrapNodesRequest.getDefaultInstance(), d),
+        r -> r.getServerNameCount() != 0, "getBootstrapNodes()")
+      .thenApply(RpcConnectionRegistry::transformServerNames);
+  }
+
+  @Override
+  protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+    return getBootstrapNodes();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index e5b4de2..a2f086f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -19,14 +19,15 @@ package org.apache.hadoop.hbase.security;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos;
 
 /**
  * Maps RPC protocol interfaces to required configuration
@@ -49,7 +50,7 @@ public class SecurityInfo {
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
     infos.put(MasterProtos.HbckService.getDescriptor().getName(),
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
-    infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
+    infos.put(RegistryProtos.ClientMetaService.getDescriptor().getName(),
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
     // NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
     // new Service will not be found when all is Kerberized!!!!
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
similarity index 78%
rename from hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
rename to hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
index 40a38c7..dacfbd1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
@@ -33,7 +33,6 @@ import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@@ -58,22 +57,30 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
 
 @Category({ ClientTests.class, SmallTests.class })
-public class TestMasterRegistryHedgedReads {
+public class TestRpcBasedRegistryHedgedReads {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
+    HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
+
+  private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
+  private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.refresh.interval.secs";
+  private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.min.refresh.interval.secs";
 
   private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
 
   private static final ExecutorService EXECUTOR =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
 
+  private static Set<ServerName> BOOTSTRAP_NODES;
+
   private static AtomicInteger CALLED = new AtomicInteger(0);
 
   private static volatile int BAD_RESP_INDEX;
@@ -142,14 +149,35 @@ public class TestMasterRegistryHedgedReads {
     }
   }
 
+  private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
+    return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
+      REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
+
+      @Override
+      protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
+        return BOOTSTRAP_NODES;
+      }
+
+      @Override
+      protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
+        return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
+      }
+    };
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() {
     Configuration conf = UTIL.getConfiguration();
     conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
       RpcClient.class);
-    String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
-      .collect(Collectors.joining(","));
-    conf.set(HConstants.MASTER_ADDRS_KEY, masters);
+    // disable refresh, we do not need to refresh in this test
+    conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
+    conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
+    BOOTSTRAP_NODES = IntStream.range(0, 10)
+      .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
+      .collect(Collectors.toSet());
   }
 
   @AfterClass
@@ -175,9 +203,7 @@ public class TestMasterRegistryHedgedReads {
 
   @Test
   public void testAllFailNoHedged() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
-    try (MasterRegistry registry = new MasterRegistry(conf)) {
+    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) {
       assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
       assertEquals(10, CALLED.get());
     }
@@ -185,10 +211,8 @@ public class TestMasterRegistryHedgedReads {
 
   @Test
   public void testAllFailHedged3() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
     BAD_RESP_INDEX = 5;
-    try (MasterRegistry registry = new MasterRegistry(conf)) {
+    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) {
       assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
       assertEquals(10, CALLED.get());
     }
@@ -196,12 +220,10 @@ public class TestMasterRegistryHedgedReads {
 
   @Test
   public void testFirstSucceededNoHedge() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    // will be set to 1
-    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
     GOOD_RESP_INDEXS =
       IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
-    try (MasterRegistry registry = new MasterRegistry(conf)) {
+    // will be set to 1
+    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) {
       String clusterId = logIfError(registry.getClusterId());
       assertEquals(RESP.getClusterId(), clusterId);
       assertEquals(1, CALLED.get());
@@ -210,10 +232,8 @@ public class TestMasterRegistryHedgedReads {
 
   @Test
   public void testSecondRoundSucceededHedge4() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
     GOOD_RESP_INDEXS = Collections.singleton(6);
-    try (MasterRegistry registry = new MasterRegistry(conf)) {
+    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) {
       String clusterId = logIfError(registry.getClusterId());
       assertEquals(RESP.getClusterId(), clusterId);
       UTIL.waitFor(5000, () -> CALLED.get() == 8);
@@ -222,10 +242,8 @@ public class TestMasterRegistryHedgedReads {
 
   @Test
   public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
-    Configuration conf = UTIL.getConfiguration();
-    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
     GOOD_RESP_INDEXS = Collections.singleton(5);
-    try (MasterRegistry registry = new MasterRegistry(conf)) {
+    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) {
       String clusterId = logIfError(registry.getClusterId());
       assertEquals(RESP.getClusterId(), clusterId);
       UTIL.waitFor(5000, () -> CALLED.get() == 10);
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 5e2e6ce..045feb4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -1251,65 +1251,3 @@ service HbckService {
   rpc FixMeta(FixMetaRequest)
     returns(FixMetaResponse);
 }
-
-/** Request and response to get the clusterID for this cluster */
-message GetClusterIdRequest {
-}
-message GetClusterIdResponse {
-  /** Not set if cluster ID could not be determined. */
-  optional string cluster_id = 1;
-}
-
-/** Request and response to get the currently active master name for this cluster */
-message GetActiveMasterRequest {
-}
-message GetActiveMasterResponse {
-  /** Not set if an active master could not be determined. */
-  optional ServerName server_name = 1;
-}
-
-/** Request and response to get the current list of all registers master servers */
-message GetMastersRequest {
-}
-message GetMastersResponseEntry {
-  required ServerName server_name = 1;
-  required bool is_active = 2;
-}
-message GetMastersResponse {
-  repeated GetMastersResponseEntry master_servers = 1;
-}
-
-/** Request and response to get the current list of meta region locations */
-message GetMetaRegionLocationsRequest {
-}
-message GetMetaRegionLocationsResponse {
-  /** Not set if meta region locations could not be determined. */
-  repeated RegionLocation meta_locations = 1;
-}
-
-/**
- * Implements all the RPCs needed by clients to look up cluster meta information needed for
- * connection establishment.
- */
-service ClientMetaService {
-  /**
-   * Get Cluster ID for this cluster.
-   */
-  rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
-
-  /**
-   * Get active master server name for this cluster. Retained for out of sync client and master
-   * rolling upgrades. Newer clients switched to GetMasters RPC request.
-   */
-  rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
-
-  /**
-   * Get registered list of master servers in this cluster.
-   */
-  rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);
-
-  /**
-   * Get current meta replicas' region locations.
-   */
-  rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
-}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Registry.proto b/hbase-protocol-shaded/src/main/protobuf/Registry.proto
new file mode 100644
index 0000000..8dd0d1a
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/Registry.proto
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+// The protos for ConnectionRegistry.
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "RegistryProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "HBase.proto";
+
+/** Request and response to get the clusterID for this cluster */
+message GetClusterIdRequest {
+}
+message GetClusterIdResponse {
+  /** Not set if cluster ID could not be determined. */
+  optional string cluster_id = 1;
+}
+
+/** Request and response to get the currently active master name for this cluster */
+message GetActiveMasterRequest {
+}
+message GetActiveMasterResponse {
+  /** Not set if an active master could not be determined. */
+  optional ServerName server_name = 1;
+}
+
+/** Request and response to get the current list of all registers master servers */
+message GetMastersRequest {
+  option deprecated = true;
+}
+message GetMastersResponseEntry {
+  option deprecated = true;
+  required ServerName server_name = 1;
+  required bool is_active = 2;
+}
+message GetMastersResponse {
+  option deprecated = true;
+  repeated GetMastersResponseEntry master_servers = 1;
+}
+
+/** Request and response to get the current list of meta region locations */
+message GetMetaRegionLocationsRequest {
+}
+message GetMetaRegionLocationsResponse {
+  /** Not set if meta region locations could not be determined. */
+  repeated RegionLocation meta_locations = 1;
+}
+
+/** Request and response to get the nodes which could be used to as ClientMetaService */
+message GetBootstrapNodesRequest {
+}
+message GetBootstrapNodesResponse {
+  repeated ServerName server_name = 1;
+}
+
+/**
+ * Implements all the RPCs needed by clients to look up cluster meta information needed for
+ * connection establishment.
+ */
+service ClientMetaService {
+  /**
+   * Get Cluster ID for this cluster.
+   */
+  rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
+
+  /**
+   * Get active master server name for this cluster. Retained for out of sync client and master
+   * rolling upgrades. Newer clients switched to GetMasters RPC request.
+   */
+  rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
+
+  /**
+   * Get registered list of master servers in this cluster.
+   */
+  rpc GetMasters(GetMastersRequest) returns(GetMastersResponse) {
+    option deprecated = true;
+  };
+
+  /**
+   * Get current meta replicas' region locations.
+   */
+  rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
+
+  /**
+   * Get nodes which could be used as ClientMetaService
+   */
+  rpc GetBootstrapNodes(GetBootstrapNodesRequest) returns (GetBootstrapNodesResponse);
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java
similarity index 96%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java
index 07512d1..b294f7be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.master;
+package org.apache.hadoop.hbase;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ThreadFactory;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
@@ -35,7 +35,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -87,10 +89,10 @@ public class MetaRegionLocationCache extends ZKListener {
     // are established. Subsequent updates are handled by the registered listener. Also, this runs
     // in a separate thread in the background to not block master init.
     ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
-    RetryCounterFactory retryFactory = new RetryCounterFactory(
-        Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
-    threadFactory.newThread(
-      ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
+    RetryCounterFactory retryFactory = new RetryCounterFactory(Integer.MAX_VALUE,
+      SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
+    threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT))
+      .start();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 66ca847..8b1638a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaRegionLocationCache;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.PleaseRestartMasterException;
@@ -302,12 +303,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   // manager of assignment nodes in zookeeper
   private AssignmentManager assignmentManager;
 
-  /**
-   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale
-   * cache entries.
-   */
-  private final MetaRegionLocationCache metaRegionLocationCache;
-
   // manager of replication
   private ReplicationPeerManager replicationPeerManager;
 
@@ -474,7 +469,6 @@ public class HMaster extends HRegionServer implements MasterServices {
         }
       }
 
-      this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
       this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
 
       cachedClusterId = new CachedClusterId(this, conf);
@@ -3793,10 +3787,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     return cachedClusterId.getFromCacheOrFetch();
   }
 
-  public MetaRegionLocationCache getMetaRegionLocationCache() {
-    return this.metaRegionLocationCache;
-  }
-
   /**
    * Get the compaction state of the table
    *
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 1c3137f..da40aaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.Server;
@@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
 import org.apache.hadoop.hbase.master.janitor.MetaFixer;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -178,7 +176,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
@@ -203,21 +200,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
@@ -354,6 +342,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -2986,9 +2984,11 @@ public class MasterRpcServices extends RSRpcServices implements
     return true;
   }
 
+  // Override this method since for backup master we will not set the clusterId field, which means
+  // we need to find another way to get cluster id for backup masters.
   @Override
   public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
-      throws ServiceException {
+    throws ServiceException {
     GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
     String clusterId = master.getClusterId();
     if (clusterId != null) {
@@ -2997,40 +2997,43 @@ public class MasterRpcServices extends RSRpcServices implements
     return resp.build();
   }
 
+  // Override this method since we use ActiveMasterManager to get active master on HMaster while in
+  // HRegionServer we use MasterAddressTracker
   @Override
   public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
-      GetActiveMasterRequest request) throws ServiceException {
+    GetActiveMasterRequest request) throws ServiceException {
     GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
     Optional<ServerName> serverName = master.getActiveMaster();
     serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
     return resp.build();
   }
 
+  // Override this method since we use ActiveMasterManager to get backup masters on HMaster while in
+  // HRegionServer we use MasterAddressTracker
   @Override
   public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
-      throws ServiceException {
+    throws ServiceException {
     GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
     // Active master
     Optional<ServerName> serverName = master.getActiveMaster();
     serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
-        .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
+      .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
     // Backup masters
-    for (ServerName backupMaster: master.getBackupMasters()) {
-      resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
-          ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
+    for (ServerName backupMaster : master.getBackupMasters()) {
+      resp.addMasterServers(GetMastersResponseEntry.newBuilder()
+        .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
     }
     return resp.build();
   }
 
   @Override
-  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
-      GetMetaRegionLocationsRequest request) throws ServiceException {
-    GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
-    Optional<List<HRegionLocation>> metaLocations =
-        master.getMetaRegionLocationCache().getMetaRegionLocations();
-    metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
-      location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
-    return response.build();
+  public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
+    GetBootstrapNodesRequest request) throws ServiceException {
+    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
+    for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) {
+      builder.addServerName(ProtobufUtil.toServerName(sn));
+    }
+    return builder.build();
   }
   @Override
   public HBaseProtos.LogEntry getLogEntries(RpcController controller,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5582702..f30a8cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HealthCheckChore;
+import org.apache.hadoop.hbase.MetaRegionLocationCache;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.PleaseHoldException;
@@ -181,6 +182,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -427,6 +429,16 @@ public class HRegionServer extends Thread implements
   // master address tracker
   private final MasterAddressTracker masterAddressTracker;
 
+  /**
+   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
+   * entries. Used for serving ClientMetaService.
+   */
+  private final MetaRegionLocationCache metaRegionLocationCache;
+  /**
+   * Cache for all the region servers in the cluster. Used for serving ClientMetaService.
+   */
+  private final RegionServerAddressTracker regionServerAddressTracker;
+
   // Cluster Status Tracker
   protected final ClusterStatusTracker clusterStatusTracker;
 
@@ -677,6 +689,8 @@ public class HRegionServer extends Thread implements
         clusterStatusTracker = null;
       }
       this.rpcServices.start(zooKeeper);
+      this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
+      this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
       // This violates 'no starting stuff in Constructor' but Master depends on the below chore
       // and executor being created and takes a different startup route. Lots of overlap between HRS
       // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
@@ -3970,4 +3984,12 @@ public class HRegionServer extends Thread implements
   public long getRetryPauseTime() {
     return this.retryPauseTime;
   }
+
+  public MetaRegionLocationCache getMetaRegionLocationCache() {
+    return this.metaRegionLocationCache;
+  }
+
+  RegionServerAddressTracker getRegionServerAddressTracker() {
+    return regionServerAddressTracker;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 92a10c8..06e7ccf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -156,6 +157,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -254,6 +256,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -266,8 +280,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
 public class RSRpcServices implements HBaseRPCErrorHandler,
-    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
-    ConfigurationObserver {
+    AdminService.BlockingInterface, ClientService.BlockingInterface,
+    ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver {
   protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
 
   /** RPC scheduler to use for the region server. */
@@ -373,9 +387,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * where you would ever turn off one or the other).
    */
   public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
-      "hbase.regionserver.admin.executorService";
+    "hbase.regionserver.admin.executorService";
   public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
-      "hbase.regionserver.client.executorService";
+    "hbase.regionserver.client.executorService";
+  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
+    "hbase.regionserver.client.meta.executorService";
 
   /**
    * An Rpc callback for closing a RegionScanner.
@@ -1578,23 +1594,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * supports
    */
   protected List<BlockingServiceAndInterface> getServices() {
-    boolean admin =
-      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
-    boolean client =
-      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
+    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
+    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
+    boolean clientMeta =
+      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
     List<BlockingServiceAndInterface> bssi = new ArrayList<>();
     if (client) {
-      bssi.add(new BlockingServiceAndInterface(
-      ClientService.newReflectiveBlockingService(this),
-      ClientService.BlockingInterface.class));
+      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
+        ClientService.BlockingInterface.class));
     }
     if (admin) {
-      bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
+        AdminService.BlockingInterface.class));
     }
-    return new org.apache.hbase.thirdparty.com.google.common.collect.
-        ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
+    if (clientMeta) {
+      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
+        ClientMetaService.BlockingInterface.class));
+    }
+    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
   }
 
   public InetSocketAddress getSocketAddress() {
@@ -4008,4 +4025,61 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   protected ZKPermissionWatcher getZkPermissionWatcher() {
     return zkPermissionWatcher;
   }
+
+  @Override
+  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
+    throws ServiceException {
+    return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build();
+  }
+
+  @Override
+  public GetActiveMasterResponse getActiveMaster(RpcController controller,
+    GetActiveMasterRequest request) throws ServiceException {
+    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
+    ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
+    if (activeMaster != null) {
+      builder.setServerName(ProtobufUtil.toServerName(activeMaster));
+    }
+    return builder.build();
+  }
+
+  @Override
+  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
+    throws ServiceException {
+    try {
+      GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
+      ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
+      if (activeMaster != null) {
+        builder.addMasterServers(GetMastersResponseEntry.newBuilder()
+          .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
+      }
+      for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
+        builder.addMasterServers(GetMastersResponseEntry.newBuilder()
+          .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
+    GetMetaRegionLocationsRequest request) throws ServiceException {
+    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
+    Optional<List<HRegionLocation>> metaLocations =
+      regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
+    metaLocations.ifPresent(hRegionLocations -> hRegionLocations
+      .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
+    return builder.build();
+  }
+
+  @Override
+  public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
+    GetBootstrapNodesRequest request) throws ServiceException {
+    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
+    regionServer.getRegionServerAddressTracker().getRegionServers().stream()
+      .map(ProtobufUtil::toServerName).forEach(builder::addServerName);
+    return builder.build();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
index b7ab7f0..8fbe6ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
@@ -17,18 +17,20 @@
  */
 package org.apache.hadoop.hbase.security;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos;
+
 /**
  * Implementation of secure Hadoop policy provider for mapping
  * protocol interfaces to hbase-policy.xml entries.
@@ -41,7 +43,7 @@ public class HBasePolicyProvider extends PolicyProvider {
     new Service("security.client.protocol.acl",
       MasterProtos.HbckService.BlockingInterface.class),
     new Service("security.client.protocol.acl",
-      MasterProtos.ClientMetaService.BlockingInterface.class),
+      RegistryProtos.ClientMetaService.BlockingInterface.class),
     new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
     new Service("security.masterregion.protocol.acl",
       RegionServerStatusService.BlockingInterface.class)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
index 3c539a0..be8e432 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
@@ -95,7 +95,7 @@ class FromClientSideBase {
     Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
       ZKConnectionRegistry.class);
     int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
-      MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
+      AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
     return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java
deleted file mode 100644
index ad1e738..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-
-@Category({ClientTests.class, SmallTests.class})
-public class TestMasterAddressRefresher {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMasterAddressRefresher.class);
-
-  private class DummyMasterRegistry extends MasterRegistry {
-
-    private final AtomicInteger getMastersCallCounter = new AtomicInteger(0);
-    private final List<Long> callTimeStamps = new ArrayList<>();
-
-    DummyMasterRegistry(Configuration conf) throws IOException {
-      super(conf);
-    }
-
-    @Override
-    CompletableFuture<List<ServerName>> getMasters() {
-      getMastersCallCounter.incrementAndGet();
-      callTimeStamps.add(EnvironmentEdgeManager.currentTime());
-      return CompletableFuture.completedFuture(new ArrayList<>());
-    }
-
-    public int getMastersCount() {
-      return getMastersCallCounter.get();
-    }
-
-    public List<Long> getCallTimeStamps() {
-      return callTimeStamps;
-    }
-  }
-
-  @Test
-  public void testPeriodicMasterEndPointRefresh() throws IOException {
-    Configuration conf = HBaseConfiguration.create();
-    // Refresh every 1 second.
-    conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1);
-    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
-    try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
-      // Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
-      Waiter.waitFor(
-          conf, 5000, (Waiter.Predicate<Exception>) () -> registry.getMastersCount() > 3);
-    }
-  }
-
-  @Test
-  public void testDurationBetweenRefreshes() throws IOException {
-    Configuration conf = HBaseConfiguration.create();
-    // Disable periodic refresh
-    conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE);
-    // A minimum duration of 1s between refreshes
-    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1);
-    try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
-      // Issue a ton of manual refreshes.
-      for (int i = 0; i < 10000; i++) {
-        registry.masterAddressRefresher.refreshNow();
-        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
-      }
-      // Overall wait time is 10000 ms, so the number of requests should be <=10
-      List<Long> callTimeStamps = registry.getCallTimeStamps();
-      // Actual calls to getMasters() should be much lower than the refresh count.
-      Assert.assertTrue(
-          String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20);
-      Assert.assertTrue(callTimeStamps.size() > 0);
-      // Verify that the delta between subsequent RPCs is at least 1sec as configured.
-      for (int i = 1; i < callTimeStamps.size() - 1; i++) {
-        long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1);
-        // Few ms cushion to account for any env jitter.
-        Assert.assertTrue(callTimeStamps.toString(), delta > 990);
-      }
-    }
-
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 359ad61..6301b05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -53,7 +53,7 @@ public class TestMasterRegistry {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMasterRegistry.class);
+    HBaseClassTestRule.forClass(TestMasterRegistry.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   @BeforeClass
@@ -90,7 +90,7 @@ public class TestMasterRegistry {
     int numMasters = 10;
     conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
     try (MasterRegistry registry = new MasterRegistry(conf)) {
-      List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
+      List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedServers());
       // Half of them would be without a port, duplicates are removed.
       assertEquals(numMasters / 2 + 1, parsedMasters.size());
       // Sort in the increasing order of port numbers.
@@ -149,17 +149,17 @@ public class TestMasterRegistry {
     // Set the hedging fan out so that all masters are queried.
     conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
     // Do not limit the number of refreshes during the test run.
-    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
+    conf.setLong(MasterRegistry.MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES, 0);
     try (MasterRegistry registry = new MasterRegistry(conf)) {
-      final Set<ServerName> masters = registry.getParsedMasterServers();
+      final Set<ServerName> masters = registry.getParsedServers();
       assertTrue(masters.contains(badServer));
       // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
       assertEquals(registry.getClusterId().get(), clusterId);
       // Wait for new set of masters to be populated.
       TEST_UTIL.waitFor(5000,
-          (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
+        (Waiter.Predicate<Exception>) () -> !registry.getParsedServers().equals(masters));
       // new set of masters should not include the bad server
-      final Set<ServerName> newMasters = registry.getParsedMasterServers();
+      final Set<ServerName> newMasters = registry.getParsedServers();
       // Bad one should be out.
       assertEquals(3, newMasters.size());
       assertFalse(newMasters.contains(badServer));
@@ -170,8 +170,8 @@ public class TestMasterRegistry {
       TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000);
       // Wait until the killed master de-registered. This should also trigger another refresh.
       TEST_UTIL.waitFor(10000, () -> registry.getMasters().get().size() == 2);
-      TEST_UTIL.waitFor(20000, () -> registry.getParsedMasterServers().size() == 2);
-      final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+      TEST_UTIL.waitFor(20000, () -> registry.getParsedServers().size() == 2);
+      final Set<ServerName> newMasters2 = registry.getParsedServers();
       assertEquals(2, newMasters2.size());
       assertFalse(newMasters2.contains(activeMaster.getServerName()));
     } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index 24e8823..4aadeb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaRegionLocationCache;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MetaRegionLocationCache;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -94,20 +94,20 @@ public class TestMetaRegionLocationCache {
         break;
       }
     }
-    List<HRegionLocation> metaHRLs =
-        master.getMetaRegionLocationCache().getMetaRegionLocations().get();
-    assertFalse(metaHRLs.isEmpty());
     ZKWatcher zk = master.getZooKeeper();
     List<String> metaZnodes = zk.getMetaReplicaNodes();
     // Wait till all replicas available.
     retries = 0;
-    while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() !=
-        metaZnodes.size()) {
+    while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes
+      .size()) {
       Thread.sleep(1000);
       if (++retries == 10) {
         break;
       }
     }
+    List<HRegionLocation> metaHRLs =
+      master.getMetaRegionLocationCache().getMetaRegionLocations().get();
+    assertFalse(metaHRLs.isEmpty());
     assertEquals(metaZnodes.size(), metaHRLs.size());
     List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
     Collections.sort(metaHRLs);
@@ -115,13 +115,14 @@ public class TestMetaRegionLocationCache {
     assertEquals(actualHRLs, metaHRLs);
   }
 
-  @Test public void testInitialMetaLocations() throws Exception {
+  @Test
+  public void testInitialMetaLocations() throws Exception {
     verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
   }
 
-  @Test public void testStandByMetaLocations() throws Exception {
+  @Test
+  public void testStandByMetaLocations() throws Exception {
     HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
-    standBy.isInitialized();
     verifyCachedMetaLocations(standBy);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
new file mode 100644
index 0000000..1447099
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestRegistryEndpointsRefresher {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
+
+  private static final String INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.registry.refresh.interval.secs";
+  private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
+    "hbase.test.registry.refresh.min.interval.secs";
+
+  private Configuration conf;
+  private RegistryEndpointsRefresher refresher;
+  private AtomicInteger getMastersCallCounter;
+  private CopyOnWriteArrayList<Long> callTimestamps;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    getMastersCallCounter = new AtomicInteger(0);
+    callTimestamps = new CopyOnWriteArrayList<>();
+  }
+
+  @After
+  public void tearDown() {
+    if (refresher != null) {
+      refresher.stop();
+    }
+  }
+
+  private void refresh() {
+    getMastersCallCounter.incrementAndGet();
+    callTimestamps.add(EnvironmentEdgeManager.currentTime());
+  }
+
+  private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
+    conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
+    conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
+    refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME,
+      MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
+    refresher.start();
+  }
+
+  @Test
+  public void testPeriodicMasterEndPointRefresh() throws IOException {
+    // Refresh every 1 second.
+    createAndStartRefresher(1, 0);
+    // Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
+    Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3);
+  }
+
+  @Test
+  public void testDurationBetweenRefreshes() throws IOException {
+    // Disable periodic refresh
+    // A minimum duration of 1s between refreshes
+    createAndStartRefresher(Integer.MAX_VALUE, 1);
+    // Issue a ton of manual refreshes.
+    for (int i = 0; i < 10000; i++) {
+      refresher.refreshNow();
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+    }
+    // Overall wait time is 10000 ms, so the number of requests should be <=10
+    // Actual calls to getMasters() should be much lower than the refresh count.
+    assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20);
+    assertTrue(callTimestamps.size() > 0);
+    // Verify that the delta between subsequent RPCs is at least 1sec as configured.
+    for (int i = 1; i < callTimestamps.size() - 1; i++) {
+      long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
+      // Few ms cushion to account for any env jitter.
+      assertTrue(callTimestamps.toString(), delta > 990);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
new file mode 100644
index 0000000..20e77a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRpcConnectionRegistry {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRpcConnectionRegistry.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private RpcConnectionRegistry registry;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // allow refresh immediately so we will switch to use region servers soon.
+    UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
+    UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
+    UTIL.startMiniCluster(3);
+    HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    registry = new RpcConnectionRegistry(UTIL.getConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(registry, true);
+  }
+
+  @Test
+  public void testRegistryRPCs() throws Exception {
+    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
+    // wait until we switch to use region servers
+    UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3);
+    assertThat(registry.getParsedServers(),
+      hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
+
+    // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
+    // because not all replicas had made it up before test started.
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
+
+    assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
+    assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
+    List<HRegionLocation> metaLocations =
+      Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
+    List<HRegionLocation> actualMetaLocations =
+      activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
+    Collections.sort(metaLocations);
+    Collections.sort(actualMetaLocations);
+    assertEquals(actualMetaLocations, metaLocations);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 88b4fb7..39e8191 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -146,7 +146,7 @@ public class TestScannersFromClientSide {
     Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
       ZKConnectionRegistry.class);
     int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
-      MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
+      AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
     return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
index 428aee2..dfc35f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,14 +46,15 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
 
 @Category({MediumTests.class, MasterTests.class})
 public class TestClientMetaServiceRPCs {
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java
new file mode 100644
index 0000000..e478639
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * Class for tracking the region servers for a cluster.
+ */
+@InterfaceAudience.Private
+public class RegionServerAddressTracker extends ZKListener {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class);
+
+  private volatile List<ServerName> regionServers = Collections.emptyList();
+
+  private final Abortable abortable;
+
+  public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) {
+    super(watcher);
+    this.abortable = abortable;
+    watcher.registerListener(this);
+    loadRegionServerList();
+  }
+
+  private void loadRegionServerList() {
+    List<String> names;
+    try {
+      names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
+    } catch (KeeperException e) {
+      LOG.error("failed to list region servers", e);
+      abortable.abort("failed to list region servers", e);
+      return;
+    }
+    if (CollectionUtils.isEmpty(names)) {
+      regionServers = Collections.emptyList();
+    } else {
+      regionServers = names.stream().map(ServerName::parseServerName)
+        .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
+    }
+  }
+
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.equals(watcher.getZNodePaths().rsZNode)) {
+      loadRegionServerList();
+    }
+  }
+
+  public List<ServerName> getRegionServers() {
+    return regionServers;
+  }
+}
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java
new file mode 100644
index 0000000..6d2cc2b
--- /dev/null
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ZKTests.class, MediumTests.class })
+public class TestRegionServerAddressTracker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class);
+
+  private static final HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility();
+
+  private ZKWatcher zk;
+
+  private RegionServerAddressTracker tracker;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Before
+  public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException {
+    TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + name.getMethodName());
+    zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null);
+    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
+    tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(zk, true);
+  }
+
+  @Test
+  public void test() throws KeeperException {
+    ServerName rs1 = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime());
+    ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
+    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
+    assertEquals(rs1, tracker.getRegionServers().get(0));
+
+    ServerName rs2 = ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime());
+    ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
+    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2);
+    assertThat(tracker.getRegionServers(), hasItems(rs1, rs2));
+
+    ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
+    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
+    assertEquals(rs2, tracker.getRegionServers().get(0));
+
+    ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
+    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty());
+  }
+
+  private static final class WarnOnlyAbortable implements Abortable {
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("RegionServerAddressTracker received abort, ignoring.  Reason: {}", why, e);
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+  }
+}