Posted to commits@hbase.apache.org by zh...@apache.org on 2020/06/27 07:48:08 UTC

[hbase] branch HBASE-11288.splittable-meta updated: HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-11288.splittable-meta
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-11288.splittable-meta by this push:
     new 967b04a  HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)
967b04a is described below

commit 967b04a2a2aa4b07e23e064f1172c6230aeded1f
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Jun 27 15:47:51 2020 +0800

    HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)
    
    Signed-off-by: stack <st...@apache.org>
---
 .../apache/hadoop/hbase/CatalogFamilyFormat.java   |  30 +
 .../hadoop/hbase/ClientMetaTableAccessor.java      |  29 +-
 .../client/AbstractAsyncTableRegionLocator.java    | 308 ++++++++++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  39 +-
 .../hbase/client/AsyncMetaRegionLocator.java       | 150 -----
 .../hbase/client/AsyncMetaTableRegionLocator.java  | 152 +++++
 .../hbase/client/AsyncNonMetaRegionLocator.java    | 670 ---------------------
 .../client/AsyncNonMetaTableRegionLocator.java     | 148 +++++
 .../hadoop/hbase/client/AsyncRegionLocator.java    | 191 ++++--
 .../hbase/client/AsyncRegionLocatorHelper.java     |  42 +-
 .../hbase/client/AsyncTableRegionLocator.java      |   4 +-
 .../hbase/client/AsyncTableRegionLocatorImpl.java  |   8 +-
 .../hadoop/hbase/client/ConnectionRegistry.java    |   6 -
 .../hadoop/hbase/client/ConnectionUtils.java       |  18 -
 .../apache/hadoop/hbase/client/MasterRegistry.java |   2 +-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  47 +-
 .../hadoop/hbase/client/RegionLocateType.java      |   5 +-
 .../hbase/client/TableRegionLocationCache.java     | 226 +++++++
 .../hadoop/hbase/client/ZKConnectionRegistry.java  |  33 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  27 +
 .../hbase/client/DoNothingConnectionRegistry.java  |   6 -
 .../client/TestAsyncMetaRegionLocatorFailFast.java |  67 ---
 .../src/main/protobuf/server/master/Master.proto   |  35 ++
 .../hadoop/hbase/coprocessor/MasterObserver.java   |  44 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    | 109 +++-
 .../hadoop/hbase/master/MasterCoprocessorHost.java |  40 ++
 .../hadoop/hbase/master/MasterMetaBootstrap.java   |  39 +-
 .../hadoop/hbase/master/MasterRpcServices.java     | 128 ++--
 .../apache/hadoop/hbase/master/MasterServices.java |   9 +
 .../hadoop/hbase/master/MasterStatusServlet.java   |   5 +-
 .../hbase/master/MetaRegionLocationCache.java      |   9 +-
 .../hbase/master/assignment/RegionStateStore.java  |   6 +-
 .../master/procedure/CreateTableProcedure.java     |   2 -
 .../hbase/master/procedure/ProcedureSyncWait.java  |  14 -
 .../master/snapshot/MasterSnapshotVerifier.java    |   8 +-
 .../hbase/master/snapshot/TakeSnapshotHandler.java |  14 +-
 .../flush/MasterFlushTableProcedureManager.java    |  18 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   4 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  18 +-
 .../main/resources/hbase-webapps/master/table.jsp  |  10 +-
 .../apache/hadoop/hbase/TestMetaTableAccessor.java |  10 +-
 .../apache/hadoop/hbase/TestMetaTableLocator.java  | 206 -------
 .../hbase/client/AbstractTestRegionLocator.java    |   5 +-
 .../hbase/client/DummyConnectionRegistry.java      |   6 -
 .../hbase/client/MetaWithReplicasTestBase.java     |   9 +-
 .../hbase/client/RegionReplicaTestHelper.java      |  24 +-
 .../client/TestAsyncAdminWithRegionReplicas.java   |   8 +-
 .../hbase/client/TestAsyncMetaRegionLocator.java   |  24 +-
 .../client/TestAsyncNonMetaRegionLocator.java      |  28 +-
 .../hbase/client/TestAsyncRegionAdminApi2.java     |  45 +-
 ... => TestAsyncRegionLocatorConcurrenyLimit.java} |  23 +-
 .../hbase/client/TestAsyncTableAdminApi.java       |  53 +-
 .../hbase/client/TestAsyncTableAdminApi3.java      |  24 +-
 .../hbase/client/TestAsyncTableLocatePrefetch.java |  14 +-
 .../hbase/client/TestAsyncTableRSCrashPublish.java |   3 +-
 .../client/TestAsyncTableUseMetaReplicas.java      |   4 +-
 .../hadoop/hbase/client/TestMasterRegistry.java    |   2 +-
 .../hbase/client/TestMetaRegionLocationCache.java  |  45 +-
 .../TestMetaWithReplicasShutdownHandling.java      |  15 +-
 .../hadoop/hbase/client/TestReplicasClient.java    |  14 +-
 .../hbase/client/TestZKConnectionRegistry.java     |  11 +-
 .../hadoop/hbase/master/AlwaysStandByHMaster.java  |   2 +-
 .../hbase/master/MockNoopMasterServices.java       |   7 +
 .../hadoop/hbase/master/TestMasterFailover.java    |  52 --
 .../master/TestMetaAssignmentWithStopMaster.java   |  16 +-
 .../hbase/master/TestMetaShutdownHandler.java      |  12 +-
 .../master/assignment/TestRegionReplicaSplit.java  |   5 +-
 .../TestCompactionLifeCycleTracker.java            |   4 +-
 .../hbase/regionserver/TestRegionReplicas.java     |   2 +-
 .../regionserver/TestRegionServerNoMaster.java     |  42 +-
 ...stRegionReplicaReplicationEndpointNoMaster.java |  58 +-
 .../hadoop/hbase/util/TestHBaseFsckEncryption.java |   2 +-
 .../hadoop/hbase/zookeeper/MetaTableLocator.java   | 301 +--------
 .../org/apache/hadoop/hbase/zookeeper/ZKUtil.java  |  13 +-
 74 files changed, 1808 insertions(+), 2001 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
index a2c59de..710084d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
+
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,8 +34,11 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -363,4 +370,27 @@ public class CatalogFamilyFormat {
     }
     return deleteReplicaLocations;
   }
+
+  private static byte[] buildRegionLocateStartRow(TableName tableName, byte[] row,
+    RegionLocateType locateType) {
+    if (locateType.equals(RegionLocateType.BEFORE)) {
+      if (Bytes.equals(row, HConstants.EMPTY_END_ROW)) {
+        byte[] binaryTableName = tableName.getName();
+        return Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+      } else {
+        return createRegionName(tableName, row, ZEROES, false);
+      }
+    } else {
+      return createRegionName(tableName, row, NINES, false);
+    }
+  }
+
+  public static Scan createRegionLocateScan(TableName tableName, byte[] row,
+    RegionLocateType locateType, int prefetchLimit) {
+    byte[] startRow = buildRegionLocateStartRow(tableName, row, locateType);
+    byte[] stopRow = RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+    return new Scan().withStartRow(startRow).withStopRow(stopRow, true)
+      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(prefetchLimit)
+      .setReadType(ReadType.PREAD);
+  }
 }
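
The new CatalogFamilyFormat.createRegionLocateScan above centralizes the reversed, PREAD scan over the hbase:meta catalog family that the locator rewrite elsewhere in this patch issues. A minimal usage sketch of the helper, with the table name, row and prefetch limit chosen purely for illustration (10 matches the old locate-prefetch default shown further down in this diff):

// Sketch only: shows the call shape of the new helper; names and values are
// illustrative, not part of this patch.
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public final class RegionLocateScanSketch {

  /** Build the reversed meta scan that finds the region of 't1' containing 'row-0042'. */
  public static Scan scanForRow() {
    TableName table = TableName.valueOf("t1");   // hypothetical user table
    byte[] row = Bytes.toBytes("row-0042");      // hypothetical row key
    int prefetchLimit = 10;                      // how many catalog rows to read ahead
    // Start row is the region name built from (table, row, NINES); stop row is the
    // first region name of the table; the scan is reversed and uses PREAD.
    return CatalogFamilyFormat.createRegionLocateScan(table, row, RegionLocateType.CURRENT,
      prefetchLimit);
  }
}
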
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
index ecc6573..74d2322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
@@ -164,26 +164,27 @@ public final class ClientMetaTableAccessor {
 
   /**
    * Used to get all region locations for the specific table.
-   * @param metaTable
    * @param tableName table we're looking for, can be null for getting all regions
    * @return the list of region locations. The return value will be wrapped by a
    *         {@link CompletableFuture}.
    */
   public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
-    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
+    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName,
+    boolean excludeOfflinedSplitParents) {
     CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-    addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (locations == null || locations.isEmpty()) {
-        future.complete(Collections.emptyList());
-      } else {
-        List<HRegionLocation> regionLocations =
-          locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
-            .collect(Collectors.toList());
-        future.complete(regionLocations);
-      }
-    });
+    addListener(getTableRegionsAndLocations(metaTable, tableName, excludeOfflinedSplitParents),
+      (locations, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else if (locations == null || locations.isEmpty()) {
+          future.complete(Collections.emptyList());
+        } else {
+          List<HRegionLocation> regionLocations =
+            locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+              .collect(Collectors.toList());
+          future.complete(regionLocations);
+        }
+      });
     return future;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
new file mode 100644
index 0000000..8048661
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The base class for locating region of a table.
+ */
+@InterfaceAudience.Private
+abstract class AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncTableRegionLocator.class);
+
+  protected final AsyncConnectionImpl conn;
+
+  protected final TableName tableName;
+
+  protected final int maxConcurrent;
+
+  protected final TableRegionLocationCache cache;
+
+  protected static final class LocateRequest {
+
+    final byte[] row;
+
+    final RegionLocateType locateType;
+
+    public LocateRequest(byte[] row, RegionLocateType locateType) {
+      this.row = row;
+      this.locateType = locateType;
+    }
+
+    @Override
+    public int hashCode() {
+      return Bytes.hashCode(row) ^ locateType.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || obj.getClass() != LocateRequest.class) {
+        return false;
+      }
+      LocateRequest that = (LocateRequest) obj;
+      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
+    }
+  }
+
+  private final Set<LocateRequest> pendingRequests = new HashSet<>();
+
+  private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
+    new LinkedHashMap<>();
+
+  AbstractAsyncTableRegionLocator(AsyncConnectionImpl conn, TableName tableName,
+    int maxConcurrent) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.maxConcurrent = maxConcurrent;
+    this.cache = new TableRegionLocationCache(conn.getConnectionMetrics());
+  }
+
+  private boolean hasQuota() {
+    return pendingRequests.size() < maxConcurrent;
+  }
+
+  protected final Optional<LocateRequest> getCandidate() {
+    return allRequests.keySet().stream().filter(r -> !pendingRequests.contains(r)).findFirst();
+  }
+
+  void clearCompletedRequests(RegionLocations locations) {
+    for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
+      allRequests.entrySet().iterator(); iter.hasNext();) {
+      Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+      if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
+        iter.remove();
+      }
+    }
+  }
+
+  private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+    RegionLocations locations) {
+    if (future.isDone()) {
+      return true;
+    }
+    if (locations == null) {
+      return false;
+    }
+    HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
+    // we should at least have one location available, otherwise the request should fail and
+    // should not arrive here
+    assert loc != null;
+    boolean completed;
+    if (req.locateType.equals(RegionLocateType.BEFORE)) {
+      // for locating the row before current row, the common case is to find the previous region
+      // in reverse scan, so we check the endKey first. In general, the condition should be
+      // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
+      // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
+      // endKey.
+      byte[] endKey = loc.getRegion().getEndKey();
+      int c = Bytes.compareTo(endKey, req.row);
+      completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
+        Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
+    } else {
+      completed = loc.getRegion().containsRow(req.row);
+    }
+    if (completed) {
+      future.complete(locations);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  protected final void onLocateComplete(LocateRequest req, RegionLocations locs, Throwable error) {
+    if (error != null) {
+      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
+        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
+    }
+    Optional<LocateRequest> toSend = Optional.empty();
+    if (locs != null) {
+      RegionLocations addedLocs = cache.add(locs);
+      synchronized (this) {
+        pendingRequests.remove(req);
+        clearCompletedRequests(addedLocs);
+        // Remove a complete locate request in a synchronized block, so the table cache must have
+        // quota to send a candidate request.
+        toSend = getCandidate();
+        toSend.ifPresent(pendingRequests::add);
+      }
+      toSend.ifPresent(this::locate);
+    } else {
+      // we meet an error
+      assert error != null;
+      synchronized (this) {
+        pendingRequests.remove(req);
+        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
+        // already retried several times
+        CompletableFuture<?> future = allRequests.remove(req);
+        if (future != null) {
+          future.completeExceptionally(error);
+        }
+        clearCompletedRequests(null);
+        // Remove a complete locate request in a synchronized block, so the table cache must have
+        // quota to send a candidate request.
+        toSend = getCandidate();
+        toSend.ifPresent(pendingRequests::add);
+      }
+      toSend.ifPresent(this::locate);
+    }
+  }
+
+  // return false means you do not need to go on, just return. And you do not need to call the above
+  // onLocateComplete either when returning false, as we will call it in this method for you, this
+  // is why we need to pass the LocateRequest as a parameter.
+  protected final boolean validateRegionLocations(RegionLocations locs, LocateRequest req) {
+    // remove HRegionLocation with null location, i.e, getServerName returns null.
+    if (locs != null) {
+      locs = locs.removeElementsWithNullLocation();
+    }
+
+    // the default region location should always be presented when fetching from meta, otherwise
+    // let's fail the request.
+    if (locs == null || locs.getDefaultRegionLocation() == null) {
+      onLocateComplete(req, null,
+        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
+          tableName, Bytes.toStringBinary(req.row), req.locateType)));
+      return false;
+    }
+    HRegionLocation loc = locs.getDefaultRegionLocation();
+    RegionInfo info = loc.getRegion();
+    if (info == null) {
+      onLocateComplete(req, null,
+        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+          tableName, Bytes.toStringBinary(req.row), req.locateType)));
+      return false;
+    }
+    return true;
+  }
+
+  protected abstract void locate(LocateRequest req);
+
+  abstract CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents);
+
+  CompletableFuture<RegionLocations> getRegionLocations(byte[] row, int replicaId,
+    RegionLocateType locateType, boolean reload) {
+    if (locateType.equals(RegionLocateType.AFTER)) {
+      row = createClosestRowAfter(row);
+      locateType = RegionLocateType.CURRENT;
+    }
+    if (!reload) {
+      RegionLocations locs = cache.locate(tableName, row, replicaId, locateType);
+      if (isGood(locs, replicaId)) {
+        return CompletableFuture.completedFuture(locs);
+      }
+    }
+    CompletableFuture<RegionLocations> future;
+    LocateRequest req;
+    boolean sendRequest = false;
+    synchronized (this) {
+      // check again
+      if (!reload) {
+        RegionLocations locs = cache.locate(tableName, row, replicaId, locateType);
+        if (isGood(locs, replicaId)) {
+          return CompletableFuture.completedFuture(locs);
+        }
+      }
+      req = new LocateRequest(row, locateType);
+      future = allRequests.get(req);
+      if (future == null) {
+        future = new CompletableFuture<>();
+        allRequests.put(req, future);
+        if (hasQuota() && !pendingRequests.contains(req)) {
+          pendingRequests.add(req);
+          sendRequest = true;
+        }
+      }
+    }
+    if (sendRequest) {
+      locate(req);
+    }
+    return future;
+  }
+
+  void addToCache(RegionLocations locs) {
+    cache.add(locs);
+  }
+
+  // notice that this is not a constant time operation, do not call it on critical path.
+  int getCacheSize() {
+    return cache.size();
+  }
+
+  void clearPendingRequests() {
+    synchronized (this) {
+      if (!allRequests.isEmpty()) {
+        IOException error = new IOException("Cache cleared");
+        for (CompletableFuture<?> future : allRequests.values()) {
+          future.completeExceptionally(error);
+        }
+      }
+    }
+  }
+
+  void clearCache(ServerName serverName) {
+    cache.clearCache(serverName);
+  }
+
+  void removeLocationFromCache(HRegionLocation loc) {
+    cache.removeLocationFromCache(loc);
+  }
+
+  RegionLocations getInCache(byte[] key) {
+    return cache.get(key);
+  }
+
+  // only used for testing whether we have cached the location for a region.
+  @VisibleForTesting
+  RegionLocations locateInCache(byte[] row) {
+    return cache.locate(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+      RegionLocateType.CURRENT);
+  }
+
+  // only used for testing whether we have cached the location for a table.
+  @VisibleForTesting
+  int getNumberOfCachedRegionLocations() {
+    return cache.getNumberOfCachedRegionLocations();
+  }
+}
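
Stripped of the HBase types, the request handling in AbstractAsyncTableRegionLocator above is a coalescing pattern: identical LocateRequests share one CompletableFuture, at most maxConcurrent lookups are in flight, and finishing one lookup promotes the oldest queued candidate. The sketch below shows that pattern in isolation; the class and method names are invented, and the real locator additionally completes any queued request that the freshly fetched RegionLocations happen to satisfy, via clearCompletedRequests.

// Simplified sketch of the coalescing pattern above: identical keys share one
// future, and at most 'maxConcurrent' lookups are in flight at a time.
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public final class CoalescingLookup<K, V> {

  private final int maxConcurrent;
  private final Function<K, CompletableFuture<V>> fetch;    // the actual lookup, e.g. a meta scan
  private final Set<K> pending = new HashSet<>();           // keys currently in flight
  private final Map<K, CompletableFuture<V>> all = new LinkedHashMap<>(); // every outstanding key

  public CoalescingLookup(int maxConcurrent, Function<K, CompletableFuture<V>> fetch) {
    this.maxConcurrent = maxConcurrent;
    this.fetch = fetch;
  }

  /** Returns the shared future for 'key'; keys need proper equals/hashCode, like LocateRequest. */
  public CompletableFuture<V> get(K key) {
    CompletableFuture<V> future;
    boolean send = false;
    synchronized (this) {
      future = all.get(key);
      if (future == null) {
        future = new CompletableFuture<>();
        all.put(key, future);
        if (pending.size() < maxConcurrent) {  // hasQuota()
          pending.add(key);
          send = true;
        }
      }
    }
    if (send) {
      send(key);
    }
    return future;
  }

  private void send(K key) {
    fetch.apply(key).whenComplete((value, error) -> {
      Optional<K> next;
      synchronized (this) {
        pending.remove(key);
        CompletableFuture<V> future = all.remove(key);
        if (future != null) {
          if (error != null) {
            future.completeExceptionally(error);
          } else {
            future.complete(value);
          }
        }
        // a slot freed up, so promote the oldest request that is not yet in flight
        next = all.keySet().stream().filter(k -> !pending.contains(k)).findFirst();
        next.ifPresent(pending::add);
      }
      next.ifPresent(this::send);
    });
  }
}
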
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 0f12e90..9b80025 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -38,7 +38,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.ChoreService;
@@ -59,6 +58,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -84,11 +84,11 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final AsyncConnectionConfiguration connConf;
 
-  private final User user;
+  final User user;
 
   final ConnectionRegistry registry;
 
-  private final int rpcTimeout;
+  final int rpcTimeout;
 
   protected final RpcClient rpcClient;
 
@@ -124,7 +124,7 @@ class AsyncConnectionImpl implements AsyncConnection {
   private volatile ConnectionOverAsyncConnection conn;
 
   public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
-      SocketAddress localAddress, User user) {
+    SocketAddress localAddress, User user) {
     this.conf = conf;
     this.user = user;
     if (user.isLoginFromKeytab()) {
@@ -137,8 +137,8 @@ class AsyncConnectionImpl implements AsyncConnection {
     } else {
       this.metrics = Optional.empty();
     }
-    this.rpcClient = RpcClientFactory.createClient(
-        conf, clusterId, localAddress, metrics.orElse(null));
+    this.rpcClient =
+      RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     this.rpcTimeout =
@@ -162,14 +162,13 @@ class AsyncConnectionImpl implements AsyncConnection {
         LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
       } else {
         try {
-          listener = new ClusterStatusListener(
-            new ClusterStatusListener.DeadServerHandler() {
-              @Override
-              public void newDead(ServerName sn) {
-                locator.clearCache(sn);
-                rpcClient.cancelConnections(sn);
-              }
-            }, conf, listenerClass);
+          listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
+            @Override
+            public void newDead(ServerName sn) {
+              locator.clearCache(sn);
+              rpcClient.cancelConnections(sn);
+            }
+          }, conf, listenerClass);
         } catch (IOException e) {
           LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
         }
@@ -194,13 +193,13 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   @Override
-  public void close() {
+  public void close() throws IOException {
     if (!closed.compareAndSet(false, true)) {
       return;
     }
-    IOUtils.closeQuietly(clusterStatusListener);
-    IOUtils.closeQuietly(rpcClient);
-    IOUtils.closeQuietly(registry);
+    Closeables.close(clusterStatusListener, true);
+    Closeables.close(rpcClient, true);
+    Closeables.close(registry, true);
     if (authService != null) {
       authService.shutdown();
     }
@@ -312,7 +311,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
-      ExecutorService pool) {
+    ExecutorService pool) {
     return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
 
       @Override
@@ -353,7 +352,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
-      ExecutorService pool) {
+    ExecutorService pool) {
     return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
       RETRY_TIMER);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
deleted file mode 100644
index 3571f960..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-/**
- * The asynchronous locator for meta region.
- */
-@InterfaceAudience.Private
-class AsyncMetaRegionLocator {
-
-  private final ConnectionRegistry registry;
-
-  private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
-
-  private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
-    new AtomicReference<>();
-
-  AsyncMetaRegionLocator(ConnectionRegistry registry) {
-    this.registry = registry;
-  }
-
-  /**
-   * Get the region locations for meta region. If the location for the given replica is not
-   * available in the cached locations, then fetch from the HBase cluster.
-   * <p/>
-   * The <code>replicaId</code> parameter is important. If the region replication config for meta
-   * region is changed, then the cached region locations may not have the locations for new
-   * replicas. If we do not check the location for the given replica, we will always return the
-   * cached region locations and cause an infinite loop.
-   */
-  CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
-    return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
-      registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
-  }
-
-  private HRegionLocation getCacheLocation(HRegionLocation loc) {
-    RegionLocations locs = metaRegionLocations.get();
-    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
-  }
-
-  private void addLocationToCache(HRegionLocation loc) {
-    for (;;) {
-      int replicaId = loc.getRegion().getReplicaId();
-      RegionLocations oldLocs = metaRegionLocations.get();
-      if (oldLocs == null) {
-        RegionLocations newLocs = createRegionLocations(loc);
-        if (metaRegionLocations.compareAndSet(null, newLocs)) {
-          return;
-        }
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
-      if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
-        oldLoc.getServerName().equals(loc.getServerName()))) {
-        return;
-      }
-      RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
-      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
-        return;
-      }
-    }
-  }
-
-  private void removeLocationFromCache(HRegionLocation loc) {
-    for (;;) {
-      RegionLocations oldLocs = metaRegionLocations.get();
-      if (oldLocs == null) {
-        return;
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
-      if (!canUpdateOnError(loc, oldLoc)) {
-        return;
-      }
-      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
-      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
-        return;
-      }
-    }
-  }
-
-  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
-    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
-      this::addLocationToCache, this::removeLocationFromCache, null);
-  }
-
-  void clearCache() {
-    metaRegionLocations.set(null);
-  }
-
-  void clearCache(ServerName serverName) {
-    for (;;) {
-      RegionLocations locs = metaRegionLocations.get();
-      if (locs == null) {
-        return;
-      }
-      RegionLocations newLocs = locs.removeByServer(serverName);
-      if (locs == newLocs) {
-        return;
-      }
-      if (newLocs.isEmpty()) {
-        newLocs = null;
-      }
-      if (metaRegionLocations.compareAndSet(locs, newLocs)) {
-        return;
-      }
-    }
-  }
-
-  // only used for testing whether we have cached the location for a region.
-  @VisibleForTesting
-  RegionLocations getRegionLocationInCache() {
-    return metaRegionLocations.get();
-  }
-
-  // only used for testing whether we have cached the location for a table.
-  @VisibleForTesting
-  int getNumberOfCachedRegionLocations() {
-    RegionLocations locs = metaRegionLocations.get();
-    return locs != null ? locs.numNonNullElements() : 0;
-  }
-}
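
The class removed above kept all meta locations in one AtomicReference and updated it with compare-and-set retry loops (addLocationToCache, removeLocationFromCache, clearCache(ServerName)). The core of that update pattern, with an invented merge function standing in for the replace/remove helpers, is just:

// Sketch of the CAS retry loop used by the deleted single-slot cache; 'merge'
// is a placeholder for helpers like replaceRegionLocation or removeByServer.
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

public final class CasSlotSketch<T> {

  private final AtomicReference<T> slot = new AtomicReference<>();

  /** Apply 'merge' atomically, retrying if another thread won the race. */
  public void update(UnaryOperator<T> merge) {
    for (;;) {
      T oldValue = slot.get();
      T newValue = merge.apply(oldValue);
      if (oldValue == newValue || slot.compareAndSet(oldValue, newValue)) {
        return;
      }
    }
  }

  public T get() {
    return slot.get();
  }
}

With a side-effect-free merge function this collapses to AtomicReference.updateAndGet; the hand-rolled loops above also short-circuit when the merge produces no change.
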
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
new file mode 100644
index 0000000..16a1b8a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+
+/**
+ * The class for locating region for meta table.
+ */
+@InterfaceAudience.Private
+class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);
+
+  private final AtomicReference<Interface> stub = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
+    new AtomicReference<>();
+
+  AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
+    super(conn, tableName, maxConcurrent);
+  }
+
+  private Interface createStub(ServerName serverName) throws IOException {
+    return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
+      (int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
+  }
+
+  CompletableFuture<Interface> getStub() {
+    return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
+      CompletableFuture<Interface> future = new CompletableFuture<>();
+      addListener(conn.registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(createStub(addr));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, stub -> true, "ClientLocateMetaStub");
+  }
+
+  private void tryClearMasterStubCache(IOException error, Interface currentStub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  @Override
+  protected void locate(LocateRequest req) {
+    addListener(getStub(), (stub, error) -> {
+      if (error != null) {
+        onLocateComplete(req, null, error);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      stub.locateMetaRegion(controller,
+        LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
+          .setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
+        resp -> {
+          if (controller.failed()) {
+            IOException ex = controller.getFailed();
+            tryClearMasterStubCache(ex, stub);
+            onLocateComplete(req, null, ex);
+            return;
+          }
+          RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
+            .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+          if (validateRegionLocations(locs, req)) {
+            onLocateComplete(req, locs, null);
+          }
+        });
+    });
+  }
+
+  @Override
+  CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+    addListener(getStub(), (stub, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
+        .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
+          if (controller.failed()) {
+            IOException ex = controller.getFailed();
+            tryClearMasterStubCache(ex, stub);
+            future.completeExceptionally(ex);
+            return;
+          }
+          List<HRegionLocation> locs = resp.getMetaLocationsList().stream()
+            .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
+          future.complete(locs);
+        });
+    });
+    return future;
+  }
+}
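
For the shape of the new master RPC without the locator plumbing: given a ClientMetaService.Interface stub for the active master (built the way getStub() above does) and a fresh HBaseRpcController, one meta lookup is a single locateMetaRegion call. A condensed sketch that keeps the patch's own request/response handling; the wrapper class and method name are illustrative:

// Condensed sketch of one locateMetaRegion call; the stub and controller are
// assumed to be created the same way AsyncMetaTableRegionLocator does above.
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.client.RegionLocateType;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

public final class LocateMetaRegionSketch {

  /** Ask the active master which meta region covers {@code row}. */
  public static CompletableFuture<RegionLocations> locateMeta(ClientMetaService.Interface stub,
      HBaseRpcController controller, byte[] row, RegionLocateType locateType) {
    CompletableFuture<RegionLocations> future = new CompletableFuture<>();
    stub.locateMetaRegion(controller,
      LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(row))
        .setLocateType(ProtobufUtil.toProtoRegionLocateType(locateType)).build(),
      resp -> {
        if (controller.failed()) {
          future.completeExceptionally(controller.getFailed());
        } else {
          future.complete(new RegionLocations(resp.getMetaLocationsList().stream()
            .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList())));
        }
      });
    return future;
  }
}
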
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
deleted file mode 100644
index b202168..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.NINES;
-import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
-import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
-import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
-import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.hadoop.hbase.CatalogFamilyFormat;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Objects;
-
-/**
- * The asynchronous locator for regions other than meta.
- */
-@InterfaceAudience.Private
-class AsyncNonMetaRegionLocator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
-
-  @VisibleForTesting
-  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
-    "hbase.client.meta.max.concurrent.locate.per.table";
-
-  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
-
-  @VisibleForTesting
-  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
-
-  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
-
-  private final AsyncConnectionImpl conn;
-
-  private final int maxConcurrentLocateRequestPerTable;
-
-  private final int locatePrefetchLimit;
-
-  private final boolean useMetaReplicas;
-
-  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
-
-  private static final class LocateRequest {
-
-    private final byte[] row;
-
-    private final RegionLocateType locateType;
-
-    public LocateRequest(byte[] row, RegionLocateType locateType) {
-      this.row = row;
-      this.locateType = locateType;
-    }
-
-    @Override
-    public int hashCode() {
-      return Bytes.hashCode(row) ^ locateType.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == null || obj.getClass() != LocateRequest.class) {
-        return false;
-      }
-      LocateRequest that = (LocateRequest) obj;
-      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
-    }
-  }
-
-  private static final class TableCache {
-
-    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
-      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
-
-    private final Set<LocateRequest> pendingRequests = new HashSet<>();
-
-    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
-      new LinkedHashMap<>();
-
-    public boolean hasQuota(int max) {
-      return pendingRequests.size() < max;
-    }
-
-    public boolean isPending(LocateRequest req) {
-      return pendingRequests.contains(req);
-    }
-
-    public void send(LocateRequest req) {
-      pendingRequests.add(req);
-    }
-
-    public Optional<LocateRequest> getCandidate() {
-      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
-    }
-
-    public void clearCompletedRequests(RegionLocations locations) {
-      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
-        allRequests.entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
-        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
-          iter.remove();
-        }
-      }
-    }
-
-    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
-        RegionLocations locations) {
-      if (future.isDone()) {
-        return true;
-      }
-      if (locations == null) {
-        return false;
-      }
-      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
-      // we should at least have one location available, otherwise the request should fail and
-      // should not arrive here
-      assert loc != null;
-      boolean completed;
-      if (req.locateType.equals(RegionLocateType.BEFORE)) {
-        // for locating the row before current row, the common case is to find the previous region
-        // in reverse scan, so we check the endKey first. In general, the condition should be
-        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
-        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
-        // endKey.
-        byte[] endKey = loc.getRegion().getEndKey();
-        int c = Bytes.compareTo(endKey, req.row);
-        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
-          Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
-      } else {
-        completed = loc.getRegion().containsRow(req.row);
-      }
-      if (completed) {
-        future.complete(locations);
-        return true;
-      } else {
-        return false;
-      }
-    }
-  }
-
-  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
-    this.conn = conn;
-    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
-      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
-    this.locatePrefetchLimit =
-      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
-    this.useMetaReplicas =
-      conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
-  }
-
-  private TableCache getTableCache(TableName tableName) {
-    return computeIfAbsent(cache, tableName, TableCache::new);
-  }
-
-  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
-    HRegionLocation[] locArr1 = locs1.getRegionLocations();
-    HRegionLocation[] locArr2 = locs2.getRegionLocations();
-    if (locArr1.length != locArr2.length) {
-      return false;
-    }
-    for (int i = 0; i < locArr1.length; i++) {
-      // do not need to compare region info
-      HRegionLocation loc1 = locArr1[i];
-      HRegionLocation loc2 = locArr2[i];
-      if (loc1 == null) {
-        if (loc2 != null) {
-          return false;
-        }
-      } else {
-        if (loc2 == null) {
-          return false;
-        }
-        if (loc1.getSeqNum() != loc2.getSeqNum()) {
-          return false;
-        }
-        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  // if we successfully add the locations to cache, return the locations, otherwise return the one
-  // which prevents us being added. The upper layer can use this value to complete pending requests.
-  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
-    LOG.trace("Try adding {} to cache", locs);
-    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
-    for (;;) {
-      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
-      if (oldLocs == null) {
-        return locs;
-      }
-      // check whether the regions are the same, this usually happens when table is split/merged, or
-      // deleted and recreated again.
-      RegionInfo region = locs.getRegionLocation().getRegion();
-      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
-      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
-        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
-        if (isEqual(mergedLocs, oldLocs)) {
-          // the merged one is the same with the old one, give up
-          LOG.trace("Will not add {} to cache because the old value {} " +
-            " is newer than us or has the same server name." +
-            " Maybe it is updated before we replace it", locs, oldLocs);
-          return oldLocs;
-        }
-        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
-          return mergedLocs;
-        }
-      } else {
-        // the region is different, here we trust the one we fetched. This maybe wrong but finally
-        // the upper layer can detect this and trigger removal of the wrong locations
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
-            " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
-        }
-        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
-          return locs;
-        }
-      }
-    }
-  }
-
-  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
-      Throwable error) {
-    if (error != null) {
-      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
-        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
-    }
-    Optional<LocateRequest> toSend = Optional.empty();
-    TableCache tableCache = getTableCache(tableName);
-    if (locs != null) {
-      RegionLocations addedLocs = addToCache(tableCache, locs);
-      synchronized (tableCache) {
-        tableCache.pendingRequests.remove(req);
-        tableCache.clearCompletedRequests(addedLocs);
-        // Remove a complete locate request in a synchronized block, so the table cache must have
-        // quota to send a candidate request.
-        toSend = tableCache.getCandidate();
-        toSend.ifPresent(r -> tableCache.send(r));
-      }
-      toSend.ifPresent(r -> locateInMeta(tableName, r));
-    } else {
-      // we meet an error
-      assert error != null;
-      synchronized (tableCache) {
-        tableCache.pendingRequests.remove(req);
-        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
-        // already retried several times
-        CompletableFuture<?> future = tableCache.allRequests.remove(req);
-        if (future != null) {
-          future.completeExceptionally(error);
-        }
-        tableCache.clearCompletedRequests(null);
-        // Remove a complete locate request in a synchronized block, so the table cache must have
-        // quota to send a candidate request.
-        toSend = tableCache.getCandidate();
-        toSend.ifPresent(r -> tableCache.send(r));
-      }
-      toSend.ifPresent(r -> locateInMeta(tableName, r));
-    }
-  }
-
-  // return whether we should stop the scan
-  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
-    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
-        Bytes.toStringBinary(req.row), req.locateType, locs);
-    }
-    // remove HRegionLocation with null location, i.e, getServerName returns null.
-    if (locs != null) {
-      locs = locs.removeElementsWithNullLocation();
-    }
-
-    // the default region location should always be presented when fetching from meta, otherwise
-    // let's fail the request.
-    if (locs == null || locs.getDefaultRegionLocation() == null) {
-      complete(tableName, req, null,
-        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
-          tableName, Bytes.toStringBinary(req.row), req.locateType)));
-      return true;
-    }
-    HRegionLocation loc = locs.getDefaultRegionLocation();
-    RegionInfo info = loc.getRegion();
-    if (info == null) {
-      complete(tableName, req, null,
-        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
-          tableName, Bytes.toStringBinary(req.row), req.locateType)));
-      return true;
-    }
-    if (info.isSplitParent()) {
-      return false;
-    }
-    complete(tableName, req, locs, null);
-    return true;
-  }
-
-  private void recordCacheHit() {
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
-  }
-
-  private void recordCacheMiss() {
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
-  }
-
-  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
-      int replicaId) {
-    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
-    if (entry == null) {
-      recordCacheMiss();
-      return null;
-    }
-    RegionLocations locs = entry.getValue();
-    HRegionLocation loc = locs.getRegionLocation(replicaId);
-    if (loc == null) {
-      recordCacheMiss();
-      return null;
-    }
-    byte[] endKey = loc.getRegion().getEndKey();
-    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
-          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
-      }
-      recordCacheHit();
-      return locs;
-    } else {
-      recordCacheMiss();
-      return null;
-    }
-  }
-
-  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
-      byte[] row, int replicaId) {
-    boolean isEmptyStopRow = isEmptyStopRow(row);
-    Map.Entry<byte[], RegionLocations> entry =
-      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
-    if (entry == null) {
-      recordCacheMiss();
-      return null;
-    }
-    RegionLocations locs = entry.getValue();
-    HRegionLocation loc = locs.getRegionLocation(replicaId);
-    if (loc == null) {
-      recordCacheMiss();
-      return null;
-    }
-    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
-      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
-          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
-      }
-      recordCacheHit();
-      return locs;
-    } else {
-      recordCacheMiss();
-      return null;
-    }
-  }
-
-  private void locateInMeta(TableName tableName, LocateRequest req) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
-        "', locateType=" + req.locateType + " in meta");
-    }
-    byte[] metaStartKey;
-    if (req.locateType.equals(RegionLocateType.BEFORE)) {
-      if (isEmptyStopRow(req.row)) {
-        byte[] binaryTableName = tableName.getName();
-        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
-      } else {
-        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
-      }
-    } else {
-      metaStartKey = createRegionName(tableName, req.row, NINES, false);
-    }
-    byte[] metaStopKey =
-      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
-    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
-      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
-      .setReadType(ReadType.PREAD);
-    if (useMetaReplicas) {
-      scan.setConsistency(Consistency.TIMELINE);
-    }
-    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
-
-      private boolean completeNormally = false;
-
-      private boolean tableNotFound = true;
-
-      @Override
-      public void onError(Throwable error) {
-        complete(tableName, req, null, error);
-      }
-
-      @Override
-      public void onComplete() {
-        if (tableNotFound) {
-          complete(tableName, req, null, new TableNotFoundException(tableName));
-        } else if (!completeNormally) {
-          complete(tableName, req, null, new IOException(
-            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
-        }
-      }
-
-      @Override
-      public void onNext(Result[] results, ScanController controller) {
-        if (results.length == 0) {
-          return;
-        }
-        tableNotFound = false;
-        int i = 0;
-        for (; i < results.length; i++) {
-          if (onScanNext(tableName, req, results[i])) {
-            completeNormally = true;
-            controller.terminate();
-            i++;
-            break;
-          }
-        }
-        // Add the remaining results into cache
-        if (i < results.length) {
-          TableCache tableCache = getTableCache(tableName);
-          for (; i < results.length; i++) {
-            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
-            if (locs == null) {
-              continue;
-            }
-            HRegionLocation loc = locs.getDefaultRegionLocation();
-            if (loc == null) {
-              continue;
-            }
-            RegionInfo info = loc.getRegion();
-            if (info == null || info.isOffline() || info.isSplitParent()) {
-              continue;
-            }
-            RegionLocations addedLocs = addToCache(tableCache, locs);
-            synchronized (tableCache) {
-              tableCache.clearCompletedRequests(addedLocs);
-            }
-          }
-        }
-      }
-    });
-  }
-
-  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
-      int replicaId, RegionLocateType locateType) {
-    return locateType.equals(RegionLocateType.BEFORE)
-      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
-      : locateRowInCache(tableCache, tableName, row, replicaId);
-  }
-
-  // locateToPrevious is true means we will use the start key of a region to locate the region
-  // placed before it. Used for reverse scan. See the comment of
-  // AsyncRegionLocator.getPreviousRegionLocation.
-  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
-      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
-    // AFTER should be convert to CURRENT before calling this method
-    assert !locateType.equals(RegionLocateType.AFTER);
-    TableCache tableCache = getTableCache(tableName);
-    if (!reload) {
-      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
-      if (isGood(locs, replicaId)) {
-        return CompletableFuture.completedFuture(locs);
-      }
-    }
-    CompletableFuture<RegionLocations> future;
-    LocateRequest req;
-    boolean sendRequest = false;
-    synchronized (tableCache) {
-      // check again
-      if (!reload) {
-        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
-        if (isGood(locs, replicaId)) {
-          return CompletableFuture.completedFuture(locs);
-        }
-      }
-      req = new LocateRequest(row, locateType);
-      future = tableCache.allRequests.get(req);
-      if (future == null) {
-        future = new CompletableFuture<>();
-        tableCache.allRequests.put(req, future);
-        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
-          tableCache.send(req);
-          sendRequest = true;
-        }
-      }
-    }
-    if (sendRequest) {
-      locateInMeta(tableName, req);
-    }
-    return future;
-  }
-
-  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
-      int replicaId, RegionLocateType locateType, boolean reload) {
-    // as we know the exact row after us, so we can just create the new row, and use the same
-    // algorithm to locate it.
-    if (locateType.equals(RegionLocateType.AFTER)) {
-      row = createClosestRowAfter(row);
-      locateType = RegionLocateType.CURRENT;
-    }
-    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
-  }
-
-  private void recordClearRegionCache() {
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
-  }
-
-  private void removeLocationFromCache(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return;
-    }
-    byte[] startKey = loc.getRegion().getStartKey();
-    for (;;) {
-      RegionLocations oldLocs = tableCache.cache.get(startKey);
-      if (oldLocs == null) {
-        return;
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
-      if (!canUpdateOnError(loc, oldLoc)) {
-        return;
-      }
-      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
-      if (newLocs == null) {
-        if (tableCache.cache.remove(startKey, oldLocs)) {
-          recordClearRegionCache();
-          return;
-        }
-      } else {
-        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
-          recordClearRegionCache();
-          return;
-        }
-      }
-    }
-  }
-
-  private void addLocationToCache(HRegionLocation loc) {
-    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
-  }
-
-  private HRegionLocation getCachedLocation(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return null;
-    }
-    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
-    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
-  }
-
-  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
-    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
-    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
-      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
-  }
-
-  void clearCache(TableName tableName) {
-    TableCache tableCache = cache.remove(tableName);
-    if (tableCache == null) {
-      return;
-    }
-    synchronized (tableCache) {
-      if (!tableCache.allRequests.isEmpty()) {
-        IOException error = new IOException("Cache cleared");
-        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
-      }
-    }
-    conn.getConnectionMetrics()
-      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
-  }
-
-  void clearCache() {
-    cache.clear();
-  }
-
-  void clearCache(ServerName serverName) {
-    for (TableCache tableCache : cache.values()) {
-      for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
-        byte[] regionName = entry.getKey();
-        RegionLocations locs = entry.getValue();
-        RegionLocations newLocs = locs.removeByServer(serverName);
-        if (locs == newLocs) {
-          continue;
-        }
-        if (newLocs.isEmpty()) {
-          tableCache.cache.remove(regionName, locs);
-        } else {
-          tableCache.cache.replace(regionName, locs, newLocs);
-        }
-      }
-    }
-  }
-
-  // only used for testing whether we have cached the location for a region.
-  @VisibleForTesting
-  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
-    TableCache tableCache = cache.get(tableName);
-    if (tableCache == null) {
-      return null;
-    }
-    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
-  }
-
-  // only used for testing whether we have cached the location for a table.
-  @VisibleForTesting
-  int getNumberOfCachedRegionLocations(TableName tableName) {
-    TableCache tableCache = cache.get(tableName);
-    if (tableCache == null) {
-      return 0;
-    }
-    return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
-  }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
new file mode 100644
index 0000000..09b3133
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class for locating regions of a table other than meta.
+ */
+@InterfaceAudience.Private
+class AsyncNonMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaTableRegionLocator.class);
+
+  private final int prefetchLimit;
+
+  private final boolean useMetaReplicas;
+
+  AsyncNonMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent,
+    int prefetchLimit, boolean useMetaReplicas) {
+    super(conn, tableName, maxConcurrent);
+    this.prefetchLimit = prefetchLimit;
+    this.useMetaReplicas = useMetaReplicas;
+  }
+
+  // return whether we should stop the scan
+  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
+    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
+        Bytes.toStringBinary(req.row), req.locateType, locs);
+    }
+    if (!validateRegionLocations(locs, req)) {
+      return true;
+    }
+    if (locs.getDefaultRegionLocation().getRegion().isSplitParent()) {
+      return false;
+    }
+    onLocateComplete(req, locs, null);
+    return true;
+  }
+
+  @Override
+  protected void locate(LocateRequest req) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '{}', row='{}', locateType={} in meta", tableName,
+        Bytes.toStringBinary(req.row), req.locateType);
+    }
+    Scan scan =
+      CatalogFamilyFormat.createRegionLocateScan(tableName, req.row, req.locateType, prefetchLimit);
+    if (useMetaReplicas) {
+      scan.setConsistency(Consistency.TIMELINE);
+    }
+    conn.getTable(TableName.META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
+
+      private boolean completeNormally = false;
+
+      private boolean tableNotFound = true;
+
+      @Override
+      public void onError(Throwable error) {
+        onLocateComplete(req, null, error);
+      }
+
+      @Override
+      public void onComplete() {
+        if (tableNotFound) {
+          onLocateComplete(req, null, new TableNotFoundException(tableName));
+        } else if (!completeNormally) {
+          onLocateComplete(req, null, new IOException(
+            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
+        }
+      }
+
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        if (results.length == 0) {
+          return;
+        }
+        tableNotFound = false;
+        int i = 0;
+        for (; i < results.length; i++) {
+          if (onScanNext(tableName, req, results[i])) {
+            completeNormally = true;
+            controller.terminate();
+            i++;
+            break;
+          }
+        }
+        // Add the remaining results into cache
+        if (i < results.length) {
+          for (; i < results.length; i++) {
+            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
+            if (locs == null) {
+              continue;
+            }
+            HRegionLocation loc = locs.getDefaultRegionLocation();
+            if (loc == null) {
+              continue;
+            }
+            RegionInfo info = loc.getRegion();
+            if (info == null || info.isOffline() || info.isSplitParent()) {
+              continue;
+            }
+            RegionLocations addedLocs = cache.add(locs);
+            synchronized (this) {
+              clearCompletedRequests(addedLocs);
+            }
+          }
+        }
+      }
+    });
+  }
+
+  @Override
+  CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+    return ClientMetaTableAccessor.getTableHRegionLocations(
+      conn.getTable(TableName.META_TABLE_NAME), tableName, excludeOfflinedSplitParents);
+  }
+}
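For context, the locate() above issues an ordinary reversed, PREAD scan of hbase:meta and parses
each catalog row with CatalogFamilyFormat.getRegionLocations(). A rough sketch of the same pattern
from plain client code, assuming an existing AsyncConnection named asyncConn and omitting the
start/stop rows that createRegionLocateScan computes in this patch:

    AsyncTable<AdvancedScanResultConsumer> meta = asyncConn.getTable(TableName.META_TABLE_NAME);
    Scan scan = new Scan()
      .addFamily(HConstants.CATALOG_FAMILY)
      .setReversed(true)
      .setCaching(10)
      .setReadType(Scan.ReadType.PREAD);
    meta.scan(scan, new AdvancedScanResultConsumer() {

      @Override
      public void onNext(Result[] results, ScanController controller) {
        for (Result result : results) {
          // same parsing step the locator applies to every catalog row
          System.out.println(CatalogFamilyFormat.getRegionLocations(result));
        }
        controller.terminate(); // stop early once we have what we need
      }

      @Override
      public void onError(Throwable error) {
        error.printStackTrace();
      }

      @Override
      public void onComplete() {
        System.out.println("meta scan finished");
      }
    });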
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 09eabfc..fce59a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,10 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -31,8 +40,6 @@ import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
@@ -44,25 +51,51 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 @InterfaceAudience.Private
 class AsyncRegionLocator {
 
-  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class);
+  @VisibleForTesting
+  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
+    "hbase.client.meta.max.concurrent.locate.per.table";
+
+  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+
+  @VisibleForTesting
+  static final String MAX_CONCURRENT_LOCATE_META_REQUEST =
+    "hbase.client.meta.max.concurrent.locate";
+
+  @VisibleForTesting
+  static final String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
+
+  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
 
   private final HashedWheelTimer retryTimer;
 
   private final AsyncConnectionImpl conn;
 
-  private final AsyncMetaRegionLocator metaRegionLocator;
+  private final int maxConcurrentLocateRequestPerTable;
+
+  private final int maxConcurrentLocateMetaRequest;
+
+  private final int locatePrefetchLimit;
 
-  private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
+  private final boolean useMetaReplicas;
 
-  AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+  private final ConcurrentMap<TableName, AbstractAsyncTableRegionLocator> table2Locator =
+    new ConcurrentHashMap<>();
+
+  public AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
     this.conn = conn;
-    this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
-    this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
     this.retryTimer = retryTimer;
+    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
+      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+    this.maxConcurrentLocateMetaRequest = conn.getConfiguration()
+      .getInt(MAX_CONCURRENT_LOCATE_META_REQUEST, maxConcurrentLocateRequestPerTable);
+    this.locatePrefetchLimit =
+      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
+    this.useMetaReplicas =
+      conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
   }
 
   private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
-      Supplier<String> timeoutMsg) {
+    Supplier<String> timeoutMsg) {
     if (future.isDone() || timeoutNs <= 0) {
       return future;
     }
@@ -85,12 +118,28 @@ class AsyncRegionLocator {
     return TableName.isMetaTableName(tableName);
   }
 
+  private AbstractAsyncTableRegionLocator getOrCreateTableRegionLocator(TableName tableName) {
+    return computeIfAbsent(table2Locator, tableName, () -> {
+      if (isMeta(tableName)) {
+        return new AsyncMetaTableRegionLocator(conn, tableName, maxConcurrentLocateMetaRequest);
+      } else {
+        return new AsyncNonMetaTableRegionLocator(conn, tableName,
+          maxConcurrentLocateRequestPerTable, locatePrefetchLimit, useMetaReplicas);
+      }
+    });
+  }
+
+  @VisibleForTesting
   CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
-      RegionLocateType type, boolean reload, long timeoutNs) {
-    CompletableFuture<RegionLocations> future = isMeta(tableName)
-      ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
-      : nonMetaRegionLocator.getRegionLocations(tableName, row,
-        RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+    int replicaId, RegionLocateType locateType, boolean reload) {
+    return getOrCreateTableRegionLocator(tableName).getRegionLocations(row, replicaId, locateType,
+      reload);
+  }
+
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+    RegionLocateType type, boolean reload, long timeoutNs) {
+    CompletableFuture<RegionLocations> future =
+      getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
     return withTimeout(future, timeoutNs,
       () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
         "ms) waiting for region locations for " + tableName + ", row='" +
@@ -98,13 +147,12 @@ class AsyncRegionLocator {
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
+    int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
     // meta region can not be split right now so we always call the same method.
     // Change it later if the meta table can have more than one regions.
     CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
     CompletableFuture<RegionLocations> locsFuture =
-      isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
-        : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+      getRegionLocations(tableName, row, replicaId, type, reload);
     addListener(locsFuture, (locs, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
@@ -131,72 +179,115 @@ class AsyncRegionLocator {
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      int replicaId, RegionLocateType type, long timeoutNs) {
+    int replicaId, RegionLocateType type, long timeoutNs) {
     return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      RegionLocateType type, boolean reload, long timeoutNs) {
+    RegionLocateType type, boolean reload, long timeoutNs) {
     return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
       timeoutNs);
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      RegionLocateType type, long timeoutNs) {
+    RegionLocateType type, long timeoutNs) {
     return getRegionLocation(tableName, row, type, false, timeoutNs);
   }
 
-  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
-    if (loc.getRegion().isMetaRegion()) {
-      metaRegionLocator.updateCachedLocationOnError(loc, exception);
-    } else {
-      nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
+  /**
+   * Get all region locations for a table.
+   * <p/>
+   * Notice that this method will not read from cache.
+   */
+  CompletableFuture<List<HRegionLocation>> getAllRegionLocations(TableName tableName,
+    boolean excludeOfflinedSplitParents) {
+    CompletableFuture<List<HRegionLocation>> future =
+      getOrCreateTableRegionLocator(tableName).getAllRegionLocations(excludeOfflinedSplitParents);
+    addListener(future, (locs, error) -> {
+      if (error != null) {
+        return;
+      }
+      // add locations to cache
+      AbstractAsyncTableRegionLocator locator = getOrCreateTableRegionLocator(tableName);
+      Map<RegionInfo, List<HRegionLocation>> map = new HashMap<>();
+      for (HRegionLocation loc : locs) {
+        // do not cache split parent
+        if (loc.getRegion() != null && !loc.getRegion().isSplitParent()) {
+          map.computeIfAbsent(RegionReplicaUtil.getRegionInfoForDefaultReplica(loc.getRegion()),
+            k -> new ArrayList<>()).add(loc);
+        }
+      }
+      for (List<HRegionLocation> l : map.values()) {
+        locator.addToCache(new RegionLocations(l));
+      }
+    });
+    return future;
+  }
+
+  private void removeLocationFromCache(HRegionLocation loc) {
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+    if (locator == null) {
+      return;
     }
+    locator.removeLocationFromCache(loc);
   }
 
-  void clearCache(TableName tableName) {
-    LOG.debug("Clear meta cache for {}", tableName);
-    if (tableName.equals(META_TABLE_NAME)) {
-      metaRegionLocator.clearCache();
-    } else {
-      nonMetaRegionLocator.clearCache(tableName);
+  private void addLocationToCache(HRegionLocation loc) {
+    getOrCreateTableRegionLocator(loc.getRegion().getTable())
+      .addToCache(createRegionLocations(loc));
+  }
+
+  private HRegionLocation getCachedLocation(HRegionLocation loc) {
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+    if (locator == null) {
+      return null;
     }
+    RegionLocations locs = locator.getInCache(loc.getRegion().getStartKey());
+    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
   }
 
-  void clearCache(ServerName serverName) {
-    LOG.debug("Clear meta cache for {}", serverName);
-    metaRegionLocator.clearCache(serverName);
-    nonMetaRegionLocator.clearCache(serverName);
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+      this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
+  }
+
+  void clearCache(TableName tableName) {
+    AbstractAsyncTableRegionLocator locator = table2Locator.remove(tableName);
+    if (locator == null) {
+      return;
+    }
+    locator.clearPendingRequests();
+    conn.getConnectionMetrics()
+      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(locator.getCacheSize()));
   }
 
   void clearCache() {
-    metaRegionLocator.clearCache();
-    nonMetaRegionLocator.clearCache();
+    table2Locator.clear();
   }
 
-  @VisibleForTesting
-  AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
-    return nonMetaRegionLocator;
+  void clearCache(ServerName serverName) {
+    for (AbstractAsyncTableRegionLocator locator : table2Locator.values()) {
+      locator.clearCache(serverName);
+    }
   }
 
   // only used for testing whether we have cached the location for a region.
   @VisibleForTesting
   RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
-    if (TableName.isMetaTableName(tableName)) {
-      return metaRegionLocator.getRegionLocationInCache();
-    } else {
-      return nonMetaRegionLocator.getRegionLocationInCache(tableName, row);
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+    if (locator == null) {
+      return null;
     }
+    return locator.locateInCache(row);
   }
 
   // only used for testing whether we have cached the location for a table.
   @VisibleForTesting
   int getNumberOfCachedRegionLocations(TableName tableName) {
-    if (TableName.isMetaTableName(tableName)) {
-      return metaRegionLocator.getNumberOfCachedRegionLocations();
-    } else {
-      return nonMetaRegionLocator.getNumberOfCachedRegionLocations(tableName);
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+    if (locator == null) {
+      return 0;
     }
+    return locator.getNumberOfCachedRegionLocations();
   }
 }
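The rewritten AsyncRegionLocator above reads its limits from the three keys it declares. A small
sketch of how a client might tune them before creating a connection (values are arbitrary and the
code assumes it runs in a method allowed to throw Exception):

    Configuration conf = HBaseConfiguration.create();
    // at most this many concurrent locate requests against meta per user table (default 8)
    conf.setInt("hbase.client.meta.max.concurrent.locate.per.table", 4);
    // concurrent locate requests for the meta table itself (defaults to the per-table limit)
    conf.setInt("hbase.client.meta.max.concurrent.locate", 4);
    // number of catalog rows prefetched by each locate scan (default 10)
    conf.setInt("hbase.client.locate.prefetch.limit", 20);
    try (AsyncConnection asyncConn = ConnectionFactory.createAsyncConnection(conf).get()) {
      // the locator created for this connection picks up the settings above
    }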
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4c6cd5a..c34cc5a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.commons.lang3.ObjectUtils;
@@ -30,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+
 /**
  * Helper class for asynchronous region locator.
  */
@@ -55,9 +58,9 @@ final class AsyncRegionLocatorHelper {
   }
 
   static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
-      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
-      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
-      MetricsConnection metrics) {
+    Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+    Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
+    Optional<MetricsConnection> metrics) {
     HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -85,9 +88,7 @@ final class AsyncRegionLocatorHelper {
       addToCache.accept(newLoc);
     } else {
       LOG.debug("Try removing {} from cache", loc);
-      if (metrics != null) {
-        metrics.incrCacheDroppingExceptions(exception);
-      }
+      metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
       removeFromCache.accept(loc);
     }
   }
@@ -146,4 +147,33 @@ final class AsyncRegionLocatorHelper {
     HRegionLocation loc = locs.getRegionLocation(replicaId);
     return loc != null && loc.getServerName() != null;
   }
+
+  static boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+    HRegionLocation[] locArr1 = locs1.getRegionLocations();
+    HRegionLocation[] locArr2 = locs2.getRegionLocations();
+    if (locArr1.length != locArr2.length) {
+      return false;
+    }
+    for (int i = 0; i < locArr1.length; i++) {
+      // do not need to compare region info
+      HRegionLocation loc1 = locArr1[i];
+      HRegionLocation loc2 = locArr2[i];
+      if (loc1 == null) {
+        if (loc2 != null) {
+          return false;
+        }
+      } else {
+        if (loc2 == null) {
+          return false;
+        }
+        if (loc1.getSeqNum() != loc2.getSeqNum()) {
+          return false;
+        }
+        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }
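The new isEqual() above deliberately ignores the RegionInfo and compares only the seqNum and
server name of each replica. A sketch of what that means in practice; the method is
package-private, so this would live in a test under org.apache.hadoop.hbase.client, and the table
name and server name below are made up (plain Java asserts, so run with -ea):

    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf("t")).build();
    ServerName rs1 = ServerName.valueOf("rs1.example.com", 16020, 1L);
    RegionLocations a = new RegionLocations(new HRegionLocation(region, rs1, 10L));
    RegionLocations b = new RegionLocations(new HRegionLocation(region, rs1, 10L));
    RegionLocations c = new RegionLocations(new HRegionLocation(region, rs1, 11L));
    assert AsyncRegionLocatorHelper.isEqual(a, b);  // same server name and seqNum
    assert !AsyncRegionLocatorHelper.isEqual(a, c); // seqNum differs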
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index 321f44e..49bade3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -107,8 +107,8 @@ public interface AsyncTableRegionLocator {
   /**
    * Retrieves all of the regions associated with this table.
    * <p/>
-   * Usually we will go to meta table directly in this method so there is no {@code reload}
-   * parameter.
+   * We will go to the meta table directly in this method, so there is no {@code reload} parameter.
+   * Please use it with caution as it could put a heavy load on the cluster.
    * <p/>
    * Notice that the location for region replicas other than the default replica are also returned.
    * @return a {@link List} of all regions associated with this table.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index ad6a051..83b601e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -54,12 +53,7 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
 
   @Override
   public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
-    if (TableName.isMetaTableName(tableName)) {
-      return conn.registry.getMetaRegionLocations()
-        .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
-    }
-    return ClientMetaTableAccessor
-      .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
+    return conn.getLocator().getAllRegionLocations(tableName, true);
   }
 
   @Override
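With this change the meta table goes through the same locator code path as user tables, so the
public API below behaves uniformly for both. A minimal usage sketch, assuming an existing
AsyncConnection named asyncConn and a hypothetical table name:

    AsyncTableRegionLocator locator = asyncConn.getRegionLocator(TableName.valueOf("test_table"));
    locator.getAllRegionLocations().thenAccept(locations -> {
      for (HRegionLocation location : locations) {
        System.out.println(location.getRegion().getRegionNameAsString() + " => " +
          location.getServerName());
      }
    }).join();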
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
index cd22d78..569d728 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -32,11 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 interface ConnectionRegistry extends Closeable {
 
   /**
-   * Get the location of meta region(s).
-   */
-  CompletableFuture<RegionLocations> getMetaRegionLocations();
-
-  /**
    * Should only be called once.
    * <p>
    * The upper layer should store this value somewhere as it will not be change any more.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 1228c7e..94ab7fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -186,24 +186,6 @@ public final class ConnectionUtils {
     return Arrays.copyOf(row, row.length + 1);
   }
 
-  /**
-   * Create a row before the specified row and very close to the specified row.
-   */
-  static byte[] createCloseRowBefore(byte[] row) {
-    if (row.length == 0) {
-      return MAX_BYTE_ARRAY;
-    }
-    if (row[row.length - 1] == 0) {
-      return Arrays.copyOf(row, row.length - 1);
-    } else {
-      byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
-      System.arraycopy(row, 0, nextRow, 0, row.length - 1);
-      nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
-      System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
-      return nextRow;
-    }
-  }
-
   static boolean isEmptyStartRow(byte[] row) {
     return Bytes.equals(row, EMPTY_START_ROW);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 4d0a591..e70aa97 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -243,7 +243,7 @@ public class MasterRegistry implements ConnectionRegistry {
     return new RegionLocations(regionLocations);
   }
 
-  @Override
+  // keep the method here just for testing compatibility
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
     return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
       GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 16483ef..cbdc462 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -746,8 +746,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
     if (TableName.isMetaTableName(tableName)) {
-      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
-        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
+      return getTableHRegionLocations(tableName).thenApply(
+        locs -> locs.stream().allMatch(loc -> loc != null && loc.getServerName() != null));
     }
     CompletableFuture<Boolean> future = new CompletableFuture<>();
     addListener(isTableEnabled(tableName), (enabled, error) -> {
@@ -763,7 +763,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.complete(false);
       } else {
         addListener(
-          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
+          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true),
           (locations, error1) -> {
             if (error1 != null) {
               future.completeExceptionally(error1);
@@ -883,15 +883,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
-    if (tableName.equals(META_TABLE_NAME)) {
-      return connection.registry.getMetaRegionLocations()
-        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
-          .collect(Collectors.toList()));
-    } else {
-      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
-        .thenApply(
-          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
-    }
+    return getTableHRegionLocations(tableName).thenApply(
+      locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
   }
 
   @Override
@@ -1109,23 +1102,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
    * List all region locations for the specific table.
    */
   private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else if (metaRegions == null || metaRegions.isEmpty() ||
-          metaRegions.getDefaultRegionLocation() == null) {
-          future.completeExceptionally(new IOException("meta region does not found"));
-        } else {
-          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-        }
-      });
-      return future;
-    } else {
-      // For non-meta table, we fetch all locations by scanning hbase:meta table
-      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
-    }
+    return connection.getRegionLocator(tableName).getAllRegionLocations();
   }
 
   /**
@@ -2375,9 +2352,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
         if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
           // old format encodedName, should be meta region
-          future = connection.registry.getMetaRegionLocations()
-            .thenApply(locs -> Stream.of(locs.getRegionLocations())
-              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
+          future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
         } else {
           future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
             regionNameOrEncodedRegionName);
@@ -2386,10 +2362,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         RegionInfo regionInfo =
           CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
         if (regionInfo.isMetaRegion()) {
-          future = connection.registry.getMetaRegionLocations()
-            .thenApply(locs -> Stream.of(locs.getRegionLocations())
-              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
-              .findFirst());
+          future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
+            .findFirst());
         } else {
           future =
             ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
index 950123c..2380fd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -27,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * <li>{@link #AFTER} locate the region which contains the row after the given row.</li>
  * </ul>
  */
-@InterfaceAudience.Private
-enum RegionLocateType {
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+public enum RegionLocateType {
   BEFORE, CURRENT, AFTER
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
new file mode 100644
index 0000000..86f79ae
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isEqual;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The location cache for regions of a table.
+ */
+@InterfaceAudience.Private
+class TableRegionLocationCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRegionLocationCache.class);
+
+  private final Optional<MetricsConnection> metrics;
+
+  private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
+    new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+
+  TableRegionLocationCache(Optional<MetricsConnection> metrics) {
+    this.metrics = metrics;
+  }
+
+  private void recordCacheHit() {
+    metrics.ifPresent(MetricsConnection::incrMetaCacheHit);
+  }
+
+  private void recordCacheMiss() {
+    metrics.ifPresent(MetricsConnection::incrMetaCacheMiss);
+  }
+
+  private void recordClearRegionCache() {
+    metrics.ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
+  }
+
+  private RegionLocations locateRow(TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      recordCacheMiss();
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      recordCacheMiss();
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      recordCacheHit();
+      return locs;
+    } else {
+      recordCacheMiss();
+      return null;
+    }
+  }
+
+  private RegionLocations locateRowBefore(TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      recordCacheMiss();
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      recordCacheMiss();
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      recordCacheHit();
+      return locs;
+    } else {
+      recordCacheMiss();
+      return null;
+    }
+  }
+
+  RegionLocations locate(TableName tableName, byte[] row, int replicaId,
+    RegionLocateType locateType) {
+    return locateType.equals(RegionLocateType.BEFORE) ? locateRowBefore(tableName, row, replicaId) :
+      locateRow(tableName, row, replicaId);
+  }
+
+  // If we successfully add the locations to the cache, return them; otherwise return the existing
+  // entry that prevented the add. The upper layer can use this value to complete pending requests.
+  RegionLocations add(RegionLocations locs) {
+    LOG.trace("Try adding {} to cache", locs);
+    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
+      if (oldLocs == null) {
+        return locs;
+      }
+      // check whether the regions are the same; this usually happens when the table is
+      // split/merged, or deleted and recreated
+      RegionInfo region = locs.getRegionLocation().getRegion();
+      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+        if (isEqual(mergedLocs, oldLocs)) {
+          // the merged one is the same with the old one, give up
+          LOG.trace("Will not add {} to cache because the old value {} " +
+            " is newer than us or has the same server name." +
+            " Maybe it is updated before we replace it", locs, oldLocs);
+          return oldLocs;
+        }
+        if (cache.replace(startKey, oldLocs, mergedLocs)) {
+          return mergedLocs;
+        }
+      } else {
+        // the region is different, here we trust the one we fetched. This may be wrong, but
+        // eventually the upper layer can detect this and trigger removal of the wrong locations
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The newly fetched region {} is different from the old one {} for row '{}'," +
+            " try replacing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
+        }
+        if (cache.replace(startKey, oldLocs, locs)) {
+          return locs;
+        }
+      }
+    }
+  }
+
+  // notice that this is not a constant-time operation, do not call it on the critical path.
+  int size() {
+    return cache.size();
+  }
+
+  void clearCache(ServerName serverName) {
+    for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
+      byte[] regionName = entry.getKey();
+      RegionLocations locs = entry.getValue();
+      RegionLocations newLocs = locs.removeByServer(serverName);
+      if (locs == newLocs) {
+        continue;
+      }
+      if (newLocs.isEmpty()) {
+        cache.remove(regionName, locs);
+      } else {
+        cache.replace(regionName, locs, newLocs);
+      }
+    }
+  }
+
+  void removeLocationFromCache(HRegionLocation loc) {
+    byte[] startKey = loc.getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = cache.get(startKey);
+      if (oldLocs == null) {
+        return;
+      }
+      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+      if (!canUpdateOnError(loc, oldLoc)) {
+        return;
+      }
+      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+      if (newLocs == null) {
+        if (cache.remove(startKey, oldLocs)) {
+          recordClearRegionCache();
+          return;
+        }
+      } else {
+        if (cache.replace(startKey, oldLocs, newLocs)) {
+          recordClearRegionCache();
+          return;
+        }
+      }
+    }
+  }
+
+  RegionLocations get(byte[] key) {
+    return cache.get(key);
+  }
+
+  // only used for testing whether we have cached the location for a table.
+  @VisibleForTesting
+  int getNumberOfCachedRegionLocations() {
+    return cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
+  }
+}
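The lookup rule used by locateRow() above boils down to ConcurrentSkipListMap.floorEntry() on the
region start key plus an end-key check. A self-contained toy sketch of just that rule (not HBase
internals; the region boundaries are invented):

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FloorEntryLookupSketch {
      public static void main(String[] args) {
        // keyed by region start key, exactly how the cache above is keyed
        ConcurrentSkipListMap<byte[], byte[]> startToEndKey =
          new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
        startToEndKey.put(Bytes.toBytes(""), Bytes.toBytes("b"));  // region [, b)
        startToEndKey.put(Bytes.toBytes("b"), Bytes.toBytes("m")); // region [b, m)
        startToEndKey.put(Bytes.toBytes("m"), Bytes.toBytes(""));  // region [m, )

        byte[] row = Bytes.toBytes("c");
        Map.Entry<byte[], byte[]> entry = startToEndKey.floorEntry(row);
        // a floor entry is a hit only if the row sorts before the region's end key
        // (an empty end key means the last region of the table)
        boolean hit = entry != null &&
          (entry.getValue().length == 0 || Bytes.compareTo(row, entry.getValue()) < 0);
        System.out.println("cache hit for 'c': " + hit); // true, 'c' falls in [b, m)
      }
    }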
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
index 42a4188..04ea535 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForR
 import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -42,7 +43,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 
@@ -112,7 +115,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
   }
 
   private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
-      CompletableFuture<RegionLocations> future) {
+    CompletableFuture<RegionLocations> future) {
     remaining.decrement();
     if (remaining.intValue() > 0) {
       return;
@@ -120,8 +123,8 @@ class ZKConnectionRegistry implements ConnectionRegistry {
     future.complete(new RegionLocations(locs));
   }
 
-  private Pair<RegionState.State, ServerName> getStateAndServerName(
-      ZooKeeperProtos.MetaRegionServer proto) {
+  private Pair<RegionState.State, ServerName>
+    getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
     RegionState.State state;
     if (proto.hasState()) {
       state = RegionState.State.convert(proto.getState());
@@ -134,7 +137,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
   }
 
   private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
-      List<String> metaReplicaZNodes) {
+    List<String> metaReplicaZNodes) {
     if (metaReplicaZNodes.isEmpty()) {
       future.completeExceptionally(new IOException("No meta znode available"));
     }
@@ -190,13 +193,12 @@ class ZKConnectionRegistry implements ConnectionRegistry {
     }
   }
 
-  @Override
+  // keep the method here just for testing compatibility
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
     CompletableFuture<RegionLocations> future = new CompletableFuture<>();
     addListener(
-      zk.list(znodePaths.baseZNode)
-        .thenApply(children -> children.stream()
-          .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
+      zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
+        .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
       (metaReplicaZNodes, error) -> {
         if (error != null) {
           future.completeExceptionally(error);
@@ -219,14 +221,13 @@ class ZKConnectionRegistry implements ConnectionRegistry {
   @Override
   public CompletableFuture<ServerName> getActiveMaster() {
     return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
-        .thenApply(proto -> {
-          if (proto == null) {
-            return null;
-          }
-          HBaseProtos.ServerName snProto = proto.getMaster();
-          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
-            snProto.getStartCode());
-        });
+      .thenApply(proto -> {
+        if (proto == null) {
+          return null;
+        }
+        HBaseProtos.ServerName snProto = proto.getMaster();
+        return ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode());
+      });
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 4cdbfbc..1f6e057 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionLoadStats;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
 import org.apache.hadoop.hbase.client.Result;
@@ -3543,4 +3544,30 @@ public final class ProtobufUtil {
     return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName()).addAllServers(hostports)
         .addAllTables(tables).addAllConfiguration(configuration).build();
   }
+
+  public static MasterProtos.RegionLocateType toProtoRegionLocateType(RegionLocateType pojo) {
+    switch (pojo) {
+      case BEFORE:
+        return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_BEFORE;
+      case CURRENT:
+        return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_CURRENT;
+      case AFTER:
+        return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_AFTER;
+      default:
+        throw new IllegalArgumentException("Unknown RegionLocateType: " + pojo);
+    }
+  }
+
+  public static RegionLocateType toRegionLocateType(MasterProtos.RegionLocateType proto) {
+    switch (proto) {
+      case REGION_LOCATE_TYPE_BEFORE:
+        return RegionLocateType.BEFORE;
+      case REGION_LOCATE_TYPE_CURRENT:
+        return RegionLocateType.CURRENT;
+      case REGION_LOCATE_TYPE_AFTER:
+        return RegionLocateType.AFTER;
+      default:
+        throw new IllegalArgumentException("Unknown proto RegionLocateType: " + proto);
+    }
+  }
 }
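
A minimal sketch (assuming the MasterProtos classes generated from the Master.proto change
below; the class name is illustrative) showing that the two converters above are meant to be
inverses of each other:

    import org.apache.hadoop.hbase.client.RegionLocateType;
    import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;

    public class RegionLocateTypeRoundTrip {
      public static void main(String[] args) {
        for (RegionLocateType pojo : RegionLocateType.values()) {
          // POJO -> proto -> POJO should be the identity for every enum value.
          MasterProtos.RegionLocateType proto = ProtobufUtil.toProtoRegionLocateType(pojo);
          assert pojo == ProtobufUtil.toRegionLocateType(proto) : "round trip broke for " + pojo;
        }
      }
    }
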
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
index 4bd6687..64ded7f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -33,11 +32,6 @@ class DoNothingConnectionRegistry implements ConnectionRegistry {
   }
 
   @Override
-  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-    return CompletableFuture.completedFuture(null);
-  }
-
-  @Override
   public CompletableFuture<String> getClusterId() {
     return CompletableFuture.completedFuture(null);
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
deleted file mode 100644
index b306500..0000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ClientTests.class, SmallTests.class })
-public class TestAsyncMetaRegionLocatorFailFast {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAsyncMetaRegionLocatorFailFast.class);
-
-  private static Configuration CONF = HBaseConfiguration.create();
-
-  private static AsyncMetaRegionLocator LOCATOR;
-
-  private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry {
-
-    public FaultyConnectionRegistry(Configuration conf) {
-      super(conf);
-    }
-
-    @Override
-    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-      return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error"));
-    }
-  }
-
-  @BeforeClass
-  public static void setUp() {
-    LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF));
-  }
-
-  @Test(expected = DoNotRetryIOException.class)
-  public void test() throws IOException {
-    FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false));
-  }
-}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 286c96f..44d5125 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -1269,6 +1269,29 @@ message GetMetaRegionLocationsResponse {
   repeated RegionLocation meta_locations = 1;
 }
 
+enum RegionLocateType {
+  REGION_LOCATE_TYPE_BEFORE = 1;
+  REGION_LOCATE_TYPE_CURRENT = 2;
+  REGION_LOCATE_TYPE_AFTER = 3;
+}
+
+message LocateMetaRegionRequest {
+  required bytes row = 1;
+  required RegionLocateType locateType = 2;
+}
+
+message LocateMetaRegionResponse {
+  repeated RegionLocation meta_locations = 1;
+}
+
+message GetAllMetaRegionLocationsRequest {
+  required bool exclude_offlined_split_parents = 1;
+}
+
+message GetAllMetaRegionLocationsResponse {
+  repeated RegionLocation meta_locations = 1;
+}
+
 /**
  * Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
  */
@@ -1287,4 +1310,16 @@ service ClientMetaService {
    * Get current meta replicas' region locations.
    */
   rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
+
+  /**
+   * Get meta region locations for the given row.
+   */
+  rpc LocateMetaRegion(LocateMetaRegionRequest)
+    returns(LocateMetaRegionResponse);
+
+  /**
+   * Get the locations of all meta regions.
+   */
+  rpc GetAllMetaRegionLocations(GetAllMetaRegionLocationsRequest)
+    returns(GetAllMetaRegionLocationsResponse);
 }
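
A minimal sketch of calling the new LocateMetaRegion RPC through the generated ClientMetaService
blocking stub, assuming a stub connected to the active master is already available; only the
generated message and stub names come from the proto above, the rest is illustrative:

    import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionLocateType;
    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
    import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

    public class LocateMetaSketch {
      static LocateMetaRegionResponse locateCurrent(ClientMetaService.BlockingInterface stub,
          byte[] row) throws ServiceException {
        LocateMetaRegionRequest request = LocateMetaRegionRequest.newBuilder()
          .setRow(ByteString.copyFrom(row))
          .setLocateType(RegionLocateType.REGION_LOCATE_TYPE_CURRENT)
          .build();
        // The response carries the locations of the meta region covering 'row',
        // including replica locations if any.
        return stub.locateMetaRegion(null, request);
      }
    }
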
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ffe4f53..92b3f6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.RegionPlan;
@@ -1765,7 +1767,7 @@ public interface MasterObserver {
       throws IOException {
   }
 
-  /*
+  /**
    * Called before checking if user has permissions.
    * @param ctx the coprocessor instance's environment
    * @param userName the user name
@@ -1784,4 +1786,44 @@ public interface MasterObserver {
   default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String userName, List<Permission> permissions) throws IOException {
   }
+
+  /**
+   * Called before locating meta region.
+   * @param ctx the coprocessor instance's environment
+   * @param row the row key to locate
+   * @param locateType the direction of the locate operation
+   */
+  default void preLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+    RegionLocateType locateType) throws IOException {
+  }
+
+  /**
+   * Called after locating meta region.
+   * @param ctx the coprocessor instance's environment
+   * @param row the row key to locate
+   * @param locateType the direction of the locate operation
+   * @param locs the locations of the given meta region, including meta replicas if any.
+   */
+  default void postLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+    RegionLocateType locateType, List<HRegionLocation> locs) throws IOException {
+  }
+
+  /**
+   * Called before getting all locations for meta regions.
+   * @param ctx the coprocessor instance's environment
+   * @param excludeOfflinedSplitParents if true, do not return offlined split parents
+   */
+  default void preGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    boolean excludeOfflinedSplitParents) {
+  }
+
+  /**
+   * Called after getting all locations for meta regions.
+   * @param ctx the coprocessor instance's environment
+   * @param excludeOfflinedSplitParents if true, do not return offlined split parents
+   * @param locs the locations of all meta regions, including meta replicas if any.
+   */
+  default void postGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    boolean excludeOfflinedSplitParents, List<HRegionLocation> locs) {
+  }
 }
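
A minimal sketch of a master coprocessor using the new hooks, e.g. to audit meta lookups; the
class name and log message are illustrative, only the hook signatures come from the interface
above:

    import java.io.IOException;
    import java.util.List;
    import java.util.Optional;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.RegionLocateType;
    import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.MasterObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MetaLocateAuditObserver implements MasterCoprocessor, MasterObserver {
      private static final Logger LOG = LoggerFactory.getLogger(MetaLocateAuditObserver.class);

      @Override
      public Optional<MasterObserver> getMasterObserver() {
        return Optional.of(this);
      }

      @Override
      public void postLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx,
          byte[] row, RegionLocateType locateType, List<HRegionLocation> locs) throws IOException {
        // Audit every meta locate served by the active master.
        LOG.info("Located meta for row={} type={} -> {} location(s)", Bytes.toStringBinary(row),
          locateType, locs.size());
      }
    }
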
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2e2136e..2e59f0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -72,11 +72,13 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -89,7 +91,9 @@ import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -219,7 +223,6 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
 import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -882,6 +885,27 @@ public class HMaster extends HRegionServer implements MasterServices {
     return new AssignmentManager(master, masterRegion);
   }
 
+  /**
+   * Load the meta region state from the meta region server ZNode.
+   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
+   * @param replicaId the ID of the replica
+   * @return the region state for the given replica, or null if it could not be determined
+   * @throws KeeperException if a ZooKeeper operation fails
+   */
+  private static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
+    throws KeeperException {
+    RegionState regionState = null;
+    try {
+      byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
+      regionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    return regionState;
+  }
+
   private void tryMigrateRootTableFromZooKeeper() throws IOException, KeeperException {
     // try migrate data from zookeeper
     try (RegionScanner scanner =
@@ -903,7 +927,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     StringBuilder info = new StringBuilder("Migrating meta location:");
     for (String metaReplicaNode : metaReplicaNodes) {
       int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZnode(metaReplicaNode);
-      RegionState state = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
+      RegionState state = getMetaRegionState(zooKeeper, replicaId);
       info.append(" ").append(state);
       put.setTimestamp(state.getStamp());
       MetaTableAccessor.addRegionInfo(put, state.getRegion());
@@ -3993,4 +4017,85 @@ public class HMaster extends HRegionServer implements MasterServices {
   public RSGroupInfoManager getRSGroupInfoManager() {
     return rsGroupInfoManager;
   }
+
+  public RegionLocations locateMeta(byte[] row, RegionLocateType locateType) throws IOException {
+    if (locateType == RegionLocateType.AFTER) {
+      // for AFTER we know the exact row that comes right after the given row (append a trailing
+      // 0x00 byte), so we can create that row and reuse the CURRENT locate algorithm.
+      row = Arrays.copyOf(row, row.length + 1);
+      locateType = RegionLocateType.CURRENT;
+    }
+    Scan scan =
+      CatalogFamilyFormat.createRegionLocateScan(TableName.META_TABLE_NAME, row, locateType, 1);
+    try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+      boolean moreRows;
+      List<Cell> cells = new ArrayList<>();
+      do {
+        moreRows = scanner.next(cells);
+        if (cells.isEmpty()) {
+          continue;
+        }
+        Result result = Result.create(cells);
+        cells.clear();
+        RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+        if (locs == null || locs.getDefaultRegionLocation() == null) {
+          LOG.warn("No location found when locating meta region with row='{}', locateType={}",
+            Bytes.toStringBinary(row), locateType);
+          return null;
+        }
+        HRegionLocation loc = locs.getDefaultRegionLocation();
+        RegionInfo info = loc.getRegion();
+        if (info == null) {
+          LOG.warn("HRegionInfo is null when locating meta region with row='{}', locateType={}",
+            Bytes.toStringBinary(row), locateType);
+          return null;
+        }
+        if (info.isSplitParent()) {
+          continue;
+        }
+        return locs;
+      } while (moreRows);
+      LOG.warn("No location available when locating meta region with row='{}', locateType={}",
+        Bytes.toStringBinary(row), locateType);
+      return null;
+    }
+  }
+
+  public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+    throws IOException {
+    Scan scan = new Scan().addFamily(HConstants.CATALOG_FAMILY);
+    List<RegionLocations> list = new ArrayList<>();
+    try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+      boolean moreRows;
+      List<Cell> cells = new ArrayList<>();
+      do {
+        moreRows = scanner.next(cells);
+        if (cells.isEmpty()) {
+          continue;
+        }
+        Result result = Result.create(cells);
+        cells.clear();
+        RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+        if (locs == null) {
+          LOG.warn("No locations in {}", result);
+          continue;
+        }
+        HRegionLocation loc = locs.getRegionLocation();
+        if (loc == null) {
+          LOG.warn("No non null location in {}", result);
+          continue;
+        }
+        RegionInfo info = loc.getRegion();
+        if (info == null) {
+          LOG.warn("No serialized RegionInfo in {}", result);
+          continue;
+        }
+        if (excludeOfflinedSplitParents && info.isSplitParent()) {
+          continue;
+        }
+        list.add(locs);
+      } while (moreRows);
+    }
+    return list;
+  }
 }
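
The AFTER branch in locateMeta above relies on a small byte-array fact: appending a single 0x00
byte yields the immediate successor of the row in byte order, so an AFTER lookup can be answered
by a CURRENT lookup on that successor. A standalone sketch of the idea (class name and sample row
are illustrative):

    import java.util.Arrays;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowSuccessorSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("hbase:meta,somerow,1");
        // Arrays.copyOf pads with 0x00, so rowAfter is the smallest byte[] strictly greater
        // than row in lexicographic byte order.
        byte[] rowAfter = Arrays.copyOf(row, row.length + 1);
        assert Bytes.compareTo(rowAfter, row) > 0;
        System.out.println(Bytes.toStringBinary(rowAfter)); // prints the row with a trailing \x00
      }
    }
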
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 28c9c30..ae29375 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.SharedConnection;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -2039,4 +2041,42 @@ public class MasterCoprocessorHost
       }
     });
   }
+
+  public void preLocateMetaRegion(byte[] row, RegionLocateType locateType) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preLocateMetaRegion(this, row, locateType);
+      }
+    });
+  }
+
+  public void postLocateMetaRegion(byte[] row, RegionLocateType locateType,
+    List<HRegionLocation> locs) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postLocateMetaRegion(this, row, locateType, locs);
+      }
+    });
+  }
+
+  public void preGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preGetAllMetaRegionLocations(this, excludeOfflinedSplitParents);
+      }
+    });
+  }
+
+  public void postGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents,
+    List<HRegionLocation> locs) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postGetAllMetaRegionLocations(this, excludeOfflinedSplitParents, locs);
+      }
+    });
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index c3fc33a..914b7ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -30,7 +30,9 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.region.MasterRegion;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -81,22 +83,27 @@ class MasterMetaBootstrap {
         }
         for (int i = 1; i < numReplicas; i++) {
           RegionInfo secondaryRegionInfo = RegionReplicaUtil.getRegionInfoForReplica(regionInfo, i);
-          RegionState secondaryRegionState = regionStates.getRegionState(secondaryRegionInfo);
-          ServerName sn = null;
-          if (secondaryRegionState != null) {
-            sn = secondaryRegionState.getServerName();
-            if (sn != null && !metaServerNames.add(sn)) {
-              LOG.info("{} old location {} is same with other hbase:meta replica location;" +
-                " setting location as null...", secondaryRegionInfo.getRegionNameAsString(), sn);
-              sn = null;
-            }
-          }
+          RegionStateNode secondaryRegionState =
+            regionStates.getOrCreateRegionStateNode(secondaryRegionInfo);
           // These assigns run inline. All is blocked till they complete. Only interrupt is shutting
           // down hosting server which calls AM#stop.
-          if (sn != null) {
-            am.assignAsync(secondaryRegionInfo, sn);
-          } else {
+          if (secondaryRegionState.getState() == State.OFFLINE) {
+            LOG.info("Assign new meta region replica {}", secondaryRegionInfo);
             am.assignAsync(secondaryRegionInfo);
+          } else if (secondaryRegionState.getProcedure() == null) {
+            ServerName sn = secondaryRegionState.getRegionLocation();
+            if (sn != null) {
+              if (!metaServerNames.add(sn)) {
+                LOG.info(
+                  "{} old location {} is the same as another hbase:meta replica location;" +
+                    " setting location to null...",
+                  secondaryRegionInfo.getRegionNameAsString(),
+                  secondaryRegionState.getRegionLocation());
+                am.moveAsync(new RegionPlan(secondaryRegionInfo, sn, null));
+              } else {
+                regionStates.addRegionToServer(secondaryRegionState);
+              }
+            }
           }
         }
       }
@@ -119,8 +126,10 @@ class MasterMetaBootstrap {
       }
       RegionState regionState = regionStates.getRegionState(regionInfo);
       try {
-        ServerManager.closeRegionSilentlyAndWait(master.getAsyncClusterConnection(),
-          regionState.getServerName(), regionInfo, 30000);
+        if (regionState.getServerName() != null) {
+          ServerManager.closeRegionSilentlyAndWait(master.getAsyncClusterConnection(),
+            regionState.getServerName(), regionInfo, 30000);
+        }
         if (regionInfo.isFirst()) {
           // for compatibility, also try to remove the replicas on zk.
           ZKUtil.deleteNode(zooKeeper,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index fc29a38..8882aa8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
@@ -50,7 +51,7 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
@@ -113,7 +114,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -197,6 +197,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaReq
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
@@ -252,6 +254,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
@@ -391,10 +395,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
  */
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
-public class MasterRpcServices extends RSRpcServices implements
-    MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
-    LockService.BlockingInterface, HbckService.BlockingInterface,
-    ClientMetaService.BlockingInterface {
+public class MasterRpcServices extends RSRpcServices implements MasterService.BlockingInterface,
+  RegionServerStatusService.BlockingInterface, LockService.BlockingInterface,
+  HbckService.BlockingInterface, ClientMetaService.BlockingInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
   private static final Logger AUDITLOG =
@@ -529,18 +532,17 @@ public class MasterRpcServices extends RSRpcServices implements
   @Override
   protected List<BlockingServiceAndInterface> getServices() {
     List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
-    bssi.add(new BlockingServiceAndInterface(
-        MasterService.newReflectiveBlockingService(this),
-        MasterService.BlockingInterface.class));
-    bssi.add(new BlockingServiceAndInterface(
-        RegionServerStatusService.newReflectiveBlockingService(this),
+    bssi.add(new BlockingServiceAndInterface(MasterService.newReflectiveBlockingService(this),
+      MasterService.BlockingInterface.class));
+    bssi.add(
+      new BlockingServiceAndInterface(RegionServerStatusService.newReflectiveBlockingService(this),
         RegionServerStatusService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this),
-        LockService.BlockingInterface.class));
+      LockService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this),
-        HbckService.BlockingInterface.class));
+      HbckService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
-        ClientMetaService.BlockingInterface.class));
+      ClientMetaService.BlockingInterface.class));
     bssi.addAll(super.getServices());
     return bssi;
   }
@@ -1691,40 +1693,33 @@ public class MasterRpcServices extends RSRpcServices implements
   }
 
   @Override
-  public UnassignRegionResponse unassignRegion(RpcController controller,
-      UnassignRegionRequest req) throws ServiceException {
+  public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
+    throws ServiceException {
     try {
-      final byte [] regionName = req.getRegion().getValue().toByteArray();
+      final byte[] regionName = req.getRegion().getValue().toByteArray();
       RegionSpecifierType type = req.getRegion().getType();
       final boolean force = req.getForce();
       UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
 
       master.checkInitialized();
       if (type != RegionSpecifierType.REGION_NAME) {
-        LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
-          + " actual: " + type);
+        LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME +
+          " actual: " + type);
       }
-      Pair<RegionInfo, ServerName> pair =
-        MetaTableAccessor.getRegion(master.getConnection(), regionName);
-      if (Bytes.equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName(), regionName)) {
-        pair = new Pair<>(RegionInfoBuilder.FIRST_META_REGIONINFO,
-          MetaTableLocator.getMetaRegionLocation(master.getZooKeeper()));
-      }
-      if (pair == null) {
-        throw new UnknownRegionException(Bytes.toString(regionName));
+      final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
+      if (regionInfo == null) {
+        throw new UnknownRegionException(Bytes.toStringBinary(regionName));
       }
-
-      RegionInfo hri = pair.getFirst();
       if (master.cpHost != null) {
-        master.cpHost.preUnassign(hri, force);
+        master.cpHost.preUnassign(regionInfo, force);
       }
-      LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
-          + " in current location if it is online and reassign.force=" + force);
-      master.getAssignmentManager().unassign(hri);
+      LOG
+        .debug(master.getClientIdAuditPrefix() + " unassign " + regionInfo.getRegionNameAsString() +
+          " in current location if it is online and reassign.force=" + force);
+      master.getAssignmentManager().unassign(regionInfo);
       if (master.cpHost != null) {
-        master.cpHost.postUnassign(hri, force);
+        master.cpHost.postUnassign(regionInfo, force);
       }
-
       return urr;
     } catch (IOException ioe) {
       throw new ServiceException(ioe);
@@ -3274,4 +3269,67 @@ public class MasterRpcServices extends RSRpcServices implements
     }
     return builder.build();
   }
+
+  @Override
+  public LocateMetaRegionResponse locateMetaRegion(RpcController controller,
+    LocateMetaRegionRequest request) throws ServiceException {
+    byte[] row = request.getRow().toByteArray();
+    RegionLocateType locateType = ProtobufUtil.toRegionLocateType(request.getLocateType());
+    try {
+      master.checkServiceStarted();
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().preLocateMetaRegion(row, locateType);
+      }
+      RegionLocations locs = master.locateMeta(row, locateType);
+      List<HRegionLocation> list = new ArrayList<>();
+      LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
+      if (locs != null) {
+        for (HRegionLocation loc : locs) {
+          if (loc != null) {
+            builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+            list.add(loc);
+          }
+        }
+      }
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().postLocateMetaRegion(row, locateType, list);
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetAllMetaRegionLocationsResponse getAllMetaRegionLocations(RpcController controller,
+    GetAllMetaRegionLocationsRequest request) throws ServiceException {
+    boolean excludeOfflinedSplitParents = request.getExcludeOfflinedSplitParents();
+    try {
+      master.checkServiceStarted();
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().preGetAllMetaRegionLocations(excludeOfflinedSplitParents);
+      }
+      List<RegionLocations> locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
+      List<HRegionLocation> list = new ArrayList<>();
+      GetAllMetaRegionLocationsResponse.Builder builder =
+        GetAllMetaRegionLocationsResponse.newBuilder();
+      if (locs != null) {
+        for (RegionLocations ls : locs) {
+          for (HRegionLocation loc : ls) {
+            if (loc != null) {
+              builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+              list.add(loc);
+            }
+          }
+        }
+      }
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().postGetAllMetaRegionLocations(excludeOfflinedSplitParents,
+          list);
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 02e9dce..b46acfc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -552,4 +553,12 @@ public interface MasterServices extends Server {
    * @return The state of the load balancer, or false if the load balancer isn't defined.
    */
   boolean isBalancerOn();
+
+  /**
+   * Get locations for all meta regions.
+   * @param excludeOfflinedSplitParents if true, do not return offlined split parents
+   * @return The locations of all the meta regions
+   */
+  List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+    throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
index 58e57c4..198208f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
@@ -27,9 +27,9 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.tmpl.master.MasterStatusTmpl;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -80,7 +80,8 @@ public class MasterStatusServlet extends HttpServlet {
   }
 
   private ServerName getMetaLocationOrNull(HMaster master) {
-    return MetaTableLocator.getMetaRegionLocation(master.getZooKeeper());
+    return master.getAssignmentManager().getRegionStates()
+      .getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO).getServerName();
   }
 
   private Map<String, Integer> getFragmentationInfo(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
index f4e91b5..fef1a84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
@@ -39,11 +39,14 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
- * A cache of meta region location metadata. Registers a listener on ZK to track changes to the
- * meta table znodes. Clients are expected to retry if the meta information is stale. This class
- * is thread-safe (a single instance of this class can be shared by multiple threads without race
+ * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
+ * table znodes. Clients are expected to retry if the meta information is stale. This class is
+ * thread-safe (a single instance of this class can be shared by multiple threads without race
  * conditions).
+ * @deprecated The meta location is now stored in the master local store, so it should be fetched
+ *             from the active master instead of ZooKeeper; kept here only for compatibility.
  */
+@Deprecated
 @InterfaceAudience.Private
 public class MetaRegionLocationCache extends ZKListener {
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 2723185..5a73f9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -166,8 +166,8 @@ public class RegionStateStore {
     final Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
     MetaTableAccessor.addRegionInfo(put, regionInfo);
     final StringBuilder info =
-      new StringBuilder("pid=").append(pid).append(" updating hbase:meta row=")
-        .append(regionInfo.getEncodedName()).append(", regionState=").append(state);
+      new StringBuilder("pid=").append(pid).append(" updating catalog row=")
+        .append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state);
     if (openSeqNum >= 0) {
       Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
           "Open region should be on a server");
@@ -210,7 +210,7 @@ public class RegionStateStore {
   }
 
   public void mirrorMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
-      throws IOException {
+    throws IOException {
     try {
       MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(),
         state);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 0cd9972..1c2981c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -365,8 +365,6 @@ public class CreateTableProcedure
       final List<RegionInfo> regions) throws IOException {
     assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
 
-    ProcedureSyncWait.waitMetaRegions(env);
-
     // Add replicas if needed
     // we need to create regions with replicaIds starting from 1
     List<RegionInfo> newRegions = RegionReplicaUtil.addReplicas(tableDescriptor, regions, 1,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 46621da..b28c95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -222,18 +220,6 @@ public final class ProcedureSyncWait {
     throw new TimeoutIOException("Timed out while waiting on " + purpose);
   }
 
-  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
-    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
-    try {
-      if (MetaTableLocator.waitMetaRegionLocation(env.getMasterServices().getZooKeeper(),
-        timeout) == null) {
-        throw new NotAllMetaRegionsOnlineException();
-      }
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-    }
-  }
-
   protected static void waitRegionInTransition(final MasterProcedureEnv env,
       final List<RegionInfo> regions) throws IOException {
     final RegionStates states = env.getAssignmentManager().getRegionStates();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
index 23d0263..507058b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
@@ -21,8 +21,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,7 +39,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -159,7 +161,9 @@ public final class MasterSnapshotVerifier {
   private void verifyRegions(final SnapshotManifest manifest) throws IOException {
     List<RegionInfo> regions;
     if (TableName.META_TABLE_NAME.equals(tableName)) {
-      regions = MetaTableLocator.getMetaRegions(services.getZooKeeper());
+      regions = services.getAllMetaRegionLocations(false).stream()
+        .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+        .map(HRegionLocation::getRegion).collect(Collectors.toList());
     } else {
       regions = MetaTableAccessor.getTableRegions(services.getConnection(), tableName);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index e491467..f4b0f3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -52,7 +54,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -193,12 +194,15 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       monitor.rethrowException();
 
       List<Pair<RegionInfo, ServerName>> regionsAndLocations;
+
       if (TableName.META_TABLE_NAME.equals(snapshotTable)) {
-        regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
-          server.getZooKeeper());
+        regionsAndLocations = master.getAllMetaRegionLocations(false).stream()
+          .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+          .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+          .collect(Collectors.toList());
       } else {
-        regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
-          server.getConnection(), snapshotTable, false);
+        regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(server.getConnection(),
+          snapshotTable, false);
       }
 
       // run the snapshot
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index 1f7a5e2..595d15b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -24,12 +24,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -129,20 +129,16 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
     // Each region server will get its own online regions for the table.
     // We may still miss regions that need to be flushed.
     List<Pair<RegionInfo, ServerName>> regionsAndLocations;
-
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
-        master.getZooKeeper());
-    } else {
-      regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
-        master.getConnection(), tableName, false);
+    try (RegionLocator locator =
+      master.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      regionsAndLocations = locator.getAllRegionLocations().stream()
+        .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+        .collect(Collectors.toList());
     }
 
     Set<String> regionServers = new HashSet<>(regionsAndLocations.size());
     for (Pair<RegionInfo, ServerName> region : regionsAndLocations) {
       if (region != null && region.getFirst() != null && region.getSecond() != null) {
-        RegionInfo hri = region.getFirst();
-        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
         regionServers.add(region.getSecond().toString());
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dd5d6c8..589213e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -795,8 +795,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
           + MAX_FLUSH_PER_CHANGES);
     }
-    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
-                    DEFAULT_ROWLOCK_WAIT_DURATION);
+    this.rowLockWaitDuration =
+      conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION);
 
     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
     this.htableDescriptor = htd;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 43fd908..104c9b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
-import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -177,7 +176,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -2309,8 +2307,8 @@ public class HRegionServer extends Thread implements
   }
 
   /**
-   * Helper method for use in tests. Skip the region transition report when there's no master
-   * around to receive it.
+   * Helper method for use in tests. Skip the region transition report when there's no master around
+   * to receive it.
    */
   private boolean skipReportingTransition(final RegionStateTransitionContext context) {
     final TransitionCode code = context.getCode();
@@ -2321,17 +2319,13 @@ public class HRegionServer extends Thread implements
     if (code == TransitionCode.OPENED) {
       Preconditions.checkArgument(hris != null && hris.length == 1);
       if (hris[0].isMetaRegion()) {
-        try {
-          MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
-              hris[0].getReplicaId(), RegionState.State.OPEN);
-        } catch (KeeperException e) {
-          LOG.info("Failed to update meta location", e);
-          return false;
-        }
+        LOG.warn(
+          "meta table location is stored in master local store, so we can not skip reporting");
+        return false;
       } else {
         try {
           MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
-              serverName, openSeqNum, masterSystemTime);
+            serverName, openSeqNum, masterSystemTime);
         } catch (IOException e) {
           LOG.info("Failed to update meta", e);
           return false;
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
index ef17315..68c1c63 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
@@ -62,7 +62,6 @@
 <%@ page import="org.apache.hadoop.hbase.quotas.ThrottleSettings" %>
 <%@ page import="org.apache.hadoop.hbase.util.Bytes" %>
 <%@ page import="org.apache.hadoop.hbase.util.FSUtils" %>
-<%@ page import="org.apache.hadoop.hbase.zookeeper.MetaTableLocator" %>
 <%@ page import="org.apache.hadoop.util.StringUtils" %>
 <%@ page import="org.apache.hbase.thirdparty.com.google.protobuf.ByteString" %>
 <%@ page import="org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos" %>
@@ -263,7 +262,8 @@ if (fqtn != null && master.isInitialized()) {
           for (int j = 0; j < numMetaReplicas; j++) {
             RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
                                     RegionInfoBuilder.FIRST_META_REGIONINFO, j);
-            ServerName metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
+            RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+            ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
             for (int i = 0; i < 1; i++) {
               String hostAndPort = "";
               String readReq = "N/A";
@@ -334,7 +334,8 @@ if (fqtn != null && master.isInitialized()) {
            for (int j = 0; j < numMetaReplicas; j++) {
              RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
                                      RegionInfoBuilder.FIRST_META_REGIONINFO, j);
-             ServerName metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
+             RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+             ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
              for (int i = 0; i < 1; i++) {
                String hostAndPort = "";
                float locality = 0.0f;
@@ -382,7 +383,8 @@ if (fqtn != null && master.isInitialized()) {
           for (int j = 0; j < numMetaReplicas; j++) {
             RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
                                     RegionInfoBuilder.FIRST_META_REGIONINFO, j);
-            ServerName metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
+            RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+            ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
             for (int i = 0; i < 1; i++) {
               String hostAndPort = "";
               long compactingCells = 0;
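
The JSP now resolves each meta replica through the master's in-memory assignment state rather than waiting on a ZooKeeper znode. A short sketch of the same lookup, assuming an HMaster reference as the page already has (the wrapper class is illustrative; only the chained calls are taken from the hunks above):

    // Illustrative sketch; only the chained calls come from the table.jsp change.
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.client.RegionInfoBuilder;
    import org.apache.hadoop.hbase.client.RegionReplicaUtil;
    import org.apache.hadoop.hbase.master.HMaster;
    import org.apache.hadoop.hbase.master.RegionState;

    final class MetaReplicaLocation {

      /** Returns the server hosting the given meta replica, or null if it is not assigned yet. */
      static ServerName of(HMaster master, int replicaId) {
        RegionInfo meta = RegionReplicaUtil
          .getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId);
        RegionState state =
          master.getAssignmentManager().getRegionStates().getRegionState(meta);
        return state != null ? state.getServerName() : null;
      }
    }
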
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 112fa01..a3d78aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -317,13 +316,6 @@ public class TestMetaTableAccessor {
   }
 
   @Test
-  public void testGetRegionsFromMetaTable() throws IOException, InterruptedException {
-    List<RegionInfo> regions = MetaTableLocator.getMetaRegions(UTIL.getZooKeeperWatcher());
-    assertTrue(regions.size() >= 1);
-    assertTrue(MetaTableLocator.getMetaRegionsAndLocations(UTIL.getZooKeeperWatcher()).size() >= 1);
-  }
-
-  @Test
   public void testTableExists() throws IOException {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     assertFalse(MetaTableAccessor.tableExists(connection, tableName));
@@ -600,7 +592,7 @@ public class TestMetaTableAccessor {
     UTIL.createTable(tableName, FAMILY, SPLIT_KEYS);
     Table table = connection.getTable(tableName);
     // Make sure all the regions are deployed
-    UTIL.countRows(table);
+    HBaseTestingUtility.countRows(table);
 
     ClientMetaTableAccessor.Visitor visitor = mock(ClientMetaTableAccessor.Visitor.class);
     doReturn(true).when(visitor).visit((Result) anyObject());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
deleted file mode 100644
index 9274fa0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
-
-/**
- * Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}
- */
-@Category({ MiscTests.class, MediumTests.class })
-public class TestMetaTableLocator {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMetaTableLocator.class);
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableLocator.class);
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-  private static final ServerName SN =
-    ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
-  private ZKWatcher watcher;
-  private Abortable abortable;
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    // Set this down so tests run quicker
-    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
-    UTIL.startMiniZKCluster();
-  }
-
-  @AfterClass
-  public static void afterClass() throws IOException {
-    UTIL.getZkCluster().shutdown();
-  }
-
-  @Before
-  public void before() throws IOException {
-    this.abortable = new Abortable() {
-      @Override
-      public void abort(String why, Throwable e) {
-        LOG.info(why, e);
-      }
-
-      @Override
-      public boolean isAborted() {
-        return false;
-      }
-    };
-    this.watcher =
-      new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
-  }
-
-  @After
-  public void after() {
-    try {
-      // Clean out meta location or later tests will be confused... they presume
-      // start fresh in zk.
-      MetaTableLocator.deleteMetaLocation(this.watcher);
-    } catch (KeeperException e) {
-      LOG.warn("Unable to delete hbase:meta location", e);
-    }
-
-    this.watcher.close();
-  }
-
-  /**
-   * Test normal operations
-   */
-  @Test
-  public void testMetaLookup()
-      throws IOException, InterruptedException, ServiceException, KeeperException {
-    final ClientProtos.ClientService.BlockingInterface client =
-      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-
-    Mockito.when(client.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
-      .thenReturn(GetResponse.newBuilder().build());
-
-    assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
-    for (RegionState.State state : RegionState.State.values()) {
-      if (state.equals(RegionState.State.OPEN)) {
-        continue;
-      }
-      MetaTableLocator.setMetaLocation(this.watcher, SN, state);
-      assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
-      assertEquals(state, MetaTableLocator.getMetaRegionState(this.watcher).getState());
-    }
-    MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
-    assertEquals(SN, MetaTableLocator.getMetaRegionLocation(this.watcher));
-    assertEquals(RegionState.State.OPEN,
-      MetaTableLocator.getMetaRegionState(this.watcher).getState());
-
-    MetaTableLocator.deleteMetaLocation(this.watcher);
-    assertNull(MetaTableLocator.getMetaRegionState(this.watcher).getServerName());
-    assertEquals(RegionState.State.OFFLINE,
-      MetaTableLocator.getMetaRegionState(this.watcher).getState());
-    assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
-  }
-
-  @Test(expected = NotAllMetaRegionsOnlineException.class)
-  public void testTimeoutWaitForMeta() throws IOException, InterruptedException {
-    MetaTableLocator.waitMetaRegionLocation(watcher, 100);
-  }
-
-  /**
-   * Test waiting on meta w/ no timeout specified.
-   */
-  @Test
-  public void testNoTimeoutWaitForMeta() throws IOException, InterruptedException, KeeperException {
-    ServerName hsa = MetaTableLocator.getMetaRegionLocation(watcher);
-    assertNull(hsa);
-
-    // Now test waiting on meta location getting set.
-    Thread t = new WaitOnMetaThread();
-    startWaitAliveThenWaitItLives(t, 1);
-    // Set a meta location.
-    MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
-    hsa = SN;
-    // Join the thread... should exit shortly.
-    t.join();
-    // Now meta is available.
-    assertTrue(MetaTableLocator.getMetaRegionLocation(watcher).equals(hsa));
-  }
-
-  private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
-    t.start();
-    UTIL.waitFor(2000, t::isAlive);
-    // Wait one second.
-    Threads.sleep(ms);
-    assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
-  }
-
-  /**
-   * Wait on META.
-   */
-  class WaitOnMetaThread extends Thread {
-
-    WaitOnMetaThread() {
-      super("WaitOnMeta");
-    }
-
-    @Override
-    public void run() {
-      try {
-        doWaiting();
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Failed wait", e);
-      }
-      LOG.info("Exiting " + getName());
-    }
-
-    void doWaiting() throws InterruptedException {
-      try {
-        for (;;) {
-          if (MetaTableLocator.waitMetaRegionLocation(watcher, 10000) != null) {
-            break;
-          }
-        }
-      } catch (NotAllMetaRegionsOnlineException e) {
-        // Ignore
-      }
-    }
-  }
-}
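
The deleted test exercised the ZooKeeper-based MetaTableLocator API, which no longer carries the meta location on this branch. A hedged sketch of the replacement pattern the updated tests below use: treat hbase:meta like any other table and ask a RegionLocator for it (the wrapper class name is illustrative):

    // Illustrative sketch; the RegionLocator calls match the updated tests in this patch.
    import java.io.IOException;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    final class MetaLocationLookup {

      /** Resolve the server hosting the default replica of hbase:meta, without touching ZooKeeper. */
      static ServerName whereIsMeta(Connection conn) throws IOException {
        try (RegionLocator locator = conn.getRegionLocator(TableName.META_TABLE_NAME)) {
          return locator.getRegionLocation(HConstants.EMPTY_START_ROW).getServerName();
        }
      }
    }
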
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index 89f287b..cb10e38 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -58,10 +58,7 @@ public abstract class AbstractTestRegionLocator {
     }
     UTIL.getAdmin().createTable(td, SPLIT_KEYS);
     UTIL.waitTableAvailable(TABLE_NAME);
-    try (ConnectionRegistry registry =
-             ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
-    }
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
     UTIL.getAdmin().balancerSwitch(false, true);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index c9d67f4..ea1122c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 
 /**
@@ -32,11 +31,6 @@ public class DummyConnectionRegistry implements ConnectionRegistry {
       HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
 
   @Override
-  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-    return null;
-  }
-
-  @Override
   public CompletableFuture<String> getClusterId() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
index 78e3e54..977a274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
@@ -29,12 +29,12 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.junit.AfterClass;
 import org.junit.Rule;
 import org.slf4j.Logger;
@@ -64,8 +64,11 @@ public class MetaWithReplicasTestBase {
     TEST_UTIL.startMiniCluster(option);
     AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
     Set<ServerName> sns = new HashSet<ServerName>();
-    ServerName hbaseMetaServerName =
-      MetaTableLocator.getMetaRegionLocation(TEST_UTIL.getZooKeeperWatcher());
+    ServerName hbaseMetaServerName;
+    try (RegionLocator locator =
+      TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      hbaseMetaServerName = locator.getRegionLocation(HConstants.EMPTY_START_ROW).getServerName();
+    }
     LOG.info("HBASE:META DEPLOY: on " + hbaseMetaServerName);
     sns.add(hbaseMetaServerName);
     for (int replicaId = 1; replicaId < 3; replicaId++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index 8e562bd..8e220eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,11 +44,10 @@ public final class RegionReplicaTestHelper {
   }
 
   // waits for all replicas to have region location
-  static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util,
-      ConnectionRegistry registry) {
+  static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util) {
     Configuration conf = util.getConfiguration();
     int regionReplicaCount = util.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
-        HConstants.DEFAULT_META_REPLICA_NUM);
+      HConstants.DEFAULT_META_REPLICA_NUM);
     Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
       new ExplainingPredicate<IOException>() {
         @Override
@@ -58,16 +58,20 @@ public final class RegionReplicaTestHelper {
         @Override
         public boolean evaluate() {
           try {
-            RegionLocations locs = registry.getMetaRegionLocations().get();
+            List<HRegionLocation> locs;
+            try (RegionLocator locator =
+              util.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+              locs = locator.getAllRegionLocations();
+            }
             if (locs.size() < regionReplicaCount) {
               return false;
             }
             for (int i = 0; i < regionReplicaCount; i++) {
-              HRegionLocation loc = locs.getRegionLocation(i);
+              HRegionLocation loc = locs.get(i);
               // Wait until the replica is served by a region server. There could be delay between
               // the replica being available to the connection and region server opening it.
               Optional<ServerName> rsCarryingReplica =
-                  getRSCarryingReplica(util, loc.getRegion().getTable(), i);
+                getRSCarryingReplica(util, loc.getRegion().getTable(), i);
               if (!rsCarryingReplica.isPresent()) {
                 return false;
               }
@@ -82,7 +86,7 @@ public final class RegionReplicaTestHelper {
   }
 
   static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
-      int replicaId) {
+    int replicaId) {
     return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
       .filter(rs -> rs.getRegions(tableName).stream()
         .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
@@ -93,7 +97,7 @@ public final class RegionReplicaTestHelper {
    * Return the new location.
    */
   static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
-      throws Exception {
+    throws Exception {
     ServerName serverName = currentLoc.getServerName();
     RegionInfo regionInfo = currentLoc.getRegion();
     TableName tableName = regionInfo.getTable();
@@ -120,13 +124,13 @@ public final class RegionReplicaTestHelper {
 
   interface Locator {
     RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
-        throws Exception;
+      throws Exception;
 
     void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
   }
 
   static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
-      throws Exception {
+    throws Exception {
     RegionLocations locs =
       locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
     assertEquals(3, locs.size());
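
The helper above now discovers meta replicas purely through the client API instead of a ConnectionRegistry. A small sketch of the same idea, assuming a Connection is available (names are illustrative):

    // Illustrative sketch; getAllRegionLocations is the call the helper above now relies on.
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    final class MetaReplicaDump {

      /** Print every known replica of hbase:meta and the server currently hosting it. */
      static void printMetaReplicas(Connection conn) throws IOException {
        try (RegionLocator locator = conn.getRegionLocator(TableName.META_TABLE_NAME)) {
          List<HRegionLocation> locs = locator.getAllRegionLocations();
          for (HRegionLocation loc : locs) {
            System.out.println("replica " + loc.getRegion().getReplicaId() + " -> "
              + loc.getServerName());
          }
        }
      }
    }
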
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index 50111f7..79371a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.List;
@@ -53,11 +53,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TestAsyncAdminBase.setUpBeforeClass();
-    try (ConnectionRegistry registry =
-             ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
-      RegionReplicaTestHelper
-        .waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
-    }
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
   }
 
   private void testMoveNonDefaultReplica(TableName tableName)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 003bef3..1382c02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -35,6 +35,8 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncMetaRegionLocator {
 
@@ -44,24 +46,25 @@ public class TestAsyncMetaRegionLocator {
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static ConnectionRegistry REGISTRY;
+  private static AsyncConnectionImpl CONN;
 
-  private static AsyncMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.waitUntilNoRegionsInTransition();
-    REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+    CONN = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
-    LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    IOUtils.closeQuietly(REGISTRY);
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -71,14 +74,15 @@ public class TestAsyncMetaRegionLocator {
 
       @Override
       public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
-          throws Exception {
+        throws Exception {
         LOCATOR.updateCachedLocationOnError(loc, error);
       }
 
       @Override
       public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
-          throws Exception {
-        return LOCATOR.getRegionLocations(replicaId, reload).get();
+        throws Exception {
+        return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+          RegionLocateType.CURRENT, reload).get();
       }
     });
   }
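
The test now goes through the unified AsyncRegionLocator, passing the table name, start row, replica id and RegionLocateType explicitly. A sketch of that call, kept in the org.apache.hadoop.hbase.client package because the locator and RegionLocateType are client-internal types (the wrapper class is illustrative):

    // Illustrative sketch; same package as the client internals so the types resolve.
    package org.apache.hadoop.hbase.client;

    import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.RegionLocations;
    import org.apache.hadoop.hbase.TableName;

    final class LocateMetaReplica {

      /** Locate one replica of hbase:meta through the unified locator, without forcing a reload. */
      static HRegionLocation locate(AsyncRegionLocator locator, int replicaId) throws Exception {
        RegionLocations locs = locator.getRegionLocations(TableName.META_TABLE_NAME,
          EMPTY_START_ROW, replicaId, RegionLocateType.CURRENT, false).get();
        return locs.getRegionLocation(replicaId);
      }
    }
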
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index d8388de..b0643ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -22,10 +22,10 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -56,6 +55,8 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncNonMetaRegionLocator {
 
@@ -71,7 +72,7 @@ public class TestAsyncNonMetaRegionLocator {
 
   private static AsyncConnectionImpl CONN;
 
-  private static AsyncNonMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   private static byte[][] SPLIT_KEYS;
 
@@ -80,10 +81,10 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     ConnectionRegistry registry =
-        ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
-    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
       SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@@ -92,7 +93,7 @@ public class TestAsyncNonMetaRegionLocator {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    IOUtils.closeQuietly(CONN);
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -114,7 +115,7 @@ public class TestAsyncNonMetaRegionLocator {
   }
 
   private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
-      byte[] row, RegionLocateType locateType, boolean reload) {
+    byte[] row, RegionLocateType locateType, boolean reload) {
     return LOCATOR
       .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
       .thenApply(RegionLocations::getDefaultRegionLocation);
@@ -145,7 +146,7 @@ public class TestAsyncNonMetaRegionLocator {
   }
 
   private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
-      HRegionLocation loc) {
+    HRegionLocation loc) {
     RegionInfo info = loc.getRegion();
     assertEquals(TABLE_NAME, info.getTable());
     assertArrayEquals(startKey, info.getStartKey());
@@ -359,7 +360,7 @@ public class TestAsyncNonMetaRegionLocator {
   // Testcase for HBASE-20822
   @Test
   public void testLocateBeforeLastRegion()
-      throws IOException, InterruptedException, ExecutionException {
+    throws IOException, InterruptedException, ExecutionException {
     createMultiRegionTable();
     getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
     HRegionLocation loc =
@@ -377,13 +378,13 @@ public class TestAsyncNonMetaRegionLocator {
 
       @Override
       public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
-          throws Exception {
+        throws Exception {
         LOCATOR.updateCachedLocationOnError(loc, error);
       }
 
       @Override
       public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
-          throws Exception {
+        throws Exception {
         return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
           RegionLocateType.CURRENT, reload).get();
       }
@@ -405,9 +406,8 @@ public class TestAsyncNonMetaRegionLocator {
   public void testConcurrentUpdateCachedLocationOnError() throws Exception {
     createSingleRegionTable();
     HRegionLocation loc =
-        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false)
-            .get();
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
     IntStream.range(0, 100).parallel()
-        .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
+      .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
   }
 }
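
updateCachedLocationOnError is what both locator tests use to drop a stale cached location after a retryable error; the concurrency test above calls it from a hundred parallel tasks. A minimal sketch of a single invocation (the wrapper name is illustrative):

    // Illustrative sketch; same package as the locator, mirroring the parallel calls above.
    package org.apache.hadoop.hbase.client;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.NotServingRegionException;

    final class EvictStaleLocation {

      /** Ask the locator to drop a cached location after a retryable error such as NSRE. */
      static void evict(AsyncRegionLocator locator, HRegionLocation staleLoc) {
        locator.updateCachedLocationOnError(staleLoc, new NotServingRegionException());
      }
    }
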
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
index 1cbde2f..2ef60f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
@@ -51,8 +51,8 @@ import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous region admin operations.
- * @see TestAsyncRegionAdminApi This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncRegionAdminApi This test and that one used to be joined; they were split because
+ *      together they were taking longer than our ten minute timeout.
  */
 @RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
@@ -60,7 +60,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
+    HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
 
   @Test
   public void testGetRegionLocation() throws Exception {
@@ -79,13 +79,13 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
   @Test
   public void testSplitSwitch() throws Exception {
     createTableWithDefaultConf(tableName);
-    byte[][] families = {FAMILY};
+    byte[][] families = { FAMILY };
     final int rows = 10000;
     TestAsyncRegionAdminApi.loadData(tableName, families, rows);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations =
-        ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     int originalCount = regionLocations.size();
 
     initSplitMergeSwitch();
@@ -93,7 +93,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     try {
       admin.split(tableName, Bytes.toBytes(rows / 2)).join();
     } catch (Exception e) {
-      //Expected
+      // Expected
     }
     int count = admin.getRegions(tableName).get().size();
     assertTrue(originalCount == count);
@@ -111,12 +111,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
   // It was ignored in TestSplitOrMergeStatus, too
   public void testMergeSwitch() throws Exception {
     createTableWithDefaultConf(tableName);
-    byte[][] families = {FAMILY};
+    byte[][] families = { FAMILY };
     TestAsyncRegionAdminApi.loadData(tableName, families, 1000);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations =
-        ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     int originalCount = regionLocations.size();
 
     initSplitMergeSwitch();
@@ -126,7 +126,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
       Threads.sleep(100);
     }
     assertTrue("originalCount=" + originalCount + ", postSplitCount=" + postSplitCount,
-        originalCount != postSplitCount);
+      originalCount != postSplitCount);
 
     // Merge switch is off so merge should NOT succeed.
     assertTrue(admin.mergeSwitch(false).get());
@@ -156,12 +156,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
 
   @Test
   public void testMergeRegions() throws Exception {
-    byte[][] splitRows = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("6")};
+    byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
     createTableWithDefaultConf(tableName, splitRows);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     RegionInfo regionA;
     RegionInfo regionB;
     RegionInfo regionC;
@@ -175,7 +175,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
 
     regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
 
     assertEquals(2, regionLocations.size());
     for (HRegionLocation rl : regionLocations) {
@@ -195,11 +195,10 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
       Thread.sleep(200);
     }
     // merge with encoded name
-    admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(),
-      false).get();
+    admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(), false).get();
 
-    regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals(1, regionLocations.size());
   }
 
@@ -233,18 +232,18 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
     splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
     splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
-    splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
+    splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true,
+      Bytes.toBytes("3"));
   }
 
-  private void
-  splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
-      throws Exception {
+  private void splitTest(TableName tableName, int rowCount, boolean isSplitRegion,
+    byte[] splitPoint) throws Exception {
     // create table
     createTableWithDefaultConf(tableName);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals(1, regionLocations.size());
 
     AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
@@ -273,8 +272,8 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     int count = 0;
     for (int i = 0; i < 45; i++) {
       try {
-        regionLocations = ClientMetaTableAccessor
-          .getTableHRegionLocations(metaTable, tableName).get();
+        regionLocations =
+          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
         count = regionLocations.size();
         if (count >= 2) {
           break;
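
Several tests in this patch switch to the three-argument ClientMetaTableAccessor.getTableHRegionLocations overload. A sketch of a call site, assuming an AsyncConnection; the meaning of the trailing boolean is an assumption here (the tests always pass true, which appears to filter out offlined split parents):

    // Illustrative sketch; the trailing boolean is passed as true everywhere in this patch.
    import java.util.List;

    import org.apache.hadoop.hbase.ClientMetaTableAccessor;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.AsyncTable;

    final class TableRegionsFromMeta {

      /** List a table's region locations by scanning hbase:meta through the async client. */
      static List<HRegionLocation> regionsOf(AsyncConnection conn, TableName tableName)
          throws Exception {
        AsyncTable<AdvancedScanResultConsumer> meta = conn.getTable(TableName.META_TABLE_NAME);
        return ClientMetaTableAccessor.getTableHRegionLocations(meta, tableName, true).get();
      }
    }
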
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
similarity index 88%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
index 88ab3ad..3f66d39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -55,12 +54,14 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 @Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
+public class TestAsyncRegionLocatorConcurrenyLimit {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+    HBaseClassTestRule.forClass(TestAsyncRegionLocatorConcurrenyLimit.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -70,7 +71,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   private static AsyncConnectionImpl CONN;
 
-  private static AsyncNonMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   private static byte[][] SPLIT_KEYS;
 
@@ -89,7 +90,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
     @Override
     public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
-        InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
       if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
         int concurrency = CONCURRENCY.incrementAndGet();
         for (;;) {
@@ -108,7 +109,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
     @Override
     public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
-        InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
       if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
         CONCURRENCY.decrementAndGet();
       }
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     ConnectionRegistry registry =
-        ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
-    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
       .toArray(byte[][]::new);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
@@ -136,12 +137,12 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    IOUtils.closeQuietly(CONN);
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
   private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
-      throws InterruptedException, ExecutionException {
+    throws InterruptedException, ExecutionException {
     assertEquals(256, futures.size());
     for (int i = 0; i < futures.size(); i++) {
       HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index 52ebc16..c8a5cec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -49,8 +50,8 @@ import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- *     ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and that one used to be joined; they were split because
+ *      together they were taking longer than our ten minute timeout.
  * @see TestAsyncTableAdminApi3 Another split out from this class so each runs under ten minutes.
  */
 @RunWith(Parameterized.class)
@@ -59,7 +60,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
+    HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
 
   @Test
   public void testCreateTable() throws Exception {
@@ -69,13 +70,13 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     tables = admin.listTableDescriptors().get();
     assertEquals(numTables + 1, tables.size());
     assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+      .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
     assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
   }
 
   static TableState.State getStateFromMeta(TableName table) throws Exception {
     Optional<TableState> state = ClientMetaTableAccessor
-        .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
+      .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
     assertTrue(state.isPresent());
     return state.get().getState();
   }
@@ -86,19 +87,21 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
     createTableWithDefaultConf(tableName);
     List<HRegionLocation> regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals("Table should have only 1 region", 1, regionLocations.size());
 
     final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
     createTableWithDefaultConf(tableName2, new byte[][] { new byte[] { 42 } });
-    regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
     assertEquals("Table should have only 2 region", 2, regionLocations.size());
 
     final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
     TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName3);
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), Bytes.toBytes("a"), Bytes.toBytes("z"), 3).join();
-    regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
     assertEquals("Table should have only 3 region", 3, regionLocations.size());
 
     final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
@@ -115,7 +118,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     builder = TableDescriptorBuilder.newBuilder(tableName5);
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join();
-    regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5, true).get();
     assertEquals("Table should have 16 region", 16, regionLocations.size());
   }
 
@@ -133,7 +137,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regions = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     Iterator<HRegionLocation> hris = regions.iterator();
 
     assertEquals(
@@ -191,7 +195,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
 
-    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
     assertEquals(
       "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
       expectedRegions, regions.size());
@@ -243,8 +247,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
 
-    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3)
-      .get();
+    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
     assertEquals(
       "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
       expectedRegions, regions.size());
@@ -333,7 +336,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
   }
 
   private void testTruncateTable(final TableName tableName, boolean preserveSplits)
-      throws Exception {
+    throws Exception {
     byte[][] splitKeys = new byte[2][];
     splitKeys[0] = Bytes.toBytes(4);
     splitKeys[1] = Bytes.toBytes(8);
@@ -374,8 +377,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     testCloneTableSchema(tableName, newTableName, true);
   }
 
-  private void testCloneTableSchema(final TableName tableName,
-      final TableName newTableName, boolean preserveSplits) throws Exception {
+  private void testCloneTableSchema(final TableName tableName, final TableName newTableName,
+    boolean preserveSplits) throws Exception {
     byte[][] splitKeys = new byte[2][];
     splitKeys[0] = Bytes.toBytes(4);
     splitKeys[1] = Bytes.toBytes(8);
@@ -386,20 +389,16 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     boolean BLOCK_CACHE = false;
 
     // Create the table
-    TableDescriptor tableDesc = TableDescriptorBuilder
-        .newBuilder(tableName)
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
-        .setColumnFamily(ColumnFamilyDescriptorBuilder
-            .newBuilder(FAMILY_1)
-            .setBlocksize(BLOCK_SIZE)
-            .setBlockCacheEnabled(BLOCK_CACHE)
-            .setTimeToLive(TTL)
-            .build()).build();
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).setBlocksize(BLOCK_SIZE)
+        .setBlockCacheEnabled(BLOCK_CACHE).setTimeToLive(TTL).build())
+      .build();
     admin.createTable(tableDesc, splitKeys).join();
 
     assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
     assertTrue("Table should be created with splitKyes + 1 rows in META",
-        admin.isTableAvailable(tableName).get());
+      admin.isTableAvailable(tableName).get());
 
     // Clone & Verify
     admin.cloneTableSchema(tableName, newTableName, preserveSplits).join();
@@ -414,7 +413,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     if (preserveSplits) {
       assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
       assertTrue("New table should be created with splitKyes + 1 rows in META",
-          admin.isTableAvailable(newTableName).get());
+        admin.isTableAvailable(newTableName).get());
     } else {
       assertEquals(1, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
index 6ba960b..b84dfb6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
@@ -47,15 +47,15 @@ import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and that one used to be joined; they were split because
+ *      together they were taking longer than our ten minute timeout.
  */
 @RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
+    HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
 
   @Test
   public void testTableExist() throws Exception {
@@ -123,7 +123,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     assertEquals(tables.length + 1, size);
     for (int i = 0, j = 0; i < tables.length && j < size; i++, j++) {
       assertTrue("tableName should be equal in order",
-          tableDescs.get(j).getTableName().equals(tables[i]));
+        tableDescs.get(j).getTableName().equals(tables[i]));
     }
     assertTrue(tableDescs.get(size - 1).getTableName().equals(TableName.META_TABLE_NAME));
 
@@ -168,7 +168,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
 
     this.admin.disableTable(tableName).join();
     assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
+      .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
     assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
 
     // Test that table is disabled
@@ -190,7 +190,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     assertTrue(ok);
     this.admin.enableTable(tableName).join();
     assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+      .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
     assertEquals(TableState.State.ENABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
 
     // Test that table is enabled
@@ -232,7 +232,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     table2.get(get).get();
 
     admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
-        .forEach(t -> admin.disableTable(t).join());
+      .forEach(t -> admin.disableTable(t).join());
 
     // Test that tables are disabled
     get = new Get(row);
@@ -256,7 +256,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName2));
 
     admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
-        .forEach(t -> admin.enableTable(t).join());
+      .forEach(t -> admin.enableTable(t).join());
 
     // Test that tables are enabled
     try {
@@ -283,8 +283,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     createTableWithDefaultConf(tableName, splitKeys);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
-    List<HRegionLocation> regions = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+    List<HRegionLocation> regions =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals(
       "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
       expectedRegions, regions.size());
@@ -294,8 +294,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     // Enable table, use retain assignment to assign regions.
     admin.enableTable(tableName).join();
 
-    List<HRegionLocation> regions2 = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+    List<HRegionLocation> regions2 =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     // Check the assignment.
     assertEquals(regions.size(), regions2.size());
     assertTrue(regions2.containsAll(regions));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 6c6bb98..058da39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertNotNull;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
@@ -47,18 +48,19 @@ public class TestAsyncTableLocatePrefetch {
 
   private static byte[] FAMILY = Bytes.toBytes("cf");
 
-  private static AsyncConnection CONN;
+  private static AsyncConnectionImpl CONN;
 
-  private static AsyncNonMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
+    TEST_UTIL.getConfiguration().setInt(AsyncRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
-    LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+    CONN = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
   }
 
   @AfterClass
@@ -70,7 +72,7 @@ public class TestAsyncTableLocatePrefetch {
   @Test
   public void test() throws InterruptedException, ExecutionException {
     assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
-      RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
+      RegionLocateType.CURRENT, false, TimeUnit.MINUTES.toNanos(1)).get());
     // we finish the request before adding the remaining results to the cache, so sleep a bit here
     Thread.sleep(1000);
     // confirm that the locations of all the regions have been cached.
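
The prefetch test exercises the locator overload that takes an explicit timeout in nanoseconds instead of a replica id. A sketch of that call, again package-scoped because AsyncRegionLocator is client-internal (the wrapper name and the one-minute deadline are illustrative):

    // Illustrative sketch; same package as the locator so the internal types resolve.
    package org.apache.hadoop.hbase.client;

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.RegionLocations;
    import org.apache.hadoop.hbase.TableName;

    final class BoundedLocate {

      /** Locate the region containing a row, bounding the lookup to one minute. */
      static RegionLocations locateWithDeadline(AsyncRegionLocator locator, TableName table,
          byte[] row) throws Exception {
        return locator.getRegionLocations(table, row, RegionLocateType.CURRENT, false,
          TimeUnit.MINUTES.toNanos(1)).get();
      }
    }
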
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
index 461bf1b..3eeee50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
@@ -84,8 +84,7 @@ public class TestAsyncTableRSCrashPublish {
   public void test() throws IOException, ExecutionException, InterruptedException {
     Configuration conf = UTIL.getHBaseCluster().getMaster().getConfiguration();
     try (AsyncConnection connection = ConnectionFactory.createAsyncConnection(conf).get()) {
-      AsyncNonMetaRegionLocator locator =
-        ((AsyncConnectionImpl) connection).getLocator().getNonMetaRegionLocator();
+      AsyncRegionLocator locator = ((AsyncConnectionImpl) connection).getLocator();
       connection.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
       ServerName serverName =
         locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 3485955..c6a30d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -91,9 +91,7 @@ public class TestAsyncTableUseMetaReplicas {
     conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
       FailPrimaryMetaScanCp.class.getName());
     UTIL.startMiniCluster(3);
-    try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
-    }
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
     try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 0e0060c..fa9370b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -113,7 +113,7 @@ public class TestMasterRegistry {
       try (MasterRegistry registry = new MasterRegistry(conf)) {
         // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
         // because not all replicas had made it up before test started.
-        RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
+        RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
         assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
         assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
         List<HRegionLocation> metaLocations =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index abaf092..71abaae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -46,11 +46,13 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({SmallTests.class, MasterTests.class })
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ SmallTests.class, MasterTests.class })
 public class TestMetaRegionLocationCache {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
+    HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static ConnectionRegistry REGISTRY;
@@ -60,19 +62,19 @@ public class TestMetaRegionLocationCache {
     TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.startMiniCluster(3);
     REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
   }
 
   @AfterClass
   public static void cleanUp() throws Exception {
-    IOUtils.closeQuietly(REGISTRY);
+    Closeables.close(REGISTRY, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
   private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Exception {
     List<HRegionLocation> result = new ArrayList<>();
-    for (String znode: zk.getMetaReplicaNodes()) {
+    for (String znode : zk.getMetaReplicaNodes()) {
       String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode);
       int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path);
       RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId);
@@ -92,7 +94,7 @@ public class TestMetaRegionLocationCache {
       }
     }
     List<HRegionLocation> metaHRLs =
-        master.getMetaRegionLocationCache().getMetaRegionLocations().get();
+      master.getMetaRegionLocationCache().getMetaRegionLocations().get();
     assertFalse(metaHRLs.isEmpty());
     ZKWatcher zk = master.getZooKeeper();
     List<String> metaZnodes = zk.getMetaReplicaNodes();
@@ -103,11 +105,13 @@ public class TestMetaRegionLocationCache {
     assertEquals(actualHRLs, metaHRLs);
   }
 
-  @Test public void testInitialMetaLocations() throws Exception {
+  @Test
+  public void testInitialMetaLocations() throws Exception {
     verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
   }
 
-  @Test public void testStandByMetaLocations() throws Exception {
+  @Test
+  public void testStandByMetaLocations() throws Exception {
     HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
     verifyCachedMetaLocations(standBy);
   }
@@ -115,16 +119,17 @@ public class TestMetaRegionLocationCache {
   /*
    * Shuffles the meta region replicas around the cluster and makes sure the cache is not stale.
    */
-  @Test public void testMetaLocationsChange() throws Exception {
+  @Test
+  public void testMetaLocationsChange() throws Exception {
     List<HRegionLocation> currentMetaLocs =
-        getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
+      getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
     // Move these replicas to random servers.
-    for (HRegionLocation location: currentMetaLocs) {
+    for (HRegionLocation location : currentMetaLocs) {
       RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
     }
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
-    for (JVMClusterUtil.MasterThread masterThread:
-        TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
+    for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+      .getMasterThreads()) {
       verifyCachedMetaLocations(masterThread.getMaster());
     }
   }
@@ -133,7 +138,8 @@ public class TestMetaRegionLocationCache {
    * Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base
    * znode for notifications.
    */
-  @Test public void testMetaRegionLocationCache() throws Exception {
+  @Test
+  public void testMetaRegionLocationCache() throws Exception {
     final String parentZnodeName = "/randomznodename";
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName);
@@ -143,7 +149,8 @@ public class TestMetaRegionLocationCache {
       // some ZK activity in the background.
       MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
       ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
-        @Override public void doAnAction() throws Exception {
+        @Override
+        public void doAnAction() throws Exception {
           final String testZnode = parentZnodeName + "/child";
           ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes());
           ZKUtil.deleteNode(zkWatcher, testZnode);
@@ -163,8 +170,8 @@ public class TestMetaRegionLocationCache {
         // Wait until the meta cache is populated.
         int iters = 0;
         while (iters++ < 10) {
-          if (metaCache.getMetaRegionLocations().isPresent()
-            && metaCache.getMetaRegionLocations().get().size() == 3) {
+          if (metaCache.getMetaRegionLocations().isPresent() &&
+            metaCache.getMetaRegionLocations().get().size() == 3) {
             break;
           }
           Thread.sleep(1000);
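
Besides formatting, the change above swaps commons-io IOUtils.closeQuietly for the shaded Guava Closeables.close when shutting down the connection registry. A minimal sketch of the replacement call; the second argument asks Guava to log and swallow an IOException rather than silently discarding every exception:

    // in @AfterClass: swallowIOException = true
    Closeables.close(REGISTRY, true);
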
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
index db08165..0100ad7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
@@ -118,6 +118,10 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
           Thread.sleep(
             conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30000) * 3);
         }
+        // cache the location for all the meta regions.
+        try (RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
+          locator.getAllRegionLocations();
+        }
         // Ensure all metas are not on same hbase:meta replica=0 server!
 
         master = util.getHBaseClusterInterface().getClusterMetrics().getMasterName();
@@ -131,7 +135,6 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
           util.getHBaseClusterInterface().killRegionServer(primary);
           util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
         }
-        c.clearRegionLocationCache();
       }
       LOG.info("Running GETs");
       try (Table htable = c.getTable(TABLE)) {
@@ -150,15 +153,15 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
         util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
         util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
         LOG.info("Master active!");
-        c.clearRegionLocationCache();
       }
     }
     conf.setBoolean(HConstants.USE_META_REPLICAS, false);
     LOG.info("Running GETs no replicas");
-    try (Connection c = ConnectionFactory.createConnection(conf);
-      Table htable = c.getTable(TABLE)) {
-      Result r = htable.get(new Get(row));
-      assertArrayEquals(row, r.getRow());
+    try (Connection c = ConnectionFactory.createConnection(conf)) {
+      try (Table htable = c.getTable(TABLE)) {
+        Result r = htable.get(new Get(row));
+        assertArrayEquals(r.getRow(), row);
+      }
     }
   }
 }
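
Rather than clearing the whole connection cache around the region server kill, the test above now warms the client cache with every hbase:meta replica location up front. A minimal sketch of that pre-warming step, assuming c is an open Connection:

    // cache the locations of all hbase:meta regions (including replicas) on the connection
    try (RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
      List<HRegionLocation> metaLocations = locator.getAllRegionLocations();
    }
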
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index d7fc8e0..47a47ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -211,7 +211,7 @@ public class TestReplicasClient {
 
     // No master
     LOG.info("Master is going to be stopped");
-    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
     Configuration c = new Configuration(HTU.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     LOG.info("Master has stopped");
@@ -225,7 +225,9 @@ public class TestReplicasClient {
 
   @Before
   public void before() throws IOException {
-    HTU.getConnection().clearRegionLocationCache();
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+      locator.clearRegionLocationCache();
+    }
     try {
       openRegion(hriPrimary);
     } catch (Exception ignored) {
@@ -247,7 +249,6 @@ public class TestReplicasClient {
       closeRegion(hriPrimary);
     } catch (Exception ignored) {
     }
-    HTU.getConnection().clearRegionLocationCache();
   }
 
   private HRegionServer getRS() {
@@ -326,16 +327,15 @@ public class TestReplicasClient {
     byte[] b1 = Bytes.toBytes("testLocations");
     openRegion(hriSecondary);
 
-    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
-        RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
-      conn.clearRegionLocationCache();
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+      locator.clearRegionLocationCache();
       List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
       Assert.assertEquals(2, rl.size());
 
       rl = locator.getRegionLocations(b1, false);
       Assert.assertEquals(2, rl.size());
 
-      conn.clearRegionLocationCache();
+      locator.clearRegionLocationCache();
       rl = locator.getRegionLocations(b1, false);
       Assert.assertEquals(2, rl.size());
 
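Location caching is now managed per table through the RegionLocator rather than via Connection.clearRegionLocationCache. A minimal sketch of the per-table pattern used above, assuming an open Connection named connection and a byte[] row:

    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      // drop any cached entries for this table only
      locator.clearRegionLocationCache();
      // reload == true forces a lookup instead of answering from the cache
      List<HRegionLocation> locationsWithReplicas = locator.getRegionLocations(row, true);
    }
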
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index 137cb28..af4533b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -19,16 +19,16 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.IntStream;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,6 +48,8 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestZKConnectionRegistry {
 
@@ -73,7 +75,7 @@ public class TestZKConnectionRegistry {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    IOUtils.closeQuietly(REGISTRY);
+    Closeables.close(REGISTRY, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -86,8 +88,7 @@ public class TestZKConnectionRegistry {
       clusterId);
     assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
       REGISTRY.getActiveMaster().get());
-    RegionReplicaTestHelper
-      .waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
     RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
     assertEquals(3, locs.getRegionLocations().length);
     IntStream.range(0, 3).forEach(i -> {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
index 41a008a..8a760dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AlwaysStandByHMaster.java
@@ -62,10 +62,10 @@ public class AlwaysStandByHMaster extends HMaster {
             if (MasterAddressTracker.getMasterAddress(watcher) != null) {
               clusterHasActiveMaster.set(true);
             }
-            Threads.sleepWithoutInterrupt(100);
           } catch (IOException e) {
             // pass, we will get notified when some other active master creates the znode.
           }
+          Threads.sleepWithoutInterrupt(1000);
         } catch (KeeperException e) {
           master.abort("Received an unexpected KeeperException, aborting", e);
           return false;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 50851c1..4a68f7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
@@ -506,4 +507,10 @@ public class MockNoopMasterServices implements MasterServices {
   public boolean isBalancerOn() {
     return false;
   }
+
+  @Override
+  public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+    throws IOException {
+    return null;
+  }
 }
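
MockNoopMasterServices picks up the new MasterServices#getAllMetaRegionLocations(excludeOfflinedSplitParents) method introduced by this change. A minimal sketch of an alternative no-op implementation (illustrative only, not part of the patch) that answers with an empty list instead of null, assuming java.util.Collections is imported:

    @Override
    public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
      throws IOException {
      // a real master consults its meta location store; a mock can answer with no locations
      return Collections.emptyList();
    }
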
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index a27936d..45fac6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -28,12 +28,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -161,54 +158,5 @@ public class TestMasterFailover {
       TEST_UTIL.shutdownMiniCluster();
     }
   }
-
-  /**
-   * Test meta in transition when master failover.
-   * This test used to manipulate region state up in zk. That is not allowed any more in hbase2
-   * so I removed that messing. That makes this test anemic.
-   */
-  @Test
-  public void testMetaInTransitionWhenMasterFailover() throws Exception {
-    // Start the cluster
-    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-    TEST_UTIL.startMiniCluster();
-    try {
-      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      LOG.info("Cluster started");
-
-      HMaster activeMaster = cluster.getMaster();
-      ServerName metaServerName = cluster.getServerHoldingMeta();
-      HRegionServer hrs = cluster.getRegionServer(metaServerName);
-
-      // Now kill master, meta should remain on rs, where we placed it before.
-      LOG.info("Aborting master");
-      activeMaster.abort("test-kill");
-      cluster.waitForMasterToStop(activeMaster.getServerName(), 30000);
-      LOG.info("Master has aborted");
-
-      // meta should remain where it was
-      RegionState metaState = MetaTableLocator.getMetaRegionState(hrs.getZooKeeper());
-      assertEquals("hbase:meta should be online on RS",
-          metaState.getServerName(), metaServerName);
-      assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
-      // Start up a new master
-      LOG.info("Starting up a new master");
-      activeMaster = cluster.startMaster().getMaster();
-      LOG.info("Waiting for master to be ready");
-      cluster.waitForActiveAndReadyMaster();
-      LOG.info("Master is ready");
-
-      // ensure meta is still deployed on RS
-      metaState = MetaTableLocator.getMetaRegionState(activeMaster.getZooKeeper());
-      assertEquals("hbase:meta should be online on RS",
-          metaState.getServerName(), metaServerName);
-      assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
-      // Done, shutdown the cluster
-    } finally {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
 }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
index 425d08b..f2b9a56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
@@ -89,7 +90,20 @@ public class TestMetaAssignmentWithStopMaster {
         }
       }
 
-      ServerName newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+      ServerName newMetaServer;
+      startTime = System.currentTimeMillis();
+      for (;;) {
+        try {
+          newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+          break;
+        } catch (IOException e) {
+          LOG.warn("failed to get all locations, retry...", e);
+        }
+        Thread.sleep(3000);
+        if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
+          fail("Wait too long for getting the new meta location");
+        }
+      }
       assertTrue("The new meta server " + newMetaServer + " should be same with" +
         " the old meta server " + oldMetaServer, newMetaServer.equals(oldMetaServer));
     }
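
The retry loop added above tolerates transient failures of getAllRegionLocations while the cluster is between active masters, giving up after WAIT_TIMEOUT. A minimal sketch of the same idea as a reusable helper (illustrative only, not part of the patch):

    private static ServerName waitForMetaServer(RegionLocator locator, long waitTimeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + waitTimeoutMs;
      while (true) {
        try {
          return locator.getAllRegionLocations().get(0).getServerName();
        } catch (IOException e) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("timed out waiting for the new meta location", e);
          }
          Thread.sleep(3000);
        }
      }
    }
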
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
index 742734e..efb784e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.zookeeper.KeeperException;
@@ -99,8 +97,6 @@ public class TestMetaShutdownHandler {
       metaServerName =
         regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
     }
-    RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
-    assertEquals("Wrong state for meta!", RegionState.State.OPEN, metaState.getState());
     assertNotEquals("Meta is on master!", metaServerName, master.getServerName());
 
     // Delete the ephemeral node of the meta-carrying region server.
@@ -126,11 +122,9 @@ public class TestMetaShutdownHandler {
     assertTrue("Meta should be assigned",
       regionStates.isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO));
     // Now, make sure meta is registered in zk
-    metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
-    assertEquals("Meta should not be in transition", RegionState.State.OPEN, metaState.getState());
-    assertEquals("Meta should be assigned", metaState.getServerName(),
-      regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO));
-    assertNotEquals("Meta should be assigned on a different server", metaState.getServerName(),
+    ServerName newMetaServerName =
+      regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    assertNotEquals("Meta should be assigned on a different server", newMetaServerName,
       metaServerName);
   }
 
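With the ZooKeeper-based assertions removed, the test above verifies meta placement purely through the master's in-memory region states. A minimal sketch of that style of check, assuming the master exposes its AssignmentManager and RegionStates as used elsewhere in these tests:

    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    assertTrue("Meta should be assigned",
      regionStates.isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO));
    ServerName metaServer =
      regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
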
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
index 0d3790f..70c5b1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
@@ -19,10 +19,10 @@
 package org.apache.hadoop.hbase.master.assignment;
 
 import static org.junit.Assert.assertNotEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -47,15 +47,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestRegionReplicaSplit {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestRegionReplicaSplit.class);
-  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaSplit.class);
 
   private static final int NB_SERVERS = 4;
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
index 6cd91a7..e706595 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -165,7 +165,7 @@ public class TestCompactionLifeCycleTracker {
                         .setValue(Bytes.toBytes(i))
                         .build()));
       }
-      UTIL.getAdmin().flush(NAME);
+      UTIL.flush(NAME);
       for (int i = 100; i < 200; i++) {
         byte[] row = Bytes.toBytes(i);
         table.put(new Put(row)
@@ -178,7 +178,7 @@ public class TestCompactionLifeCycleTracker {
                         .setValue(Bytes.toBytes(i))
                         .build()));
       }
-      UTIL.getAdmin().flush(NAME);
+      UTIL.flush(NAME);
     }
     region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
     assertEquals(2, region.getStore(CF1).getStorefilesCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 504a140..a23aa32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -110,7 +110,7 @@ public class TestRegionReplicas {
     hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
 
     // No master
-    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 81a8559..241f425 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -35,10 +34,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -90,15 +88,24 @@ public class TestRegionServerNoMaster {
     }
     regionName = hri.getRegionName();
 
-    stopMasterAndAssignMeta(HTU);
+    stopMasterAndCacheMetaLocation(HTU);
   }
 
-  public static void stopMasterAndAssignMeta(HBaseTestingUtility HTU)
-      throws IOException, InterruptedException {
+  public static void stopMasterAndCacheMetaLocation(HBaseTestingUtility HTU)
+    throws IOException, InterruptedException {
+    // cache the meta location so we will not go to the master to look up the meta region location
+    for (JVMClusterUtil.RegionServerThread t : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+      try (RegionLocator locator =
+        t.getRegionServer().getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+        locator.getAllRegionLocations();
+      }
+    }
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      locator.getAllRegionLocations();
+    }
     // Stop master
     HMaster master = HTU.getHBaseCluster().getMaster();
     Thread masterThread = HTU.getHBaseCluster().getMasterThread();
-    ServerName masterAddr = master.getServerName();
     master.stopMaster();
 
     LOG.info("Waiting until master thread exits");
@@ -107,27 +114,6 @@ public class TestRegionServerNoMaster {
     }
 
     HRegionServer.TEST_SKIP_REPORTING_TRANSITION = true;
-    // Master is down, so is the meta. We need to assign it somewhere
-    // so that regions can be assigned during the mocking phase.
-    HRegionServer hrs = HTU.getHBaseCluster()
-      .getLiveRegionServerThreads().get(0).getRegionServer();
-    ZKWatcher zkw = hrs.getZooKeeper();
-    ServerName sn = MetaTableLocator.getMetaRegionLocation(zkw);
-    if (sn != null && !masterAddr.equals(sn)) {
-      return;
-    }
-
-    ProtobufUtil.openRegion(null, hrs.getRSRpcServices(),
-      hrs.getServerName(), RegionInfoBuilder.FIRST_META_REGIONINFO);
-    while (true) {
-      sn = MetaTableLocator.getMetaRegionLocation(zkw);
-      if (sn != null && sn.equals(hrs.getServerName())
-          && hrs.getOnlineRegions().containsKey(
-            RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
-        break;
-      }
-      Thread.sleep(100);
-    }
   }
 
   /** Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush() */
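
stopMasterAndAssignMeta becomes stopMasterAndCacheMetaLocation above because meta can no longer be re-assigned through ZooKeeper once the master is gone; instead, every connection the test will use caches the hbase:meta locations before the master is stopped, and later lookups are served from that cache. A minimal usage sketch for a no-master test:

    // warm the caches and stop the master; subsequent region lookups hit the cached locations
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
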
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index e56083d..1ec6d36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -133,8 +131,12 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     // mock a secondary region info to open
     hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
 
+    // cache the location for meta regions
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      locator.getAllRegionLocations();
+    }
     // No master
-    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
     rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
     rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
   }
@@ -186,11 +188,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     HTU.loadNumericRows(table, f, 0, 1000);
 
     Assert.assertEquals(1000, entries.size());
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-    }
+    AsyncClusterConnection conn = HTU.getAsyncConnection();
+    // replay the edits to the secondary using replay callable
+    replicateUsingCallable(conn, entries);
 
     Region region = rs0.getRegion(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
@@ -216,36 +216,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
   public void testReplayCallableWithRegionMove() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly while
     // the region is moved to another location. It tests handling of RME.
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      openRegion(HTU, rs0, hriSecondary);
-      // load some data to primary
-      HTU.loadNumericRows(table, f, 0, 1000);
+    AsyncClusterConnection conn = HTU.getAsyncConnection();
+    openRegion(HTU, rs0, hriSecondary);
+    // load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
 
-      Assert.assertEquals(1000, entries.size());
+    Assert.assertEquals(1000, entries.size());
 
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
+    // replay the edits to the secondary using replay callable
+    replicateUsingCallable(conn, entries);
 
-      Region region = rs0.getRegion(hriSecondary.getEncodedName());
-      HTU.verifyNumericRows(region, f, 0, 1000);
+    Region region = rs0.getRegion(hriSecondary.getEncodedName());
+    HTU.verifyNumericRows(region, f, 0, 1000);
 
-      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
 
-      // move the secondary region from RS0 to RS1
-      closeRegion(HTU, rs0, hriSecondary);
-      openRegion(HTU, rs1, hriSecondary);
+    // move the secondary region from RS0 to RS1
+    closeRegion(HTU, rs0, hriSecondary);
+    openRegion(HTU, rs1, hriSecondary);
 
-      // replicate the new data
-      replicateUsingCallable(conn, entries);
+    // replicate the new data
+    replicateUsingCallable(conn, entries);
 
-      region = rs1.getRegion(hriSecondary.getEncodedName());
-      // verify the new data. old data may or may not be there
-      HTU.verifyNumericRows(region, f, 1000, 2000);
+    region = rs1.getRegion(hriSecondary.getEncodedName());
+    // verify the new data. old data may or may not be there
+    HTU.verifyNumericRows(region, f, 1000, 2000);
 
-      HTU.deleteNumericRows(table, f, 0, 2000);
-      closeRegion(HTU, rs1, hriSecondary);
-    }
+    HTU.deleteNumericRows(table, f, 0, 2000);
+    closeRegion(HTU, rs1, hriSecondary);
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
index 8afc1f1..4cf1508 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
@@ -129,7 +129,7 @@ public class TestHBaseFsckEncryption {
       table.close();
     }
     // Flush it
-    TEST_UTIL.getAdmin().flush(tableDescriptor.getTableName());
+    TEST_UTIL.flush(tableDescriptor.getTableName());
 
     // Verify we have encrypted store files on disk
     final List<Path> paths = findStorefilePaths(tableDescriptor.getTableName());
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index c7b45fe..0fd3467 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -17,23 +17,16 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
 
@@ -46,9 +39,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaReg
  * <p/>
  * Meta region location is set by <code>RegionServerServices</code>. This class doesn't use ZK
  * watchers, rather accesses ZK directly.
- * <p/>
- * TODO: rewrite using RPC calls to master to find out about hbase:meta.
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. The meta location is now kept in the
+ *             master's local store; the location on zk is only a mirror of the first meta
+ *             region, kept for compatibility.
  */
+@Deprecated
 @InterfaceAudience.Private
 public final class MetaTableLocator {
   private static final Logger LOG = LoggerFactory.getLogger(MetaTableLocator.class);
@@ -57,166 +52,16 @@ public final class MetaTableLocator {
   }
 
   /**
-   * @param zkw ZooKeeper watcher to be used
-   * @return meta table regions and their locations.
-   */
-  public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
-    return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  /**
-   * Gets the meta regions and their locations for the given path and replica ID.
-   *
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @return meta table regions and their locations.
-   */
-  public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
-      int replicaId) {
-    ServerName serverName = getMetaRegionLocation(zkw, replicaId);
-    List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
-    list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
-        RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
-    return list;
-  }
-
-  /**
-   * Gets the meta regions for the given path with the default replica ID.
-   *
-   * @param zkw ZooKeeper watcher to be used
-   * @return List of meta regions
-   */
-  public static List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
-    return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  /**
-   * Gets the meta regions for the given path and replica ID.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @return List of meta regions
-   */
-  public static List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
-    List<Pair<RegionInfo, ServerName>> result;
-    result = getMetaRegionsAndLocations(zkw, replicaId);
-    return getListOfRegionInfos(result);
-  }
-
-  private static List<RegionInfo> getListOfRegionInfos(
-      final List<Pair<RegionInfo, ServerName>> pairs) {
-    if (pairs == null || pairs.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    List<RegionInfo> result = new ArrayList<>(pairs.size());
-    for (Pair<RegionInfo, ServerName> pair : pairs) {
-      result.add(pair.getFirst());
-    }
-    return result;
-  }
-
-  /**
-   * Gets the meta region location, if available.  Does not block.
-   * @param zkw zookeeper connection to use
-   * @return server name or null if we failed to get the data.
-   */
-  public static ServerName getMetaRegionLocation(final ZKWatcher zkw) {
-    try {
-      RegionState state = getMetaRegionState(zkw);
-      return state.isOpened() ? state.getServerName() : null;
-    } catch (KeeperException ke) {
-      return null;
-    }
-  }
-
-  /**
-   * Gets the meta region location, if available.  Does not block.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @return server name
-   */
-  public static ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
-    try {
-      RegionState state = getMetaRegionState(zkw, replicaId);
-      return state.isOpened() ? state.getServerName() : null;
-    } catch (KeeperException ke) {
-      return null;
-    }
-  }
-
-  /**
-   * Gets the meta region location, if available, and waits for up to the specified timeout if not
-   * immediately available. Given the zookeeper notification could be delayed, we will try to get
-   * the latest data.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param timeout maximum time to wait, in millis
-   * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
-   *         if none available
-   * @throws InterruptedException if interrupted while waiting
-   * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
-   */
-  public static ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
-      throws InterruptedException, NotAllMetaRegionsOnlineException {
-    return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
-  }
-
-  /**
-   * Gets the meta region location, if available, and waits for up to the specified timeout if not
-   * immediately available. Given the zookeeper notification could be delayed, we will try to get
-   * the latest data.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @param timeout maximum time to wait, in millis
-   * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
-   *         if none available
-   * @throws InterruptedException if waiting for the socket operation fails
-   * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
-   */
-  public static ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
-      throws InterruptedException, NotAllMetaRegionsOnlineException {
-    try {
-      if (ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) == -1) {
-        String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. " +
-          "There could be a mismatch with the one configured in the master.";
-        LOG.error(errorMsg);
-        throw new IllegalArgumentException(errorMsg);
-      }
-    } catch (KeeperException e) {
-      throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
-    }
-    ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);
-
-    if (sn == null) {
-      throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
-    }
-
-    return sn;
-  }
-
-  /**
-   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
-   * specified server address.
-   * @param zookeeper zookeeper reference
-   * @param serverName The server hosting <code>hbase:meta</code>
-   * @param state The region transition state
-   * @throws KeeperException unexpected zookeeper exception
-   */
-  public static void setMetaLocation(ZKWatcher zookeeper,
-      ServerName serverName, RegionState.State state) throws KeeperException {
-    setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
-  }
-
-  /**
    * Sets the location of <code>hbase:meta</code> in ZooKeeper to the specified server address.
    * @param zookeeper reference to the {@link ZKWatcher} which also contains configuration and
-   *                  operation
+   *          operation
    * @param serverName the name of the server
    * @param replicaId the ID of the replica
    * @param state the state of the region
    * @throws KeeperException if a ZooKeeper operation fails
    */
   public static void setMetaLocation(ZKWatcher zookeeper, ServerName serverName, int replicaId,
-      RegionState.State state) throws KeeperException {
+    RegionState.State state) throws KeeperException {
     if (serverName == null) {
       LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
       return;
@@ -225,43 +70,32 @@ public final class MetaTableLocator {
       serverName);
     // Make the MetaRegionServer pb and then get its bytes and save this as
     // the znode content.
-    MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
-      .setServer(ProtobufUtil.toServerName(serverName))
-      .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
-      .setState(state.convert()).build();
+    MetaRegionServer pbrsr =
+      MetaRegionServer.newBuilder().setServer(ProtobufUtil.toServerName(serverName))
+        .setRpcVersion(HConstants.RPC_CURRENT_VERSION).setState(state.convert()).build();
     byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
     try {
-      ZKUtil.setData(zookeeper,
-          zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
-    } catch(KeeperException.NoNodeException nne) {
+      ZKUtil.setData(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
+    } catch (KeeperException.NoNodeException nne) {
       if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
         LOG.debug("META region location doesn't exist, create it");
       } else {
-        LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
-            ", create it");
+        LOG.debug("META region location doesn't exist for replicaId=" + replicaId + ", create it");
       }
       ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),
-              data);
+        data);
     }
   }
 
   /**
-   * Load the meta region state from the meta server ZNode.
-   */
-  public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
-    return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  /**
    * Load the meta region state from the meta region server ZNode.
-   *
    * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
    * @param replicaId the ID of the replica
    * @return regionstate
    * @throws KeeperException if a ZooKeeper operation fails
    */
   public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
-      throws KeeperException {
+    throws KeeperException {
     RegionState regionState = null;
     try {
       byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
@@ -273,109 +107,4 @@ public final class MetaTableLocator {
     }
     return regionState;
   }
-
-  /**
-   * Deletes the location of <code>hbase:meta</code> in ZooKeeper.
-   * @param zookeeper zookeeper reference
-   * @throws KeeperException unexpected zookeeper exception
-   */
-  public static void deleteMetaLocation(ZKWatcher zookeeper)
-    throws KeeperException {
-    deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  public static void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
-    throws KeeperException {
-    if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
-      LOG.info("Deleting hbase:meta region location in ZooKeeper");
-    } else {
-      LOG.info("Deleting hbase:meta for {} region location in ZooKeeper", replicaId);
-    }
-    try {
-      // Just delete the node.  Don't need any watches.
-      ZKUtil.deleteNode(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId));
-    } catch(KeeperException.NoNodeException nne) {
-      // Has already been deleted
-    }
-  }
-  /**
-   * Wait until the primary meta region is available. Get the secondary locations as well but don't
-   * block for those.
-   *
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param timeout maximum time to wait in millis
-   * @param conf the {@link Configuration} to use
-   * @return ServerName or null if we timed out.
-   * @throws InterruptedException if waiting for the socket operation fails
-   */
-  public static List<ServerName> blockUntilAvailable(final ZKWatcher zkw, final long timeout,
-      Configuration conf) throws InterruptedException {
-    int numReplicasConfigured = 1;
-
-    List<ServerName> servers = new ArrayList<>();
-    // Make the blocking call first so that we do the wait to know
-    // the znodes are all in place or timeout.
-    ServerName server = blockUntilAvailable(zkw, timeout);
-
-    if (server == null) {
-      return null;
-    }
-
-    servers.add(server);
-
-    try {
-      List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
-      numReplicasConfigured = metaReplicaNodes.size();
-    } catch (KeeperException e) {
-      LOG.warn("Got ZK exception {}", e);
-    }
-    for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
-      // return all replica locations for the meta
-      servers.add(getMetaRegionLocation(zkw, replicaId));
-    }
-    return servers;
-  }
-
-  /**
-   * Wait until the meta region is available and is not in transition.
-   * @param zkw zookeeper connection to use
-   * @param timeout maximum time to wait, in millis
-   * @return ServerName or null if we timed out.
-   * @throws InterruptedException if waiting for the socket operation fails
-   */
-  public static ServerName blockUntilAvailable(final ZKWatcher zkw, final long timeout)
-      throws InterruptedException {
-    return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
-  }
-
-  /**
-   * Wait until the meta region is available and is not in transition.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and constants
-   * @param replicaId the ID of the replica
-   * @param timeout maximum time to wait in millis
-   * @return ServerName or null if we timed out.
-   * @throws InterruptedException if waiting for the socket operation fails
-   */
-  public static ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
-      final long timeout) throws InterruptedException {
-    if (timeout < 0) {
-      throw new IllegalArgumentException();
-    }
-
-    if (zkw == null) {
-      throw new IllegalArgumentException();
-    }
-
-    long startTime = System.currentTimeMillis();
-    ServerName sn = null;
-    while (true) {
-      sn = getMetaRegionLocation(zkw, replicaId);
-      if (sn != null ||
-        (System.currentTimeMillis() - startTime) > timeout - HConstants.SOCKET_RETRY_WAIT_MS) {
-        break;
-      }
-      Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
-    }
-    return sn;
-  }
 }
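
MetaTableLocator is now deprecated and trimmed down to the two entry points that keep the ZooKeeper mirror of the first meta region's location in sync: setMetaLocation and getMetaRegionState, both taking a replica id. A minimal sketch of the surviving surface (illustrative only), assuming a ZKWatcher named zkWatcher and a ServerName named serverName:

    // mirror the primary meta region's location into ZooKeeper for compatibility
    MetaTableLocator.setMetaLocation(zkWatcher, serverName, RegionInfo.DEFAULT_REPLICA_ID,
      RegionState.State.OPEN);
    // read the mirrored state back
    RegionState mirrored =
      MetaTableLocator.getMetaRegionState(zkWatcher, RegionInfo.DEFAULT_REPLICA_ID);
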
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 19d11d0..d1a4dfe 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -37,14 +37,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.security.Superusers;
@@ -78,6 +75,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 
@@ -1859,15 +1857,6 @@ public final class ZKUtil {
           sb.append("\n ").append(child);
         }
       }
-      sb.append("\nRegion server holding hbase:meta: "
-        + MetaTableLocator.getMetaRegionLocation(zkw));
-      Configuration conf = HBaseConfiguration.create();
-      int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
-               HConstants.DEFAULT_META_REPLICA_NUM);
-      for (int i = 1; i < numMetaReplicas; i++) {
-        sb.append("\nRegion server holding hbase:meta, replicaId " + i + " "
-                    + MetaTableLocator.getMetaRegionLocation(zkw, i));
-      }
       sb.append("\nRegion servers:");
       final List<String> rsChildrenNoWatchList =
               listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);