You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/08/26 16:13:54 UTC

[hbase] branch branch-2 updated: HBASE-24765: Dynamic master discovery (#2314)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new bb9121d  HBASE-24765: Dynamic master discovery (#2314)
bb9121d is described below

commit bb9121da77c7b881a3cc4c389029a610fc2b0925
Author: Bharath Vissapragada <bh...@apache.org>
AuthorDate: Wed Aug 26 09:13:34 2020 -0700

    HBASE-24765: Dynamic master discovery (#2314)
    
    This patch adds the ability to discover newly added masters
    dynamically on the master registry side. The trigger for the
    re-fetch is either periodic (5 mins) or any registry RPC failure.
    Master server information is cached in masters to avoid repeated
    ZK lookups.
    
    Updates the client side connection metrics to maintain a counter
    per RPC type so that clients have visibility into counts grouped
    by RPC method name.
    
    I didn't add the method to ZK registry interface since there
    is a design discussion going on in splittable meta doc. We can
    add it later if needed.
    
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    (cherry picked from commit 275a38e1533eafa1d4bd1d50c13bcecd9a397ea8)
---
 .../hbase/client/MasterAddressRefresher.java       | 126 ++++++++++
 .../apache/hadoop/hbase/client/MasterRegistry.java | 111 ++++++--
 .../hadoop/hbase/client/MetricsConnection.java     |  11 +-
 .../client/TestMasterRegistryHedgedReads.java      |  12 +-
 .../hadoop/hbase/client/TestMetricsConnection.java |   6 +
 .../src/main/protobuf/Master.proto                 |  22 +-
 .../hadoop/hbase/master/ActiveMasterManager.java   |  40 ++-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  58 +----
 .../hadoop/hbase/master/MasterRpcServices.java     |  19 ++
 .../hbase/client/TestMasterAddressRefresher.java   | 113 +++++++++
 .../hadoop/hbase/client/TestMasterRegistry.java    |  52 ++++
 .../hadoop/hbase/master/AlwaysStandByHMaster.java  |   8 +-
 .../hbase/master/TestActiveMasterManager.java      | 278 ++++++++++++---------
 .../regionserver/TestMasterAddressTracker.java     |  40 ++-
 .../hbase/zookeeper/MasterAddressTracker.java      |  58 +++++
 15 files changed, 741 insertions(+), 213 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
new file mode 100644
index 0000000..3cbb9f7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+
+/**
+ * Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
+ * uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
+ * By default the refresh happens periodically (configured via
+ * {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
+ * {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any two
+ * attempts should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
+ */
+@InterfaceAudience.Private
+public class MasterAddressRefresher implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
+  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
+      "hbase.client.master_registry.refresh_interval_secs";
+  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
+  public static final String MIN_SECS_BETWEEN_REFRESHES =
+      "hbase.client.master_registry.min_secs_between_refreshes";
+  private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
+
+  private final ExecutorService pool;
+  private final MasterRegistry registry;
+  private final long periodicRefreshMs;
+  private final long timeBetweenRefreshesMs;
+  private final Object refreshMasters = new Object();
+
+  @Override
+  public void close() {
+    pool.shutdownNow();
+  }
+
+  /**
+   * Thread that refreshes the master end points until it is interrupted via {@link #close()}.
+   * Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
+   */
+  private class RefreshThread implements Runnable {
+    @Override
+    public void run() {
+      long lastRpcTs = 0;
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(periodicRefreshMs);
+          }
+          long currentTs = EnvironmentEdgeManager.currentTime();
+          if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
+            continue;
+          }
+          lastRpcTs = currentTs;
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
+          registry.populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+      LOG.info("Master end point refresher loop exited.");
+    }
+  }
+
+  MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
+    pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
+    periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
+        PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
+    timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
+        MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
+    Preconditions.checkArgument(periodicRefreshMs > 0);
+    Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
+    this.registry = registry;
+    pool.submit(new RefreshThread());
+  }
+
+  /**
+   * Notifies the refresher thread to refresh the master end points. This does not guarantee a
+   * refresh. See class comment for details.
+   */
+  void refreshNow() {
+    synchronized (refreshMasters) {
+      refreshMasters.notify();
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 4d0a591..2a7ae16 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -33,11 +33,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
@@ -57,10 +59,11 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
 
@@ -89,11 +92,14 @@ public class MasterRegistry implements ConnectionRegistry {
   private final int hedgedReadFanOut;
 
   // Configured list of masters to probe the meta information from.
-  private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
 
   // RPC client used to talk to the masters.
   private final RpcClient rpcClient;
   private final RpcControllerFactory rpcControllerFactory;
+  private final int rpcTimeoutMs;
+
+  protected final MasterAddressRefresher masterAddressRefresher;
 
   /**
    * Parses the list of master addresses from the provided configuration. Supported format is comma
@@ -115,20 +121,27 @@ public class MasterRegistry implements ConnectionRegistry {
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    masterAddressRefresher = new MasterAddressRefresher(conf, this);
+  }
+
+  void populateMasterStubs(Set<ServerName> masters) throws IOException {
+    Preconditions.checkNotNull(masters);
     ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
-      ImmutableMap.builderWithExpectedSize(masterAddrs.size());
+        ImmutableMap.builderWithExpectedSize(masters.size());
     User user = User.getCurrent();
-    for (ServerName masterAddr : masterAddrs) {
+    for (ServerName masterAddr : masters) {
       builder.put(masterAddr,
-        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
+          ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
     }
     masterAddr2Stub = builder.build();
   }
@@ -169,7 +182,13 @@ public class MasterRegistry implements ConnectionRegistry {
     CompletableFuture<T> future = new CompletableFuture<>();
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
-        future.completeExceptionally(controller.getFailed());
+        IOException failureReason = controller.getFailed();
+        future.completeExceptionally(failureReason);
+        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
+          // RPC has failed, trigger a refresh of master end points. We can have some spurious
+          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
+          masterAddressRefresher.refreshNow();
+        }
       } else {
         future.complete(resp);
       }
@@ -188,8 +207,9 @@ public class MasterRegistry implements ConnectionRegistry {
    * been tried and all of them are failed, we will fail the future.
    */
   private <T extends Message> void groupCall(CompletableFuture<T> future,
-    List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
-    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
+      Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
+      int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
+      ConcurrentLinkedQueue<Throwable> errors) {
     int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
     AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
     for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
@@ -210,10 +230,10 @@ public class MasterRegistry implements ConnectionRegistry {
               RetriesExhaustedException ex = new RetriesExhaustedException("masters",
                 masterStubs.size(), new ArrayList<>(errors));
               future.completeExceptionally(
-                new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
+                new MasterRegistryFetchException(masterServers, ex));
             } else {
-              groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
-                errors);
+              groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
+                  isValidResp, debug, errors);
             }
           }
         } else {
@@ -226,17 +246,20 @@ public class MasterRegistry implements ConnectionRegistry {
 
   private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
     Predicate<T> isValidResp, String debug) {
-    List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
+    ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
+    Set<ServerName> masterServers = masterAddr2StubRef.keySet();
+    List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
     Collections.shuffle(masterStubs, ThreadLocalRandom.current());
     CompletableFuture<T> future = new CompletableFuture<>();
-    groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
+    groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
+        new ConcurrentLinkedQueue<>());
     return future;
   }
 
   /**
    * Simple helper to transform the result of getMetaRegionLocations() rpc.
    */
-  private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
+  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
     List<HRegionLocation> regionLocations = new ArrayList<>();
     resp.getMetaLocationsList()
       .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
@@ -247,7 +270,7 @@ public class MasterRegistry implements ConnectionRegistry {
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
     return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
       GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
-      "getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
+      "getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
   }
 
   @Override
@@ -259,17 +282,54 @@ public class MasterRegistry implements ConnectionRegistry {
       .thenApply(GetClusterIdResponse::getClusterId);
   }
 
-  private ServerName transformServerName(GetActiveMasterResponse resp) {
-    return ProtobufUtil.toServerName(resp.getServerName());
+  private static boolean hasActiveMaster(GetMastersResponse resp) {
+    List<GetMastersResponseEntry> activeMasters =
+        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
+        Collectors.toList());
+    return activeMasters.size() == 1;
+  }
+
+  private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
+    List<GetMastersResponseEntry> activeMasters =
+        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
+            Collectors.toList());
+    if (activeMasters.size() != 1) {
+      throw new IOException(String.format("Incorrect number of active masters encountered." +
+          " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
+    }
+    return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
   }
 
   @Override
   public CompletableFuture<ServerName> getActiveMaster() {
+    CompletableFuture<ServerName> future = new CompletableFuture<>();
+    addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
+      MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
+        if (ex != null) {
+          // The RPC failed, so resp must not be dereferenced; propagate the failure and stop.
+          future.completeExceptionally(ex);
+          return;
+        }
+        try {
+          future.complete(filterActiveMaster((GetMastersResponse)resp));
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+      });
+    return future;
+  }
+
+  private static List<ServerName> transformServerNames(GetMastersResponse resp) {
+    return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
+        s.getServerName())).collect(Collectors.toList());
+  }
+
+  CompletableFuture<List<ServerName>> getMasters() {
+    // Any response listing at least one master is valid; entries are mapped to ServerNames.
     return this
-      .<GetActiveMasterResponse> call(
-        (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
-        GetActiveMasterResponse::hasServerName, "getActiveMaster()")
-      .thenApply(this::transformServerName);
+        .<GetMastersResponse> call((c, s, d) -> s.getMasters(
+            c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
+            "getMasters()").thenApply(MasterRegistry::transformServerNames);
   }
 
   @VisibleForTesting
@@ -279,6 +339,9 @@ public class MasterRegistry implements ConnectionRegistry {
 
   @Override
   public void close() {
+    if (masterAddressRefresher != null) {
+      masterAddressRefresher.close();
+    }
     if (rpcClient != null) {
       rpcClient.close();
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 5d46344..e9f4c61 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -58,6 +58,7 @@ public class MetricsConnection implements StatisticTrackable {
   /** Set this key to {@code true} to enable metrics collection of client requests. */
   public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
 
+  private static final String CNT_BASE = "rpcCount_";
   private static final String DRTN_BASE = "rpcCallDurationMs_";
   private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
   private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
@@ -303,6 +304,8 @@ public class MetricsConnection implements StatisticTrackable {
           LOAD_FACTOR, CONCURRENCY_LEVEL);
   private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
     new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+  @VisibleForTesting protected final ConcurrentMap<String, Counter>  rpcCounters =
+      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
 
   MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
       Supplier<ThreadPoolExecutor> metaPool) {
@@ -434,8 +437,7 @@ public class MetricsConnection implements StatisticTrackable {
   }
 
   /** Update call stats for non-critical-path methods */
-  private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
-    final String methodName = method.getService().getName() + "_" + method.getName();
+  private void updateRpcGeneric(String methodName, CallStats stats) {
     getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
         .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
     getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
@@ -450,6 +452,9 @@ public class MetricsConnection implements StatisticTrackable {
     if (callsPerServer > 0) {
       concurrentCallsPerServerHist.update(callsPerServer);
     }
+    // Update the counter that tracks RPCs by type.
+    final String methodName = method.getService().getName() + "_" + method.getName();
+    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
     // this implementation is tied directly to protobuf implementation details. would be better
     // if we could dispatch based on something static, ie, request Message type.
     if (method.getService() == ClientService.getDescriptor()) {
@@ -511,7 +516,7 @@ public class MetricsConnection implements StatisticTrackable {
       }
     }
     // Fallback to dynamic registry lookup for DDL methods.
-    updateRpcGeneric(method, stats);
+    updateRpcGeneric(methodName, stats);
   }
 
   public void incrCacheDroppingExceptions(Object exception) {
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
index 8bbdce6..0af0198 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.security.User;
@@ -116,11 +115,20 @@ public class TestMasterRegistryHedgedReads {
     }
   }
 
+  /**
+   * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
+   * errors. All other RPCs are ignored.
+   */
   public static final class RpcChannelImpl implements RpcChannel {
 
     @Override
     public void callMethod(MethodDescriptor method, RpcController controller, Message request,
       Message responsePrototype, RpcCallback<Message> done) {
+      if (!method.getName().equals("GetClusterId")) {
+        // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
+        // fresh. We do not want to intercept those RPCs here and double count.
+        return;
+      }
       // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
       EXECUTOR.execute(() -> {
         int index = CALLED.getAndIncrement();
@@ -129,7 +137,7 @@ public class TestMasterRegistryHedgedReads {
         } else if (GOOD_RESP_INDEXS.contains(index)) {
           done.run(RESP);
         } else {
-          ((HBaseRpcController) controller).setFailed("inject error");
+          controller.setFailed("inject error");
           done.run(null);
         }
       });
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
index 3f4afad..d48806d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import com.codahale.metrics.RatioGauge;
 import com.codahale.metrics.RatioGauge.Ratio;
@@ -117,6 +118,11 @@ public class TestMetricsConnection {
               .build(),
           MetricsConnection.newCallStats());
     }
+    for (String method: new String[]{"Get", "Scan", "Mutate"}) {
+      final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method;
+      final long metricVal = METRICS.rpcCounters.get(metricKey).getCount();
+      assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
+    }
     for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
       METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker,
       METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 78674ab..4f9d75e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -1220,6 +1220,17 @@ message GetActiveMasterResponse {
   optional ServerName server_name = 1;
 }
 
+/** Request and response to get the current list of all registered master servers */
+message GetMastersRequest {
+}
+message GetMastersResponseEntry {
+  required ServerName server_name = 1;
+  required bool is_active = 2;
+}
+message GetMastersResponse {
+  repeated GetMastersResponseEntry master_servers = 1;
+}
+
 /** Request and response to get the current list of meta region locations */
 message GetMetaRegionLocationsRequest {
 }
@@ -1229,7 +1240,8 @@ message GetMetaRegionLocationsResponse {
 }
 
 /**
- * Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
+ * Implements all the RPCs needed by clients to look up cluster meta information needed for
+ * connection establishment.
  */
 service ClientMetaService {
   /**
@@ -1238,11 +1250,17 @@ service ClientMetaService {
   rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
 
   /**
-   * Get active master server name for this cluster.
+   * Get active master server name for this cluster. Retained for out of sync client and master
+   * rolling upgrades. Newer clients switched to GetMasters RPC request.
    */
   rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
 
   /**
+   * Get registered list of master servers in this cluster.
+   */
+  rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);
+
+  /**
    * Get current meta replicas' region locations.
    */
   rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
index 606741b..34e7f75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.master;
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hbase.Server;
@@ -34,12 +36,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
- * Handles everything on master-side related to master election.
+ * Handles everything on master-side related to master election. Keeps track of
+ * currently active master and registered backup masters.
  *
- * <p>Listens and responds to ZooKeeper notifications on the master znode,
+ * <p>Listens and responds to ZooKeeper notifications on the master znodes,
  * both <code>nodeCreated</code> and <code>nodeDeleted</code>.
  *
  * <p>Contains blocking methods which will hold up backup masters, waiting
@@ -65,17 +69,22 @@ public class ActiveMasterManager extends ZKListener {
   // notifications) and lazily fetched on-demand.
   // ServerName is immutable, so we don't need heavy synchronization around it.
   volatile ServerName activeMasterServerName;
+  // Registered backup masters. List is kept up to date based on ZK change notifications to
+  // backup znode.
+  private volatile ImmutableList<ServerName> backupMasters;
 
   /**
    * @param watcher ZK watcher
    * @param sn ServerName
    * @param master In an instance of a Master.
    */
-  ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
+  ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master)
+      throws InterruptedIOException {
     super(watcher);
     watcher.registerListener(this);
     this.sn = sn;
     this.master = master;
+    updateBackupMasters();
   }
 
   // will be set after jetty server is started
@@ -89,8 +98,18 @@ public class ActiveMasterManager extends ZKListener {
   }
 
   @Override
-  public void nodeDeleted(String path) {
+  public void nodeChildrenChanged(String path) {
+    if (path.equals(watcher.getZNodePaths().backupMasterAddressesZNode)) {
+      try {
+        updateBackupMasters();
+      } catch (InterruptedIOException ioe) {
+        LOG.error("Error updating backup masters", ioe);
+      }
+    }
+  }
 
+  @Override
+  public void nodeDeleted(String path) {
     // We need to keep track of the cluster's shutdown status while
     // we wait on the current master. We consider that, if the cluster
     // was already in a "shutdown" state when we started, that this master
@@ -101,7 +120,6 @@ public class ActiveMasterManager extends ZKListener {
     if(path.equals(watcher.getZNodePaths().clusterStateZNode) && !master.isStopped()) {
       clusterShutDown.set(true);
     }
-
     handle(path);
   }
 
@@ -111,6 +129,11 @@ public class ActiveMasterManager extends ZKListener {
     }
   }
 
+  private void updateBackupMasters() throws InterruptedIOException {
+    backupMasters =
+        ImmutableList.copyOf(MasterAddressTracker.getBackupMastersAndRenewWatch(watcher));
+  }
+
   /**
    * Fetches the active master's ServerName from zookeeper.
    */
@@ -318,4 +341,11 @@ public class ActiveMasterManager extends ZKListener {
           e.getMessage()));
     }
   }
+
+  /**
+   * @return the most recently cached immutable list of registered backup masters.
+   */
+  public List<ServerName> getBackupMasters() {
+    return backupMasters;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9efe849..c01eaf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED
 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
-
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
 import java.io.IOException;
@@ -93,7 +92,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
@@ -220,7 +218,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -229,14 +226,10 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 
 /**
- * HMaster is the "master server" for HBase. An HBase cluster has one active
- * master.  If many masters are started, all compete.  Whichever wins goes on to
  * run the cluster.  All others park themselves in their constructor until
  * master or cluster shutdown or until the active master loses its lease in
  * zookeeper.  Thereafter, all running master jostle to take over master role.
@@ -602,8 +595,8 @@ public class HMaster extends HRegionServer implements MasterServices {
    * Protected to have custom implementations in tests override the default ActiveMaster
    * implementation.
    */
-  protected ActiveMasterManager createActiveMasterManager(
-      ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
+  protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
+      org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
     return new ActiveMasterManager(zk, sn, server);
   }
 
@@ -2702,51 +2695,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     return status;
   }
 
-  private List<ServerName> getBackupMasters() throws InterruptedIOException {
-    // Build Set of backup masters from ZK nodes
-    List<String> backupMasterStrings;
-    try {
-      backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
-        this.zooKeeper.getZNodePaths().backupMasterAddressesZNode);
-    } catch (KeeperException e) {
-      LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
-      backupMasterStrings = null;
-    }
-
-    List<ServerName> backupMasters = Collections.emptyList();
-    if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
-      backupMasters = new ArrayList<>(backupMasterStrings.size());
-      for (String s: backupMasterStrings) {
-        try {
-          byte [] bytes;
-          try {
-            bytes = ZKUtil.getData(this.zooKeeper, ZNodePaths.joinZNode(
-                this.zooKeeper.getZNodePaths().backupMasterAddressesZNode, s));
-          } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-          }
-          if (bytes != null) {
-            ServerName sn;
-            try {
-              sn = ProtobufUtil.parseServerNameFrom(bytes);
-            } catch (DeserializationException e) {
-              LOG.warn("Failed parse, skipping registering backup server", e);
-              continue;
-            }
-            backupMasters.add(sn);
-          }
-        } catch (KeeperException e) {
-          LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
-                   "backup servers"), e);
-        }
-      }
-      Collections.sort(backupMasters, new Comparator<ServerName>() {
-        @Override
-        public int compare(ServerName s1, ServerName s2) {
-          return s1.getServerName().compareTo(s2.getServerName());
-        }});
-    }
-    return backupMasters;
+  List<ServerName> getBackupMasters() {
+    return activeMasterManager.getBackupMasters();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index de5f79a..fc7a753 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -202,6 +202,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetComplet
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@@ -2952,6 +2955,22 @@ public class MasterRpcServices extends RSRpcServices implements
   }
 
   @Override
+  public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
+      throws ServiceException {
+    // The active master (when known) is listed first, followed by every registered backup.
+    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
+    master.getActiveMaster().ifPresent(active -> builder.addMasterServers(
+        GetMastersResponseEntry.newBuilder().setServerName(ProtobufUtil.toServerName(active))
+            .setIsActive(true).build()));
+    for (ServerName backup : master.getBackupMasters()) {
+      GetMastersResponseEntry.Builder entry = GetMastersResponseEntry.newBuilder();
+      entry.setServerName(ProtobufUtil.toServerName(backup)).setIsActive(false);
+      builder.addMasterServers(entry.build());
+    }
+    return builder.build();
+  }
+
+  @Override
   public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
       GetMetaRegionLocationsRequest request) throws ServiceException {
     GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java
new file mode 100644
index 0000000..ad1e738
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+@Category({ClientTests.class, SmallTests.class})
+public class TestMasterAddressRefresher {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMasterAddressRefresher.class);
+
+  private static class DummyMasterRegistry extends MasterRegistry {
+    // getMasters() presumably runs on the refresher thread; keep shared state thread-safe.
+    private final AtomicInteger getMastersCallCounter = new AtomicInteger(0);
+    private final List<Long> callTimeStamps = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+    DummyMasterRegistry(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    CompletableFuture<List<ServerName>> getMasters() {
+      getMastersCallCounter.incrementAndGet();
+      callTimeStamps.add(EnvironmentEdgeManager.currentTime());
+      return CompletableFuture.completedFuture(new ArrayList<>());
+    }
+
+    public int getMastersCount() {
+      return getMastersCallCounter.get();
+    }
+
+    public List<Long> getCallTimeStamps() {
+      return callTimeStamps;
+    }
+  }
+
+  @Test
+  public void testPeriodicMasterEndPointRefresh() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    // Refresh every 1 second, with no minimum gap enforced between refreshes.
+    conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1);
+    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
+    try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
+      // Wait up to 5 seconds for more than 3 getMasters() calls to be observed.
+      Waiter.waitFor(
+          conf, 5000, (Waiter.Predicate<Exception>) () -> registry.getMastersCount() > 3);
+    }
+  }
+
+  @Test
+  public void testDurationBetweenRefreshes() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    // Disable periodic refresh
+    conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE);
+    // A minimum duration of 1s between refreshes
+    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1);
+    try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
+      // Issue a ton of manual refreshes.
+      for (int i = 0; i < 10000; i++) {
+        registry.masterAddressRefresher.refreshNow();
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+      }
+      // The loop above sleeps for at least 10000 ms in total, so with a 1s minimum gap about 10
+      // RPCs are expected; the bound of 20 leaves room for the sleeps overshooting.
+      // Snapshot the timestamps so the size cannot change while the deltas are being checked.
+      List<Long> callTimeStamps = new ArrayList<>(registry.getCallTimeStamps());
+      Assert.assertTrue(
+          String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20);
+      Assert.assertTrue(callTimeStamps.size() > 0);
+      // Verify that the delta between every pair of consecutive RPCs is at least 1sec.
+      for (int i = 1; i < callTimeStamps.size(); i++) {
+        long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1);
+        // Few ms cushion to account for any env jitter.
+        Assert.assertTrue(callTimeStamps.toString(), delta > 990);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 0e0060c..ba875c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -26,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -41,6 +45,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 @Category({ MediumTests.class, ClientTests.class })
 public class TestMasterRegistry {
@@ -126,4 +131,51 @@ public class TestMasterRegistry {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    // Do not limit the number of refreshes during the test run.
+    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the hedged RPCs fails.
+      assertEquals(clusterId, registry.getClusterId().get());
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
+      // new set of masters should not include the bad server
+      final Set<ServerName> newMasters = registry.getParsedMasterServers();
+      // Bad one should be out.
+      assertEquals(3, newMasters.size());
+      assertFalse(newMasters.contains(badServer));
+      // Kill the active master
+      activeMaster.stopMaster();
+      TEST_UTIL.waitFor(10000,
+        () -> TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 2);
+      TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000);
+      // Wait until the killed master de-registered. This should also trigger another refresh.
+      TEST_UTIL.waitFor(10000, () -> registry.getMasters().get().size() == 2);
+      TEST_UTIL.waitFor(20000, () -> registry.getParsedMasterServers().size() == 2);
+      final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+      assertEquals(2, newMasters2.size());
+      assertFalse(newMasters2.contains(activeMaster.getServerName()));
+    } finally {
+      // Reset the state: start a master to replace the one stopped above.
+      TEST_UTIL.getMiniHBaseCluster().startMaster();
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
index 41a008a..85eac40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -46,7 +47,8 @@ public class AlwaysStandByHMaster extends HMaster {
     private static final Logger LOG =
         LoggerFactory.getLogger(AlwaysStandByMasterManager.class);
 
-    AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
+    AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master)
+        throws InterruptedIOException {
       super(watcher, sn, master);
     }
 
@@ -94,8 +96,8 @@ public class AlwaysStandByHMaster extends HMaster {
     super(conf);
   }
 
-  protected ActiveMasterManager createActiveMasterManager(
-      ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
+  protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
+      org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
     return new AlwaysStandByMasterManager(zk, sn, server);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
index ad76642..986d718 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Semaphore;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -76,43 +80,45 @@ public class TestActiveMasterManager {
   }
 
   @Test public void testRestartMaster() throws IOException, KeeperException {
-    ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
-      "testActiveMasterManagerFromZK", null, true);
-    try {
-      ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
-      ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
-    } catch(KeeperException.NoNodeException nne) {}
-
-    // Create the master node with a dummy address
-    ServerName master = ServerName.valueOf("localhost", 1, System.currentTimeMillis());
-    // Should not have a master yet
-    DummyMaster dummyMaster = new DummyMaster(zk,master);
-    ClusterStatusTracker clusterStatusTracker =
-      dummyMaster.getClusterStatusTracker();
-    ActiveMasterManager activeMasterManager =
-      dummyMaster.getActiveMasterManager();
-    assertFalse(activeMasterManager.clusterHasActiveMaster.get());
-    assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
-
-    // First test becoming the active master uninterrupted
-    MonitoredTask status = Mockito.mock(MonitoredTask.class);
-    clusterStatusTracker.setClusterUp();
-
-    activeMasterManager.blockUntilBecomingActiveMaster(100, status);
-    assertTrue(activeMasterManager.clusterHasActiveMaster.get());
-    assertMaster(zk, master);
-    assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
-
-    // Now pretend master restart
-    DummyMaster secondDummyMaster = new DummyMaster(zk,master);
-    ActiveMasterManager secondActiveMasterManager =
-      secondDummyMaster.getActiveMasterManager();
-    assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
-    activeMasterManager.blockUntilBecomingActiveMaster(100, status);
-    assertTrue(activeMasterManager.clusterHasActiveMaster.get());
-    assertMaster(zk, master);
-    assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
-    assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get());
+    try (ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
+      "testActiveMasterManagerFromZK", null, true)) {
+      try {
+        ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
+        ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
+      } catch (KeeperException.NoNodeException nne) {
+      }
+
+      // Create the master node with a dummy address
+      ServerName master = ServerName.valueOf("localhost", 1, System.currentTimeMillis());
+      // Should not have a master yet
+      DummyMaster dummyMaster = new DummyMaster(zk, master);
+      ClusterStatusTracker clusterStatusTracker =
+          dummyMaster.getClusterStatusTracker();
+      ActiveMasterManager activeMasterManager =
+          dummyMaster.getActiveMasterManager();
+      assertFalse(activeMasterManager.clusterHasActiveMaster.get());
+      assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
+
+      // First test becoming the active master uninterrupted
+      MonitoredTask status = Mockito.mock(MonitoredTask.class);
+      clusterStatusTracker.setClusterUp();
+
+      activeMasterManager.blockUntilBecomingActiveMaster(100, status);
+      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+      assertMaster(zk, master);
+      assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
+
+      // Now pretend master restart
+      DummyMaster secondDummyMaster = new DummyMaster(zk, master);
+      ActiveMasterManager secondActiveMasterManager =
+          secondDummyMaster.getActiveMasterManager();
+      assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
+      activeMasterManager.blockUntilBecomingActiveMaster(100, status);
+      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+      assertMaster(zk, master);
+      assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
+      assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get());
+    }
   }
 
   /**
@@ -122,87 +128,122 @@ public class TestActiveMasterManager {
    */
   @Test
   public void testActiveMasterManagerFromZK() throws Exception {
-    ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
-      "testActiveMasterManagerFromZK", null, true);
-    try {
+    try (ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
+      "testActiveMasterManagerFromZK", null, true)) {
+      try {
+        ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
+        ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
+      } catch (KeeperException.NoNodeException nne) {
+      }
+
+      // Create the master node with a dummy address
+      ServerName firstMasterAddress =
+          ServerName.valueOf("localhost", 1, System.currentTimeMillis());
+      ServerName secondMasterAddress =
+          ServerName.valueOf("localhost", 2, System.currentTimeMillis());
+
+      // Should not have a master yet
+      DummyMaster ms1 = new DummyMaster(zk, firstMasterAddress);
+      ActiveMasterManager activeMasterManager =
+          ms1.getActiveMasterManager();
+      assertFalse(activeMasterManager.clusterHasActiveMaster.get());
+      assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
+
+      // First test becoming the active master uninterrupted
+      ClusterStatusTracker clusterStatusTracker =
+          ms1.getClusterStatusTracker();
+      clusterStatusTracker.setClusterUp();
+      activeMasterManager.blockUntilBecomingActiveMaster(100,
+          Mockito.mock(MonitoredTask.class));
+      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+      assertMaster(zk, firstMasterAddress);
+      assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
+
+      // New manager will now try to become the active master in another thread
+      WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
+      t.start();
+      // Wait for this guy to figure out there is another active master
+      // Wait for 1 second at most
+      int sleeps = 0;
+      while (!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
+        Thread.sleep(10);
+        sleeps++;
+      }
+
+      // Both should see that there is an active master
+      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+      assertTrue(t.manager.clusterHasActiveMaster.get());
+      // But secondary one should not be the active master
+      assertFalse(t.isActiveMaster);
+      // Verify the active master ServerName is populated in standby master.
+      assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get());
+
+      // Close the first server and delete its master node
+      ms1.stop("stopping first server");
+
+      // Use a listener to capture when the node is actually deleted
+      NodeDeletionListener listener = new NodeDeletionListener(zk,
+          zk.getZNodePaths().masterAddressZNode);
+      zk.registerListener(listener);
+
+      LOG.info("Deleting master node");
       ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
-      ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
-    } catch(KeeperException.NoNodeException nne) {}
-
-    // Create the master node with a dummy address
-    ServerName firstMasterAddress =
-        ServerName.valueOf("localhost", 1, System.currentTimeMillis());
-    ServerName secondMasterAddress =
-        ServerName.valueOf("localhost", 2, System.currentTimeMillis());
-
-    // Should not have a master yet
-    DummyMaster ms1 = new DummyMaster(zk,firstMasterAddress);
-    ActiveMasterManager activeMasterManager =
-      ms1.getActiveMasterManager();
-    assertFalse(activeMasterManager.clusterHasActiveMaster.get());
-    assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
-
-    // First test becoming the active master uninterrupted
-    ClusterStatusTracker clusterStatusTracker =
-      ms1.getClusterStatusTracker();
-    clusterStatusTracker.setClusterUp();
-    activeMasterManager.blockUntilBecomingActiveMaster(100,
-        Mockito.mock(MonitoredTask.class));
-    assertTrue(activeMasterManager.clusterHasActiveMaster.get());
-    assertMaster(zk, firstMasterAddress);
-    assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
-
-    // New manager will now try to become the active master in another thread
-    WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
-    t.start();
-    // Wait for this guy to figure out there is another active master
-    // Wait for 1 second at most
-    int sleeps = 0;
-    while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
-      Thread.sleep(10);
-      sleeps++;
-    }
 
-    // Both should see that there is an active master
-    assertTrue(activeMasterManager.clusterHasActiveMaster.get());
-    assertTrue(t.manager.clusterHasActiveMaster.get());
-    // But secondary one should not be the active master
-    assertFalse(t.isActiveMaster);
-    // Verify the active master ServerName is populated in standby master.
-    assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get());
-
-    // Close the first server and delete it's master node
-    ms1.stop("stopping first server");
-
-    // Use a listener to capture when the node is actually deleted
-    NodeDeletionListener listener = new NodeDeletionListener(zk,
-            zk.getZNodePaths().masterAddressZNode);
-    zk.registerListener(listener);
-
-    LOG.info("Deleting master node");
-    ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
-
-    // Wait for the node to be deleted
-    LOG.info("Waiting for active master manager to be notified");
-    listener.waitForDeletion();
-    LOG.info("Master node deleted");
-
-    // Now we expect the secondary manager to have and be the active master
-    // Wait for 1 second at most
-    sleeps = 0;
-    while(!t.isActiveMaster && sleeps < 100) {
-      Thread.sleep(10);
-      sleeps++;
-    }
-    LOG.debug("Slept " + sleeps + " times");
+      // Wait for the node to be deleted
+      LOG.info("Waiting for active master manager to be notified");
+      listener.waitForDeletion();
+      LOG.info("Master node deleted");
+
+      // Now we expect the secondary manager to have and be the active master
+      // Wait for 1 second at most
+      sleeps = 0;
+      while (!t.isActiveMaster && sleeps < 100) {
+        Thread.sleep(10);
+        sleeps++;
+      }
+      LOG.debug("Slept " + sleeps + " times");
 
-    assertTrue(t.manager.clusterHasActiveMaster.get());
-    assertTrue(t.isActiveMaster);
-    assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get());
+      assertTrue(t.manager.clusterHasActiveMaster.get());
+      assertTrue(t.isActiveMaster);
+      assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get());
 
-    LOG.info("Deleting master node");
+      LOG.info("Deleting master node");
+
+      ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
+    }
+  }
 
-    ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
+  @Test
+  public void testBackupMasterUpdates() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (ZKWatcher zk = new ZKWatcher(conf, "testBackupMasterUpdates", null, true)) {
+      ServerName sn1 = ServerName.valueOf("localhost", 1, -1);
+      DummyMaster master1 = new DummyMaster(zk, sn1);
+      ActiveMasterManager activeMasterManager = master1.getActiveMasterManager();
+      activeMasterManager.blockUntilBecomingActiveMaster(100,
+          Mockito.mock(MonitoredTask.class));
+      assertEquals(sn1, activeMasterManager.getActiveMasterServerName().get());
+      assertEquals(0, activeMasterManager.getBackupMasters().size());
+      // Add backup masters one at a time, waiting for each to show up in the cached list.
+      List<String> backupZNodes = new ArrayList<>();
+      for (int i = 1; i <= 10; i++) {
+        ServerName backupSn = ServerName.valueOf("localhost", 1000 + i, -1);
+        String backupZn = ZNodePaths.joinZNode(
+            zk.getZNodePaths().backupMasterAddressesZNode, backupSn.toString());
+        backupZNodes.add(backupZn);
+        MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234);
+        TEST_UTIL.waitFor(10000,
+          () -> activeMasterManager.getBackupMasters().size() == backupZNodes.size());
+      }
+      // Remove backup masters one at a time, waiting for the cached list to shrink each time.
+      int numBackups = backupZNodes.size();
+      for (String backupZNode: backupZNodes) {
+        ZKUtil.deleteNode(zk, backupZNode);
+        final int currentBackups = --numBackups;
+        TEST_UTIL.waitFor(10000,
+          () -> activeMasterManager.getBackupMasters().size() == currentBackups);
+      }
+    }
   }
 
   /**
@@ -212,12 +253,11 @@ public class TestActiveMasterManager {
    * @throws KeeperException unexpected Zookeeper exception
    * @throws IOException if an IO problem is encountered
    */
-  private void assertMaster(ZKWatcher zk,
-      ServerName expectedAddress)
-  throws KeeperException, IOException {
+  private void assertMaster(ZKWatcher zk, ServerName expectedAddress) throws
+      KeeperException, IOException {
     ServerName readAddress = MasterAddressTracker.getMasterAddress(zk);
     assertNotNull(readAddress);
-    assertTrue(expectedAddress.equals(readAddress));
+    assertEquals(expectedAddress, readAddress);
   }
 
   public static class WaitToBeMasterThread extends Thread {
@@ -226,7 +266,7 @@ public class TestActiveMasterManager {
     DummyMaster dummyMaster;
     boolean isActiveMaster;
 
-    public WaitToBeMasterThread(ZKWatcher zk, ServerName address) {
+    public WaitToBeMasterThread(ZKWatcher zk, ServerName address) throws InterruptedIOException {
       this.dummyMaster = new DummyMaster(zk,address);
       this.manager = this.dummyMaster.getActiveMasterManager();
       isActiveMaster = false;
@@ -274,7 +314,7 @@ public class TestActiveMasterManager {
     private ClusterStatusTracker clusterStatusTracker;
     private ActiveMasterManager activeMasterManager;
 
-    public DummyMaster(ZKWatcher zk, ServerName master) {
+    public DummyMaster(ZKWatcher zk, ServerName master) throws InterruptedIOException {
       this.clusterStatusTracker =
         new ClusterStatusTracker(zk, this);
       clusterStatusTracker.start();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java
index 05d794a..dec55a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.List;
 import java.util.concurrent.Semaphore;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -52,10 +55,19 @@ public class TestMasterAddressTracker {
   private static final Logger LOG = LoggerFactory.getLogger(TestMasterAddressTracker.class);
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Cleaned up after each unit test.
+  private static ZKWatcher zk;
 
   @Rule
   public TestName name = new TestName();
 
+  @After
+  public void cleanUp() {
+    if (zk != null) { // stays null until setupMasterTracker() has assigned it
+      zk.close();
+    }
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
@@ -86,9 +98,10 @@ public class TestMasterAddressTracker {
    */
   private MasterAddressTracker setupMasterTracker(final ServerName sn, final int infoPort)
       throws Exception {
-    ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
+    zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
         name.getMethodName(), null);
     ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode);
+    ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().backupMasterAddressesZNode);
 
     // Should not have a master yet
     MasterAddressTracker addressTracker = new MasterAddressTracker(zk, null);
@@ -164,6 +177,31 @@ public class TestMasterAddressTracker {
     assertEquals("Should receive 0 for backup not found.", 0, addressTracker.getMasterInfoPort());
   }
 
+  @Test
+  public void testBackupMasters() throws Exception {
+    final ServerName sn = ServerName.valueOf("localhost", 5678, System.currentTimeMillis());
+    final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1111);
+    assertTrue(addressTracker.hasMaster());
+    ServerName activeMaster = addressTracker.getMasterAddress();
+    assertEquals(sn, activeMaster);
+    // No current backup masters
+    List<ServerName> backupMasters = addressTracker.getBackupMasters();
+    assertEquals(0, backupMasters.size());
+    ServerName backupMaster1 = ServerName.valueOf("localhost", 2222, -1);
+    ServerName backupMaster2 = ServerName.valueOf("localhost", 3333, -1);
+    String backupZNode1 = ZNodePaths.joinZNode(
+        zk.getZNodePaths().backupMasterAddressesZNode, backupMaster1.toString());
+    String backupZNode2 = ZNodePaths.joinZNode(
+        zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString());
+    // Register two backup masters under the backup masters znode
+    MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222);
+    MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333);
+    backupMasters = addressTracker.getBackupMasters();
+    assertEquals(2, backupMasters.size());
+    assertTrue(backupMasters.contains(backupMaster1));
+    assertTrue(backupMasters.contains(backupMaster2));
+  }
+
   public static class NodeCreationListener extends ZKListener {
     private static final Logger LOG = LoggerFactory.getLogger(NodeCreationListener.class);
 
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
index 915244e..3206238 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.hbase.zookeeper;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -278,4 +282,58 @@ public class MasterAddressTracker extends ZKNodeTracker {
 
     return false;
   }
+
+  /** Lists the registered backup masters using this tracker's watcher. */
+  public List<ServerName> getBackupMasters() throws InterruptedIOException {
+    return getBackupMastersAndRenewWatch(watcher);
+  }
+
+  /**
+   * Retrieves the list of registered backup masters and renews a watch on the znode for children
+   * updates. Zookeeper and parse failures are logged and skipped rather than thrown.
+   * @param zkw Zookeeper watcher to use
+   * @return List of backup masters, sorted by server name; empty if none could be read.
+   * @throws InterruptedIOException if interrupted while reading from Zookeeper.
+   */
+  public static List<ServerName> getBackupMastersAndRenewWatch(
+      ZKWatcher zkw) throws InterruptedIOException {
+    // Build the list of backup masters from ZK nodes
+    List<String> backupMasterStrings = Collections.emptyList();
+    try {
+      backupMasterStrings = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
+          zkw.getZNodePaths().backupMasterAddressesZNode);
+    } catch (KeeperException e) {
+      LOG.warn(zkw.prefix("Unable to list backup servers"), e);
+    }
+
+    List<ServerName> backupMasters = Collections.emptyList();
+    if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
+      backupMasters = new ArrayList<>(backupMasterStrings.size());
+      for (String s: backupMasterStrings) {
+        try {
+          byte [] bytes;
+          try {
+            bytes = ZKUtil.getData(zkw, ZNodePaths.joinZNode(
+                zkw.getZNodePaths().backupMasterAddressesZNode, s));
+          } catch (InterruptedException e) {
+            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+          }
+          if (bytes != null) {
+            ServerName sn;
+            try {
+              sn = ProtobufUtil.parseServerNameFrom(bytes);
+            } catch (DeserializationException e) {
+              LOG.warn("Failed parse, skipping registering backup server", e);
+              continue;
+            }
+            backupMasters.add(sn);
+          }
+        } catch (KeeperException e) {
+          LOG.warn(zkw.prefix("Unable to get information about backup servers"), e);
+        }
+      }
+      backupMasters.sort(Comparator.comparing(ServerName::getServerName));
+    }
+    return backupMasters;
+  }
 }