Posted to commits@hbase.apache.org by zh...@apache.org on 2021/07/22 15:48:37 UTC

[hbase] 02/09: HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-24950
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 12bff02b2af3b5d919326b03613066837677154b
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Jun 27 15:47:51 2020 +0800

    HBASE-24389 Introduce new master rpc methods to locate meta region through root region (#1774)
    
    Signed-off-by: stack <st...@apache.org>
---
 .../apache/hadoop/hbase/CatalogFamilyFormat.java   |  30 +
 .../hadoop/hbase/ClientMetaTableAccessor.java      |  29 +-
 .../client/AbstractAsyncTableRegionLocator.java    | 310 +++++++++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   4 +-
 .../hbase/client/AsyncMetaRegionLocator.java       | 146 -----
 .../hbase/client/AsyncMetaTableRegionLocator.java  | 155 +++++
 .../hbase/client/AsyncNonMetaRegionLocator.java    | 725 ---------------------
 .../client/AsyncNonMetaTableRegionLocator.java     | 206 ++++++
 .../hadoop/hbase/client/AsyncRegionLocator.java    | 170 +++--
 .../hbase/client/AsyncRegionLocatorHelper.java     |  42 +-
 .../hbase/client/AsyncTableRegionLocator.java      |   4 +-
 .../hbase/client/AsyncTableRegionLocatorImpl.java  |  11 +-
 .../hadoop/hbase/client/ConnectionRegistry.java    |   6 -
 .../hadoop/hbase/client/ConnectionUtils.java       |  18 -
 .../apache/hadoop/hbase/client/MasterRegistry.java |   2 +-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  47 +-
 .../hadoop/hbase/client/RegionLocateType.java      |   5 +-
 .../hbase/client/TableRegionLocationCache.java     | 226 +++++++
 .../hadoop/hbase/client/ZKConnectionRegistry.java  |  10 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  27 +
 .../hbase/client/DoNothingConnectionRegistry.java  |   6 -
 .../client/TestAsyncMetaRegionLocatorFailFast.java |  67 --
 .../client/TestAsyncRegionLocatorTracing.java      |  75 ++-
 .../apache/hadoop/hbase/MetaCellComparator.java    |   2 +-
 .../src/main/protobuf/server/master/Master.proto   |  35 +
 .../hadoop/hbase/coprocessor/MasterObserver.java   |  44 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    | 115 +++-
 .../hadoop/hbase/master/MasterCoprocessorHost.java |  40 ++
 .../hadoop/hbase/master/MasterRpcServices.java     | 125 +++-
 .../apache/hadoop/hbase/master/MasterServices.java |   9 +
 .../hbase/master/MetaRegionLocationCache.java      |   9 +-
 .../hbase/master/assignment/RegionStateStore.java  |   6 +-
 .../hbase/master/http/MasterStatusServlet.java     |   5 +-
 .../master/procedure/CreateTableProcedure.java     |   2 -
 .../hbase/master/procedure/ProcedureSyncWait.java  |  14 -
 .../master/snapshot/MasterSnapshotVerifier.java    |   8 +-
 .../hbase/master/snapshot/TakeSnapshotHandler.java |  14 +-
 .../flush/MasterFlushTableProcedureManager.java    |  18 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  18 +-
 .../main/resources/hbase-webapps/master/table.jsp  |  34 +-
 .../apache/hadoop/hbase/TestMetaTableAccessor.java |   8 -
 .../apache/hadoop/hbase/TestMetaTableLocator.java  | 207 ------
 .../hbase/client/AbstractTestRegionLocator.java    |   5 +-
 .../hbase/client/DummyConnectionRegistry.java      |   6 -
 .../hbase/client/MetaWithReplicasTestBase.java     |   9 +-
 .../hbase/client/RegionReplicaTestHelper.java      |  15 +-
 .../client/TestAsyncAdminWithRegionReplicas.java   |   5 +-
 .../hbase/client/TestAsyncMetaRegionLocator.java   |  21 +-
 .../client/TestAsyncNonMetaRegionLocator.java      |  43 +-
 .../hbase/client/TestAsyncRegionAdminApi2.java     |  45 +-
 ... => TestAsyncRegionLocatorConcurrenyLimit.java} |  18 +-
 .../hbase/client/TestAsyncTableAdminApi.java       |  52 +-
 .../hbase/client/TestAsyncTableAdminApi3.java      |  24 +-
 .../hbase/client/TestAsyncTableLocatePrefetch.java |  14 +-
 .../hbase/client/TestAsyncTableRSCrashPublish.java |   3 +-
 .../client/TestAsyncTableUseMetaReplicas.java      |   4 +-
 ...estCatalogReplicaLoadBalanceSimpleSelector.java |  13 +-
 .../hadoop/hbase/client/TestMasterRegistry.java    |   2 +-
 .../hbase/client/TestMetaRegionLocationCache.java  |  39 +-
 .../TestMetaWithReplicasShutdownHandling.java      |  15 +-
 .../hadoop/hbase/client/TestReplicasClient.java    |  14 +-
 .../hbase/client/TestZKConnectionRegistry.java     |   3 +-
 .../hbase/master/MockNoopMasterServices.java       |   6 +
 .../hadoop/hbase/master/TestMasterFailover.java    |  53 +-
 .../master/TestMetaAssignmentWithStopMaster.java   |  16 +-
 .../hbase/master/TestMetaShutdownHandler.java      |  12 +-
 .../master/assignment/TestRegionReplicaSplit.java  |   5 +-
 .../TestCompactionLifeCycleTracker.java            |   4 +-
 .../hbase/regionserver/TestRegionReplicas.java     |   2 +-
 .../regionserver/TestRegionServerNoMaster.java     |  42 +-
 ...stRegionReplicaReplicationEndpointNoMaster.java |  58 +-
 .../hadoop/hbase/util/TestHBaseFsckEncryption.java |   2 +-
 .../hadoop/hbase/zookeeper/MetaTableLocator.java   | 300 +--------
 .../org/apache/hadoop/hbase/zookeeper/ZKUtil.java  |   7 -
 74 files changed, 1865 insertions(+), 2026 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
index 978198b..0178aeb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CatalogFamilyFormat.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
+
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -31,8 +35,11 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -418,4 +425,27 @@ public class CatalogFamilyFormat {
     }
     return deleteReplicaLocations;
   }
+
+  private static byte[] buildRegionLocateStartRow(TableName tableName, byte[] row,
+    RegionLocateType locateType) {
+    if (locateType.equals(RegionLocateType.BEFORE)) {
+      if (Bytes.equals(row, HConstants.EMPTY_END_ROW)) {
+        byte[] binaryTableName = tableName.getName();
+        return Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+      } else {
+        return createRegionName(tableName, row, ZEROES, false);
+      }
+    } else {
+      return createRegionName(tableName, row, NINES, false);
+    }
+  }
+
+  public static Scan createRegionLocateScan(TableName tableName, byte[] row,
+    RegionLocateType locateType, int prefetchLimit) {
+    byte[] startRow = buildRegionLocateStartRow(tableName, row, locateType);
+    byte[] stopRow = RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+    return new Scan().withStartRow(startRow).withStopRow(stopRow, true)
+      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(prefetchLimit)
+      .setReadType(ReadType.PREAD);
+  }
 }
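
For illustration, here is a minimal stand-alone sketch (not part of the patch) of how the new CatalogFamilyFormat.createRegionLocateScan helper can be called to build the reversed meta scan; the table name, row key and prefetch limit below are made-up values, with 10 matching the old locator's default prefetch limit:

  import org.apache.hadoop.hbase.CatalogFamilyFormat;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.RegionLocateType;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.util.Bytes;

  public class LocateScanSketch {
    public static void main(String[] args) {
      TableName table = TableName.valueOf("test_table"); // made-up table name
      byte[] row = Bytes.toBytes("row-0001");            // made-up row key
      int prefetchLimit = 10;                            // assumed prefetch limit
      // Reversed, PREAD scan over the catalog family, starting at the region name
      // encoded for (table, row) and stopping at the first region of the table.
      Scan scan = CatalogFamilyFormat.createRegionLocateScan(
        table, row, RegionLocateType.CURRENT, prefetchLimit);
      System.out.println(scan);
    }
  }
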
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
index ecc6573..74d2322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClientMetaTableAccessor.java
@@ -164,26 +164,27 @@ public final class ClientMetaTableAccessor {
 
   /**
    * Used to get all region locations for the specific table.
-   * @param metaTable
    * @param tableName table we're looking for, can be null for getting all regions
    * @return the list of region locations. The return value will be wrapped by a
    *         {@link CompletableFuture}.
    */
   public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
-    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
+    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName,
+    boolean excludeOfflinedSplitParents) {
     CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-    addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (locations == null || locations.isEmpty()) {
-        future.complete(Collections.emptyList());
-      } else {
-        List<HRegionLocation> regionLocations =
-          locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
-            .collect(Collectors.toList());
-        future.complete(regionLocations);
-      }
-    });
+    addListener(getTableRegionsAndLocations(metaTable, tableName, excludeOfflinedSplitParents),
+      (locations, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else if (locations == null || locations.isEmpty()) {
+          future.complete(Collections.emptyList());
+        } else {
+          List<HRegionLocation> regionLocations =
+            locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+              .collect(Collectors.toList());
+          future.complete(regionLocations);
+        }
+      });
     return future;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
new file mode 100644
index 0000000..aa7d7e4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractAsyncTableRegionLocator.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base class for locating region of a table.
+ */
+@InterfaceAudience.Private
+abstract class AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncTableRegionLocator.class);
+
+  protected final AsyncConnectionImpl conn;
+
+  protected final TableName tableName;
+
+  protected final int maxConcurrent;
+
+  protected final TableRegionLocationCache cache;
+
+  protected static final class LocateRequest {
+
+    final byte[] row;
+
+    final RegionLocateType locateType;
+
+    public LocateRequest(byte[] row, RegionLocateType locateType) {
+      this.row = row;
+      this.locateType = locateType;
+    }
+
+    @Override
+    public int hashCode() {
+      return Bytes.hashCode(row) ^ locateType.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || obj.getClass() != LocateRequest.class) {
+        return false;
+      }
+      LocateRequest that = (LocateRequest) obj;
+      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
+    }
+  }
+
+  private final Set<LocateRequest> pendingRequests = new HashSet<>();
+
+  private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
+    new LinkedHashMap<>();
+
+  AbstractAsyncTableRegionLocator(AsyncConnectionImpl conn, TableName tableName,
+    int maxConcurrent, Comparator<byte[]> comparator) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.maxConcurrent = maxConcurrent;
+    this.cache = new TableRegionLocationCache(comparator, conn.getConnectionMetrics());
+  }
+
+  private boolean hasQuota() {
+    return pendingRequests.size() < maxConcurrent;
+  }
+
+  protected final Optional<LocateRequest> getCandidate() {
+    return allRequests.keySet().stream().filter(r -> !pendingRequests.contains(r)).findFirst();
+  }
+
+  void clearCompletedRequests(RegionLocations locations) {
+    for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
+      allRequests.entrySet().iterator(); iter.hasNext();) {
+      Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+      if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
+        iter.remove();
+      }
+    }
+  }
+
+  private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+    RegionLocations locations) {
+    if (future.isDone()) {
+      return true;
+    }
+    if (locations == null) {
+      return false;
+    }
+    HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
+    // we should at least have one location available, otherwise the request should fail and
+    // should not arrive here
+    assert loc != null;
+    boolean completed;
+    if (req.locateType.equals(RegionLocateType.BEFORE)) {
+      // for locating the row before current row, the common case is to find the previous region
+      // in reverse scan, so we check the endKey first. In general, the condition should be
+      // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
+      // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
+      // endKey.
+      byte[] endKey = loc.getRegion().getEndKey();
+      int c = Bytes.compareTo(endKey, req.row);
+      completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
+        Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
+    } else {
+      completed = loc.getRegion().containsRow(req.row);
+    }
+    if (completed) {
+      future.complete(locations);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  protected final void onLocateComplete(LocateRequest req, RegionLocations locs, Throwable error) {
+    if (error != null) {
+      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
+        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
+    }
+    Optional<LocateRequest> toSend = Optional.empty();
+    if (locs != null) {
+      RegionLocations addedLocs = cache.add(locs);
+      synchronized (this) {
+        pendingRequests.remove(req);
+        clearCompletedRequests(addedLocs);
+        // Remove a complete locate request in a synchronized block, so the table cache must have
+        // quota to send a candidate request.
+        toSend = getCandidate();
+        toSend.ifPresent(pendingRequests::add);
+      }
+      toSend.ifPresent(this::locate);
+    } else {
+      // we meet an error
+      assert error != null;
+      synchronized (this) {
+        pendingRequests.remove(req);
+        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
+        // already retried several times
+        CompletableFuture<?> future = allRequests.remove(req);
+        if (future != null) {
+          future.completeExceptionally(error);
+        }
+        clearCompletedRequests(null);
+        // Remove a complete locate request in a synchronized block, so the table cache must have
+        // quota to send a candidate request.
+        toSend = getCandidate();
+        toSend.ifPresent(pendingRequests::add);
+      }
+      toSend.ifPresent(this::locate);
+    }
+  }
+
+  // return false means you do not need to go on, just return. And you do not need to call the above
+  // onLocateComplete either when returning false, as we will call it in this method for you, this
+  // is why we need to pass the LocateRequest as a parameter.
+  protected final boolean validateRegionLocations(RegionLocations locs, LocateRequest req) {
+    // remove HRegionLocation with null location, i.e, getServerName returns null.
+    if (locs != null) {
+      locs = locs.removeElementsWithNullLocation();
+    }
+
+    // the default region location should always be presented when fetching from meta, otherwise
+    // let's fail the request.
+    if (locs == null || locs.getDefaultRegionLocation() == null) {
+      onLocateComplete(req, null,
+        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
+          tableName, Bytes.toStringBinary(req.row), req.locateType)));
+      return false;
+    }
+    HRegionLocation loc = locs.getDefaultRegionLocation();
+    RegionInfo info = loc.getRegion();
+    if (info == null) {
+      onLocateComplete(req, null,
+        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+          tableName, Bytes.toStringBinary(req.row), req.locateType)));
+      return false;
+    }
+    return true;
+  }
+
+  protected abstract void locate(LocateRequest req);
+
+  abstract CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents);
+
+  CompletableFuture<RegionLocations> getRegionLocations(byte[] row, int replicaId,
+    RegionLocateType locateType, boolean reload) {
+    if (locateType.equals(RegionLocateType.AFTER)) {
+      row = createClosestRowAfter(row);
+      locateType = RegionLocateType.CURRENT;
+    }
+    if (!reload) {
+      RegionLocations locs = cache.locate(tableName, row, replicaId, locateType);
+      if (isGood(locs, replicaId)) {
+        return CompletableFuture.completedFuture(locs);
+      }
+    }
+    CompletableFuture<RegionLocations> future;
+    LocateRequest req;
+    boolean sendRequest = false;
+    synchronized (this) {
+      // check again
+      if (!reload) {
+        RegionLocations locs = cache.locate(tableName, row, replicaId, locateType);
+        if (isGood(locs, replicaId)) {
+          return CompletableFuture.completedFuture(locs);
+        }
+      }
+      req = new LocateRequest(row, locateType);
+      future = allRequests.get(req);
+      if (future == null) {
+        future = new CompletableFuture<>();
+        allRequests.put(req, future);
+        if (hasQuota() && !pendingRequests.contains(req)) {
+          pendingRequests.add(req);
+          sendRequest = true;
+        }
+      }
+    }
+    if (sendRequest) {
+      locate(req);
+    }
+    return future;
+  }
+
+  void addToCache(RegionLocations locs) {
+    cache.add(locs);
+  }
+
+  // notice that this is not a constant time operation, do not call it on critical path.
+  int getCacheSize() {
+    return cache.size();
+  }
+
+  void clearPendingRequests() {
+    synchronized (this) {
+      if (!allRequests.isEmpty()) {
+        IOException error = new IOException("Cache cleared");
+        for (CompletableFuture<?> future : allRequests.values()) {
+          future.completeExceptionally(error);
+        }
+      }
+    }
+  }
+
+  void clearCache(ServerName serverName) {
+    cache.clearCache(serverName);
+  }
+
+  void removeLocationFromCache(HRegionLocation loc) {
+    cache.removeLocationFromCache(loc);
+  }
+
+  RegionLocations getInCache(byte[] key) {
+    return cache.get(key);
+  }
+
+  // only used for testing whether we have cached the location for a region.
+  @RestrictedApi(explanation = "Should only be called in AsyncRegionLocator",
+    link = "", allowedOnPath = ".*/AsyncRegionLocator.java")
+  RegionLocations locateInCache(byte[] row) {
+    return cache.locate(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+      RegionLocateType.CURRENT);
+  }
+
+  // only used for testing whether we have cached the location for a table.
+  @RestrictedApi(explanation = "Should only be called in AsyncRegionLocator",
+    link = "", allowedOnPath = ".*/AsyncRegionLocator.java")
+  int getNumberOfCachedRegionLocations() {
+    return cache.getNumberOfCachedRegionLocations();
+  }
+}
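
A small self-contained sketch (again not part of the patch) of the BEFORE-locate completion check that tryComplete() above implements; the region boundaries and row keys are made-up examples:

  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.util.Bytes;

  public class BeforeLocateCheck {
    // Mirrors the BEFORE branch of tryComplete(): the request row is satisfied by a
    // region when startKey < row and (endKey >= row or endKey is the empty end row).
    static boolean satisfiesBefore(byte[] startKey, byte[] endKey, byte[] row) {
      int c = Bytes.compareTo(endKey, row);
      return c == 0 || ((c > 0 || Bytes.equals(HConstants.EMPTY_END_ROW, endKey))
          && Bytes.compareTo(startKey, row) < 0);
    }

    public static void main(String[] args) {
      byte[] start = Bytes.toBytes("bbb"); // assumed region [bbb, ddd)
      byte[] end = Bytes.toBytes("ddd");
      System.out.println(satisfiesBefore(start, end, Bytes.toBytes("ddd"))); // true: endKey == row
      System.out.println(satisfiesBefore(start, end, Bytes.toBytes("ccc"))); // true: start < row <= end
      System.out.println(satisfiesBefore(start, end, Bytes.toBytes("eee"))); // false: region ends before row
    }
  }
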
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 25a98ed..5c24d98 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -86,11 +86,11 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final AsyncConnectionConfiguration connConf;
 
-  private final User user;
+  final User user;
 
   final ConnectionRegistry registry;
 
-  private final int rpcTimeout;
+  final int rpcTimeout;
 
   protected final RpcClient rpcClient;
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
deleted file mode 100644
index 5ae9de6..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The asynchronous locator for meta region.
- */
-@InterfaceAudience.Private
-class AsyncMetaRegionLocator {
-
-  private final ConnectionRegistry registry;
-
-  private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
-
-  private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
-    new AtomicReference<>();
-
-  AsyncMetaRegionLocator(ConnectionRegistry registry) {
-    this.registry = registry;
-  }
-
-  /**
-   * Get the region locations for meta region. If the location for the given replica is not
-   * available in the cached locations, then fetch from the HBase cluster.
-   * <p/>
-   * The <code>replicaId</code> parameter is important. If the region replication config for meta
-   * region is changed, then the cached region locations may not have the locations for new
-   * replicas. If we do not check the location for the given replica, we will always return the
-   * cached region locations and cause an infinite loop.
-   */
-  CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
-    return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
-      registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
-  }
-
-  private HRegionLocation getCacheLocation(HRegionLocation loc) {
-    RegionLocations locs = metaRegionLocations.get();
-    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
-  }
-
-  private void addLocationToCache(HRegionLocation loc) {
-    for (;;) {
-      int replicaId = loc.getRegion().getReplicaId();
-      RegionLocations oldLocs = metaRegionLocations.get();
-      if (oldLocs == null) {
-        RegionLocations newLocs = createRegionLocations(loc);
-        if (metaRegionLocations.compareAndSet(null, newLocs)) {
-          return;
-        }
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
-      if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
-        oldLoc.getServerName().equals(loc.getServerName()))) {
-        return;
-      }
-      RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
-      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
-        return;
-      }
-    }
-  }
-
-  private void removeLocationFromCache(HRegionLocation loc) {
-    for (;;) {
-      RegionLocations oldLocs = metaRegionLocations.get();
-      if (oldLocs == null) {
-        return;
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
-      if (!canUpdateOnError(loc, oldLoc)) {
-        return;
-      }
-      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
-      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
-        return;
-      }
-    }
-  }
-
-  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
-    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
-      this::addLocationToCache, this::removeLocationFromCache, null);
-  }
-
-  void clearCache() {
-    metaRegionLocations.set(null);
-  }
-
-  void clearCache(ServerName serverName) {
-    for (;;) {
-      RegionLocations locs = metaRegionLocations.get();
-      if (locs == null) {
-        return;
-      }
-      RegionLocations newLocs = locs.removeByServer(serverName);
-      if (locs == newLocs) {
-        return;
-      }
-      if (newLocs.isEmpty()) {
-        newLocs = null;
-      }
-      if (metaRegionLocations.compareAndSet(locs, newLocs)) {
-        return;
-      }
-    }
-  }
-
-  // only used for testing whether we have cached the location for a region.
-  RegionLocations getRegionLocationInCache() {
-    return metaRegionLocations.get();
-  }
-
-  // only used for testing whether we have cached the location for a table.
-  int getNumberOfCachedRegionLocations() {
-    RegionLocations locs = metaRegionLocations.get();
-    return locs != null ? locs.numNonNullElements() : 0;
-  }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
new file mode 100644
index 0000000..c8ffa24
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaTableRegionLocator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaCellComparator;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+
+/**
+ * The class for locating region for meta table.
+ */
+@InterfaceAudience.Private
+class AsyncMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableRegionLocator.class);
+
+  private final AtomicReference<Interface> stub = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
+    new AtomicReference<>();
+
+  AsyncMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent) {
+    // for meta region we should use MetaCellComparator to compare the row keys
+    super(conn, tableName, maxConcurrent, (r1, r2) -> MetaCellComparator
+      .compareRows(r1, 0, r1.length, r2, 0, r2.length));
+  }
+
+  private Interface createStub(ServerName serverName) throws IOException {
+    return ClientMetaService.newStub(conn.rpcClient.createRpcChannel(serverName, conn.user,
+      (int) TimeUnit.NANOSECONDS.toMillis(conn.connConf.getReadRpcTimeoutNs())));
+  }
+
+  CompletableFuture<Interface> getStub() {
+    return ConnectionUtils.getOrFetch(stub, stubMakeFuture, false, () -> {
+      CompletableFuture<Interface> future = new CompletableFuture<>();
+      addListener(conn.registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(createStub(addr));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, stub -> true, "ClientLocateMetaStub");
+  }
+
+  private void tryClearMasterStubCache(IOException error, Interface currentStub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  @Override
+  protected void locate(LocateRequest req) {
+    addListener(getStub(), (stub, error) -> {
+      if (error != null) {
+        onLocateComplete(req, null, error);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      stub.locateMetaRegion(controller,
+        LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(req.row))
+          .setLocateType(ProtobufUtil.toProtoRegionLocateType(req.locateType)).build(),
+        resp -> {
+          if (controller.failed()) {
+            IOException ex = controller.getFailed();
+            tryClearMasterStubCache(ex, stub);
+            onLocateComplete(req, null, ex);
+            return;
+          }
+          RegionLocations locs = new RegionLocations(resp.getMetaLocationsList().stream()
+            .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+          if (validateRegionLocations(locs, req)) {
+            onLocateComplete(req, locs, null);
+          }
+        });
+    });
+  }
+
+  @Override
+  CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+    addListener(getStub(), (stub, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      stub.getAllMetaRegionLocations(controller, GetAllMetaRegionLocationsRequest.newBuilder()
+        .setExcludeOfflinedSplitParents(excludeOfflinedSplitParents).build(), resp -> {
+          if (controller.failed()) {
+            IOException ex = controller.getFailed();
+            tryClearMasterStubCache(ex, stub);
+            future.completeExceptionally(ex);
+            return;
+          }
+          List<HRegionLocation> locs = resp.getMetaLocationsList().stream()
+            .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList());
+          future.complete(locs);
+        });
+    });
+    return future;
+  }
+}
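
To make the wire interaction above easier to follow, here is a hypothetical stand-alone snippet building the two request messages the locator sends to the active master's ClientMetaService; the row value and flag are invented, while the builder and field names are taken from the code in this file:

  import org.apache.hadoop.hbase.client.RegionLocateType;
  import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
  import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
  import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
  import org.apache.hadoop.hbase.util.Bytes;

  import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

  public class MetaLocateRequestSketch {
    public static void main(String[] args) {
      // Ask the master which meta region currently covers the (made-up) row "row-0001".
      LocateMetaRegionRequest locate = LocateMetaRegionRequest.newBuilder()
        .setRow(ByteString.copyFrom(Bytes.toBytes("row-0001")))
        .setLocateType(ProtobufUtil.toProtoRegionLocateType(RegionLocateType.CURRENT))
        .build();
      // Ask the master for every meta region location, skipping offlined split parents.
      GetAllMetaRegionLocationsRequest all = GetAllMetaRegionLocationsRequest.newBuilder()
        .setExcludeOfflinedSplitParents(true)
        .build();
      System.out.println(locate);
      System.out.println(all);
    }
  }
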
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
deleted file mode 100644
index 1c686ac..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.NINES;
-import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
-import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
-import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
-import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
-import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
-import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.hadoop.hbase.CatalogFamilyFormat;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.base.Objects;
-
-/**
- * The asynchronous locator for regions other than meta.
- */
-@InterfaceAudience.Private
-class AsyncNonMetaRegionLocator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
-
-  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
-    "hbase.client.meta.max.concurrent.locate.per.table";
-
-  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
-
-  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
-
-  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
-
-  private final AsyncConnectionImpl conn;
-
-  private final int maxConcurrentLocateRequestPerTable;
-
-  private final int locatePrefetchLimit;
-
-  // The mode tells if HedgedRead, LoadBalance mode is supported.
-  // The default mode is CatalogReplicaMode.None.
-  private CatalogReplicaMode metaReplicaMode;
-  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
-
-  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
-
-  private static final class LocateRequest {
-
-    private final byte[] row;
-
-    private final RegionLocateType locateType;
-
-    public LocateRequest(byte[] row, RegionLocateType locateType) {
-      this.row = row;
-      this.locateType = locateType;
-    }
-
-    @Override
-    public int hashCode() {
-      return Bytes.hashCode(row) ^ locateType.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == null || obj.getClass() != LocateRequest.class) {
-        return false;
-      }
-      LocateRequest that = (LocateRequest) obj;
-      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
-    }
-  }
-
-  private static final class TableCache {
-
-    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
-      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
-
-    private final Set<LocateRequest> pendingRequests = new HashSet<>();
-
-    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
-      new LinkedHashMap<>();
-
-    public boolean hasQuota(int max) {
-      return pendingRequests.size() < max;
-    }
-
-    public boolean isPending(LocateRequest req) {
-      return pendingRequests.contains(req);
-    }
-
-    public void send(LocateRequest req) {
-      pendingRequests.add(req);
-    }
-
-    public Optional<LocateRequest> getCandidate() {
-      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
-    }
-
-    public void clearCompletedRequests(RegionLocations locations) {
-      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
-        allRequests.entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
-        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
-          iter.remove();
-        }
-      }
-    }
-
-    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
-        RegionLocations locations) {
-      if (future.isDone()) {
-        return true;
-      }
-      if (locations == null) {
-        return false;
-      }
-      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
-      // we should at least have one location available, otherwise the request should fail and
-      // should not arrive here
-      assert loc != null;
-      boolean completed;
-      if (req.locateType.equals(RegionLocateType.BEFORE)) {
-        // for locating the row before current row, the common case is to find the previous region
-        // in reverse scan, so we check the endKey first. In general, the condition should be
-        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
-        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
-        // endKey.
-        byte[] endKey = loc.getRegion().getEndKey();
-        int c = Bytes.compareTo(endKey, req.row);
-        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
-          Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
-      } else {
-        completed = loc.getRegion().containsRow(req.row);
-      }
-      if (completed) {
-        future.complete(locations);
-        return true;
-      } else {
-        return false;
-      }
-    }
-  }
-
-  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
-    this.conn = conn;
-    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
-      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
-    this.locatePrefetchLimit =
-      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
-
-    // Get the region locator's meta replica mode.
-    this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
-      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
-
-    switch (this.metaReplicaMode) {
-      case LOAD_BALANCE:
-        String replicaSelectorClass = conn.getConfiguration().
-          get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
-          CatalogReplicaLoadBalanceSimpleSelector.class.getName());
-
-        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
-          replicaSelectorClass, META_TABLE_NAME, conn, () -> {
-            int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
-            try {
-              RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get(
-                conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
-              numOfReplicas = metaLocations.size();
-            } catch (Exception e) {
-              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
-            }
-            return numOfReplicas;
-          });
-        break;
-      case NONE:
-        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
-        boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
-          DEFAULT_USE_META_REPLICAS);
-        if (useMetaReplicas) {
-          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
-        }
-        break;
-      default:
-        // Doing nothing
-    }
-  }
-
-  private TableCache getTableCache(TableName tableName) {
-    return computeIfAbsent(cache, tableName, TableCache::new);
-  }
-
-  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
-    HRegionLocation[] locArr1 = locs1.getRegionLocations();
-    HRegionLocation[] locArr2 = locs2.getRegionLocations();
-    if (locArr1.length != locArr2.length) {
-      return false;
-    }
-    for (int i = 0; i < locArr1.length; i++) {
-      // do not need to compare region info
-      HRegionLocation loc1 = locArr1[i];
-      HRegionLocation loc2 = locArr2[i];
-      if (loc1 == null) {
-        if (loc2 != null) {
-          return false;
-        }
-      } else {
-        if (loc2 == null) {
-          return false;
-        }
-        if (loc1.getSeqNum() != loc2.getSeqNum()) {
-          return false;
-        }
-        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  // if we successfully add the locations to cache, return the locations, otherwise return the one
-  // which prevents us being added. The upper layer can use this value to complete pending requests.
-  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
-    LOG.trace("Try adding {} to cache", locs);
-    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
-    for (;;) {
-      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
-      if (oldLocs == null) {
-        return locs;
-      }
-      // check whether the regions are the same, this usually happens when table is split/merged, or
-      // deleted and recreated again.
-      RegionInfo region = locs.getRegionLocation().getRegion();
-      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
-      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
-        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
-        if (isEqual(mergedLocs, oldLocs)) {
-          // the merged one is the same with the old one, give up
-          LOG.trace("Will not add {} to cache because the old value {} " +
-            " is newer than us or has the same server name." +
-            " Maybe it is updated before we replace it", locs, oldLocs);
-          return oldLocs;
-        }
-        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
-          return mergedLocs;
-        }
-      } else {
-        // the region is different, here we trust the one we fetched. This maybe wrong but finally
-        // the upper layer can detect this and trigger removal of the wrong locations
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
-            " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
-        }
-        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
-          return locs;
-        }
-      }
-    }
-  }
-
-  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
-      Throwable error) {
-    if (error != null) {
-      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
-        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
-    }
-    Optional<LocateRequest> toSend = Optional.empty();
-    TableCache tableCache = getTableCache(tableName);
-    if (locs != null) {
-      RegionLocations addedLocs = addToCache(tableCache, locs);
-      synchronized (tableCache) {
-        tableCache.pendingRequests.remove(req);
-        tableCache.clearCompletedRequests(addedLocs);
-        // Remove a complete locate request in a synchronized block, so the table cache must have
-        // quota to send a candidate request.
-        toSend = tableCache.getCandidate();
-        toSend.ifPresent(r -> tableCache.send(r));
-      }
-      toSend.ifPresent(r -> locateInMeta(tableName, r));
-    } else {
-      // we meet an error
-      assert error != null;
-      synchronized (tableCache) {
-        tableCache.pendingRequests.remove(req);
-        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
-        // already retried several times
-        CompletableFuture<?> future = tableCache.allRequests.remove(req);
-        if (future != null) {
-          future.completeExceptionally(error);
-        }
-        tableCache.clearCompletedRequests(null);
-        // Remove a complete locate request in a synchronized block, so the table cache must have
-        // quota to send a candidate request.
-        toSend = tableCache.getCandidate();
-        toSend.ifPresent(r -> tableCache.send(r));
-      }
-      toSend.ifPresent(r -> locateInMeta(tableName, r));
-    }
-  }
-
-  // return whether we should stop the scan
-  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
-    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
-        Bytes.toStringBinary(req.row), req.locateType, locs);
-    }
-    // remove HRegionLocation with null location, i.e, getServerName returns null.
-    if (locs != null) {
-      locs = locs.removeElementsWithNullLocation();
-    }
-
-    // the default region location should always be presented when fetching from meta, otherwise
-    // let's fail the request.
-    if (locs == null || locs.getDefaultRegionLocation() == null) {
-      complete(tableName, req, null,
-        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
-          tableName, Bytes.toStringBinary(req.row), req.locateType)));
-      return true;
-    }
-    HRegionLocation loc = locs.getDefaultRegionLocation();
-    RegionInfo info = loc.getRegion();
-    if (info == null) {
-      complete(tableName, req, null,
-        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
-          tableName, Bytes.toStringBinary(req.row), req.locateType)));
-      return true;
-    }
-    if (info.isSplitParent()) {
-      return false;
-    }
-    complete(tableName, req, locs, null);
-    return true;
-  }
-
-  private void recordCacheHit() {
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
-  }
-
-  private void recordCacheMiss() {
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
-  }
-
-  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
-      int replicaId) {
-    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
-    if (entry == null) {
-      recordCacheMiss();
-      return null;
-    }
-    RegionLocations locs = entry.getValue();
-    HRegionLocation loc = locs.getRegionLocation(replicaId);
-    if (loc == null) {
-      recordCacheMiss();
-      return null;
-    }
-    byte[] endKey = loc.getRegion().getEndKey();
-    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
-          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
-      }
-      recordCacheHit();
-      return locs;
-    } else {
-      recordCacheMiss();
-      return null;
-    }
-  }
-
-  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
-      byte[] row, int replicaId) {
-    boolean isEmptyStopRow = isEmptyStopRow(row);
-    Map.Entry<byte[], RegionLocations> entry =
-      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
-    if (entry == null) {
-      recordCacheMiss();
-      return null;
-    }
-    RegionLocations locs = entry.getValue();
-    HRegionLocation loc = locs.getRegionLocation(replicaId);
-    if (loc == null) {
-      recordCacheMiss();
-      return null;
-    }
-    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
-      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
-          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
-      }
-      recordCacheHit();
-      return locs;
-    } else {
-      recordCacheMiss();
-      return null;
-    }
-  }
-
-  private void locateInMeta(TableName tableName, LocateRequest req) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
-        "', locateType=" + req.locateType + " in meta");
-    }
-    byte[] metaStartKey;
-    if (req.locateType.equals(RegionLocateType.BEFORE)) {
-      if (isEmptyStopRow(req.row)) {
-        byte[] binaryTableName = tableName.getName();
-        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
-      } else {
-        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
-      }
-    } else {
-      metaStartKey = createRegionName(tableName, req.row, NINES, false);
-    }
-    byte[] metaStopKey =
-      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
-    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
-      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
-      .setReadType(ReadType.PREAD);
-
-    switch (this.metaReplicaMode) {
-      case LOAD_BALANCE:
-        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
-        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
-          // If the selector gives a non-primary meta replica region, then go with it.
-          // Otherwise, just go to primary in non-hedgedRead mode.
-          scan.setConsistency(Consistency.TIMELINE);
-          scan.setReplicaId(metaReplicaId);
-        }
-        break;
-      case HEDGED_READ:
-        scan.setConsistency(Consistency.TIMELINE);
-        break;
-      default:
-        // do nothing
-    }
-
-    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
-
-      private boolean completeNormally = false;
-
-      private boolean tableNotFound = true;
-
-      @Override
-      public void onError(Throwable error) {
-        complete(tableName, req, null, error);
-      }
-
-      @Override
-      public void onComplete() {
-        if (tableNotFound) {
-          complete(tableName, req, null, new TableNotFoundException(tableName));
-        } else if (!completeNormally) {
-          complete(tableName, req, null, new IOException(
-            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
-        }
-      }
-
-      @Override
-      public void onNext(Result[] results, ScanController controller) {
-        if (results.length == 0) {
-          return;
-        }
-        tableNotFound = false;
-        int i = 0;
-        for (; i < results.length; i++) {
-          if (onScanNext(tableName, req, results[i])) {
-            completeNormally = true;
-            controller.terminate();
-            i++;
-            break;
-          }
-        }
-        // Add the remaining results into cache
-        if (i < results.length) {
-          TableCache tableCache = getTableCache(tableName);
-          for (; i < results.length; i++) {
-            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
-            if (locs == null) {
-              continue;
-            }
-            HRegionLocation loc = locs.getDefaultRegionLocation();
-            if (loc == null) {
-              continue;
-            }
-            RegionInfo info = loc.getRegion();
-            if (info == null || info.isOffline() || info.isSplitParent()) {
-              continue;
-            }
-            RegionLocations addedLocs = addToCache(tableCache, locs);
-            synchronized (tableCache) {
-              tableCache.clearCompletedRequests(addedLocs);
-            }
-          }
-        }
-      }
-    });
-  }
-
-  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
-      int replicaId, RegionLocateType locateType) {
-    return locateType.equals(RegionLocateType.BEFORE)
-      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
-      : locateRowInCache(tableCache, tableName, row, replicaId);
-  }
-
-  // locateToPrevious is true means we will use the start key of a region to locate the region
-  // placed before it. Used for reverse scan. See the comment of
-  // AsyncRegionLocator.getPreviousRegionLocation.
-  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
-      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
-    // AFTER should be convert to CURRENT before calling this method
-    assert !locateType.equals(RegionLocateType.AFTER);
-    TableCache tableCache = getTableCache(tableName);
-    if (!reload) {
-      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
-      if (isGood(locs, replicaId)) {
-        return CompletableFuture.completedFuture(locs);
-      }
-    }
-    CompletableFuture<RegionLocations> future;
-    LocateRequest req;
-    boolean sendRequest = false;
-    synchronized (tableCache) {
-      // check again
-      if (!reload) {
-        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
-        if (isGood(locs, replicaId)) {
-          return CompletableFuture.completedFuture(locs);
-        }
-      }
-      req = new LocateRequest(row, locateType);
-      future = tableCache.allRequests.get(req);
-      if (future == null) {
-        future = new CompletableFuture<>();
-        tableCache.allRequests.put(req, future);
-        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
-          tableCache.send(req);
-          sendRequest = true;
-        }
-      }
-    }
-    if (sendRequest) {
-      locateInMeta(tableName, req);
-    }
-    return future;
-  }
-
-  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
-      int replicaId, RegionLocateType locateType, boolean reload) {
-    // as we know the exact row after us, so we can just create the new row, and use the same
-    // algorithm to locate it.
-    if (locateType.equals(RegionLocateType.AFTER)) {
-      row = createClosestRowAfter(row);
-      locateType = RegionLocateType.CURRENT;
-    }
-    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
-  }
-
-  private void recordClearRegionCache() {
-    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
-  }
-
-  private void removeLocationFromCache(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return;
-    }
-    byte[] startKey = loc.getRegion().getStartKey();
-    for (;;) {
-      RegionLocations oldLocs = tableCache.cache.get(startKey);
-      if (oldLocs == null) {
-        return;
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
-      if (!canUpdateOnError(loc, oldLoc)) {
-        return;
-      }
-      // Tell metaReplicaSelector that the location is stale. It will create a stale entry
-      // with timestamp internally. Next time the client looks up the same location,
-      // it will pick a different meta replica region.
-      if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
-        metaReplicaSelector.onError(loc);
-      }
-
-      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
-      if (newLocs == null) {
-        if (tableCache.cache.remove(startKey, oldLocs)) {
-          recordClearRegionCache();
-          return;
-        }
-      } else {
-        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
-          recordClearRegionCache();
-          return;
-        }
-      }
-    }
-  }
-
-  private void addLocationToCache(HRegionLocation loc) {
-    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
-  }
-
-  private HRegionLocation getCachedLocation(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return null;
-    }
-    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
-    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
-  }
-
-  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
-    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
-    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
-      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
-  }
-
-  void clearCache(TableName tableName) {
-    TableCache tableCache = cache.remove(tableName);
-    if (tableCache == null) {
-      return;
-    }
-    synchronized (tableCache) {
-      if (!tableCache.allRequests.isEmpty()) {
-        IOException error = new IOException("Cache cleared");
-        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
-      }
-    }
-    conn.getConnectionMetrics()
-      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
-  }
-
-  void clearCache() {
-    cache.clear();
-  }
-
-  void clearCache(ServerName serverName) {
-    for (TableCache tableCache : cache.values()) {
-      for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
-        byte[] regionName = entry.getKey();
-        RegionLocations locs = entry.getValue();
-        RegionLocations newLocs = locs.removeByServer(serverName);
-        if (locs == newLocs) {
-          continue;
-        }
-        if (newLocs.isEmpty()) {
-          tableCache.cache.remove(regionName, locs);
-        } else {
-          tableCache.cache.replace(regionName, locs, newLocs);
-        }
-      }
-    }
-  }
-
-  // only used for testing whether we have cached the location for a region.
-  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
-    TableCache tableCache = cache.get(tableName);
-    if (tableCache == null) {
-      return null;
-    }
-    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
-  }
-
-  // only used for testing whether we have cached the location for a table.
-  int getNumberOfCachedRegionLocations(TableName tableName) {
-    TableCache tableCache = cache.get(tableName);
-    if (tableCache == null) {
-      return 0;
-    }
-    return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
-  }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
new file mode 100644
index 0000000..c03e37c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaTableRegionLocator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class for locating region for table other than meta.
+ */
+@InterfaceAudience.Private
+class AsyncNonMetaTableRegionLocator extends AbstractAsyncTableRegionLocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaTableRegionLocator.class);
+
+  private final int prefetchLimit;
+
+  // The mode tells whether HedgedRead or LoadBalance mode is enabled for meta lookups.
+  // The default mode is CatalogReplicaMode.NONE.
+  private CatalogReplicaMode metaReplicaMode;
+
+  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
+
+  AsyncNonMetaTableRegionLocator(AsyncConnectionImpl conn, TableName tableName, int maxConcurrent,
+    int prefetchLimit) {
+    super(conn, tableName, maxConcurrent, Bytes.BYTES_COMPARATOR);
+    this.prefetchLimit = prefetchLimit;
+    // Get the region locator's meta replica mode.
+    this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
+      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        String replicaSelectorClass = conn.getConfiguration().
+          get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
+          CatalogReplicaLoadBalanceSimpleSelector.class.getName());
+
+        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
+          replicaSelectorClass, META_TABLE_NAME, conn, () -> {
+            int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
+            try {
+              RegionLocations metaLocations = conn.getLocator()
+                .getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
+                  RegionLocateType.CURRENT, true, conn.connConf.getReadRpcTimeoutNs())
+                .get();
+              numOfReplicas = metaLocations.size();
+            } catch (Exception e) {
+              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
+            }
+            return numOfReplicas;
+          });
+        break;
+      case NONE:
+        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
+        boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
+          DEFAULT_USE_META_REPLICAS);
+        if (useMetaReplicas) {
+          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
+        }
+        break;
+      default:
+        // Doing nothing
+    }
+  }
+
+  // return whether we should stop the scan
+  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
+    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
+        Bytes.toStringBinary(req.row), req.locateType, locs);
+    }
+    if (!validateRegionLocations(locs, req)) {
+      return true;
+    }
+    if (locs.getDefaultRegionLocation().getRegion().isSplitParent()) {
+      return false;
+    }
+    onLocateComplete(req, locs, null);
+    return true;
+  }
+
+  @Override
+  protected void locate(LocateRequest req) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '{}', row='{}', locateType={} in meta", tableName,
+        Bytes.toStringBinary(req.row), req.locateType);
+    }
+    Scan scan =
+      CatalogFamilyFormat.createRegionLocateScan(tableName, req.row, req.locateType, prefetchLimit);
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
+        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
+          // If the selector gives a non-primary meta replica region, then go with it.
+          // Otherwise, just go to primary in non-hedgedRead mode.
+          scan.setConsistency(Consistency.TIMELINE);
+          scan.setReplicaId(metaReplicaId);
+        }
+        break;
+      case HEDGED_READ:
+        scan.setConsistency(Consistency.TIMELINE);
+        break;
+      default:
+        // do nothing
+    }
+    conn.getTable(TableName.META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
+
+      private boolean completeNormally = false;
+
+      private boolean tableNotFound = true;
+
+      @Override
+      public void onError(Throwable error) {
+        onLocateComplete(req, null, error);
+      }
+
+      @Override
+      public void onComplete() {
+        if (tableNotFound) {
+          onLocateComplete(req, null, new TableNotFoundException(tableName));
+        } else if (!completeNormally) {
+          onLocateComplete(req, null, new IOException(
+            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
+        }
+      }
+
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        if (results.length == 0) {
+          return;
+        }
+        tableNotFound = false;
+        int i = 0;
+        for (; i < results.length; i++) {
+          if (onScanNext(tableName, req, results[i])) {
+            completeNormally = true;
+            controller.terminate();
+            i++;
+            break;
+          }
+        }
+        // Add the remaining results into cache
+        if (i < results.length) {
+          for (; i < results.length; i++) {
+            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
+            if (locs == null) {
+              continue;
+            }
+            HRegionLocation loc = locs.getDefaultRegionLocation();
+            if (loc == null) {
+              continue;
+            }
+            RegionInfo info = loc.getRegion();
+            if (info == null || info.isOffline() || info.isSplitParent()) {
+              continue;
+            }
+            RegionLocations addedLocs = cache.add(locs);
+            synchronized (this) {
+              clearCompletedRequests(addedLocs);
+            }
+          }
+        }
+      }
+    });
+  }
+
+  @Override
+  CompletableFuture<List<HRegionLocation>>
+    getAllRegionLocations(boolean excludeOfflinedSplitParents) {
+    return ClientMetaTableAccessor.getTableHRegionLocations(
+      conn.getTable(TableName.META_TABLE_NAME), tableName, excludeOfflinedSplitParents);
+  }
+}
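
A minimal usage sketch (not part of this patch; the table name and row are made up) for readers following the new locator classes: client code does not call AsyncNonMetaTableRegionLocator directly, it reaches it through the public async API, which delegates here for non-meta tables.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LocateRowExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
          AsyncTableRegionLocator locator = conn.getRegionLocator(TableName.valueOf("test_table"));
          // The lookup is served from the per-table cache when possible, otherwise by a
          // reverse scan of hbase:meta as implemented in locate() above.
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("row-0001")).get();
          System.out.println(loc.getRegion().getRegionNameAsString() + " -> " + loc.getServerName());
        }
      }
    }
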
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 716598a..7bfb7f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,20 +17,26 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
 import static org.apache.hadoop.hbase.trace.TraceUtil.REGION_NAMES_KEY;
 import static org.apache.hadoop.hbase.trace.TraceUtil.SERVER_NAME_KEY;
 import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
 import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.context.Scope;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -43,8 +49,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -55,21 +59,40 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 @InterfaceAudience.Private
 class AsyncRegionLocator {
 
-  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class);
+  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
+    "hbase.client.meta.max.concurrent.locate.per.table";
+
+  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+
+  static final String MAX_CONCURRENT_LOCATE_META_REQUEST =
+    "hbase.client.meta.max.concurrent.locate";
+
+  static final String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
+
+  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
 
   private final HashedWheelTimer retryTimer;
 
   private final AsyncConnectionImpl conn;
 
-  private final AsyncMetaRegionLocator metaRegionLocator;
+  private final int maxConcurrentLocateRequestPerTable;
+
+  private final int maxConcurrentLocateMetaRequest;
 
-  private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
+  private final int locatePrefetchLimit;
 
-  AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+  private final ConcurrentMap<TableName, AbstractAsyncTableRegionLocator> table2Locator =
+    new ConcurrentHashMap<>();
+
+  public AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
     this.conn = conn;
-    this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
-    this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
     this.retryTimer = retryTimer;
+    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
+      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+    this.maxConcurrentLocateMetaRequest = conn.getConfiguration()
+      .getInt(MAX_CONCURRENT_LOCATE_META_REQUEST, maxConcurrentLocateRequestPerTable);
+    this.locatePrefetchLimit =
+      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
   }
 
   private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
@@ -127,13 +150,30 @@ class AsyncRegionLocator {
     return names;
   }
 
+  private AbstractAsyncTableRegionLocator getOrCreateTableRegionLocator(TableName tableName) {
+    return computeIfAbsent(table2Locator, tableName, () -> {
+      if (isMeta(tableName)) {
+        return new AsyncMetaTableRegionLocator(conn, tableName, maxConcurrentLocateMetaRequest);
+      } else {
+        return new AsyncNonMetaTableRegionLocator(conn, tableName,
+          maxConcurrentLocateRequestPerTable, locatePrefetchLimit);
+      }
+    });
+  }
+
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+    int replicaId, RegionLocateType locateType, boolean reload) {
+    return tracedLocationFuture(() -> {
+      return getOrCreateTableRegionLocator(tableName).getRegionLocations(row, replicaId, locateType,
+        reload);
+    }, this::getRegionName, tableName, "getRegionLocations");
+  }
+
   CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
     RegionLocateType type, boolean reload, long timeoutNs) {
     return tracedLocationFuture(() -> {
-      CompletableFuture<RegionLocations> future = isMeta(tableName) ?
-        metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
-        nonMetaRegionLocator.getRegionLocations(tableName, row,
-          RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+      CompletableFuture<RegionLocations> future =
+        getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
       return withTimeout(future, timeoutNs,
         () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
           "ms) waiting for region locations for " + tableName + ", row='" +
@@ -148,8 +188,7 @@ class AsyncRegionLocator {
       // Change it later if the meta table can have more than one region.
       CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
       CompletableFuture<RegionLocations> locsFuture =
-        isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) :
-          nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+        getRegionLocations(tableName, row, replicaId, type, reload);
       addListener(locsFuture, (locs, error) -> {
         if (error != null) {
           future.completeExceptionally(error);
@@ -193,30 +232,61 @@ class AsyncRegionLocator {
     return getRegionLocation(tableName, row, type, false, timeoutNs);
   }
 
-  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
-    if (loc.getRegion().isMetaRegion()) {
-      metaRegionLocator.updateCachedLocationOnError(loc, exception);
-    } else {
-      nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
+  /**
+   * Get all region locations for a table.
+   * <p/>
+   * Notice that this method will not read from cache.
+   */
+  CompletableFuture<List<HRegionLocation>> getAllRegionLocations(TableName tableName,
+    boolean excludeOfflinedSplitParents) {
+    CompletableFuture<List<HRegionLocation>> future =
+      getOrCreateTableRegionLocator(tableName).getAllRegionLocations(excludeOfflinedSplitParents);
+    addListener(future, (locs, error) -> {
+      if (error != null) {
+        return;
+      }
+      // add locations to cache
+      AbstractAsyncTableRegionLocator locator = getOrCreateTableRegionLocator(tableName);
+      Map<RegionInfo, List<HRegionLocation>> map = new HashMap<>();
+      for (HRegionLocation loc : locs) {
+        // do not cache split parent
+        if (loc.getRegion() != null && !loc.getRegion().isSplitParent()) {
+          map.computeIfAbsent(RegionReplicaUtil.getRegionInfoForDefaultReplica(loc.getRegion()),
+            k -> new ArrayList<>()).add(loc);
+        }
+      }
+      for (List<HRegionLocation> l : map.values()) {
+        locator.addToCache(new RegionLocations(l));
+      }
+    });
+    return future;
+  }
+
+  private void removeLocationFromCache(HRegionLocation loc) {
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+    if (locator == null) {
+      return;
     }
+    locator.removeLocationFromCache(loc);
   }
 
   void clearCache(TableName tableName) {
     TraceUtil.trace(() -> {
-      LOG.debug("Clear meta cache for {}", tableName);
-      if (tableName.equals(META_TABLE_NAME)) {
-        metaRegionLocator.clearCache();
-      } else {
-        nonMetaRegionLocator.clearCache(tableName);
-      }
+    AbstractAsyncTableRegionLocator locator = table2Locator.remove(tableName);
+    if (locator == null) {
+      return;
+    }
+    locator.clearPendingRequests();
+    conn.getConnectionMetrics()
+      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(locator.getCacheSize()));
     }, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
   }
 
   void clearCache(ServerName serverName) {
     TraceUtil.trace(() -> {
-      LOG.debug("Clear meta cache for {}", serverName);
-      metaRegionLocator.clearCache(serverName);
-      nonMetaRegionLocator.clearCache(serverName);
+      for (AbstractAsyncTableRegionLocator locator : table2Locator.values()) {
+        locator.clearCache(serverName);
+      }
       conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
     }, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
       serverName.getServerName()));
@@ -224,30 +294,48 @@ class AsyncRegionLocator {
 
   void clearCache() {
     TraceUtil.trace(() -> {
-      metaRegionLocator.clearCache();
-      nonMetaRegionLocator.clearCache();
+      table2Locator.clear();
     }, "AsyncRegionLocator.clearCache");
   }
 
-  AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
-    return nonMetaRegionLocator;
+  private void addLocationToCache(HRegionLocation loc) {
+    getOrCreateTableRegionLocator(loc.getRegion().getTable())
+      .addToCache(createRegionLocations(loc));
+  }
+
+  private HRegionLocation getCachedLocation(HRegionLocation loc) {
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(loc.getRegion().getTable());
+    if (locator == null) {
+      return null;
+    }
+    RegionLocations locs = locator.getInCache(loc.getRegion().getStartKey());
+    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+  }
+
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+      this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
   }
 
   // only used for testing whether we have cached the location for a region.
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
   RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
-    if (TableName.isMetaTableName(tableName)) {
-      return metaRegionLocator.getRegionLocationInCache();
-    } else {
-      return nonMetaRegionLocator.getRegionLocationInCache(tableName, row);
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+    if (locator == null) {
+      return null;
     }
+    return locator.locateInCache(row);
   }
 
   // only used for testing whether we have cached the location for a table.
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
   int getNumberOfCachedRegionLocations(TableName tableName) {
-    if (TableName.isMetaTableName(tableName)) {
-      return metaRegionLocator.getNumberOfCachedRegionLocations();
-    } else {
-      return nonMetaRegionLocator.getNumberOfCachedRegionLocations(tableName);
+    AbstractAsyncTableRegionLocator locator = table2Locator.get(tableName);
+    if (locator == null) {
+      return 0;
     }
+    return locator.getNumberOfCachedRegionLocations();
   }
 }
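
The per-table locators created above are sized by three client settings, all read in the AsyncRegionLocator constructor. A tuning sketch (the values are illustrative assumptions, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Upper bound on concurrent locate requests per user table (default 8).
    conf.setInt("hbase.client.meta.max.concurrent.locate.per.table", 16);
    // Upper bound on concurrent locate requests for hbase:meta itself;
    // defaults to the per-table limit when unset.
    conf.setInt("hbase.client.meta.max.concurrent.locate", 4);
    // Rows fetched per locate scan, so neighbouring regions are cached as well (default 10).
    conf.setInt("hbase.client.locate.prefetch.limit", 20);
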
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4c6cd5a..c34cc5a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.commons.lang3.ObjectUtils;
@@ -30,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+
 /**
  * Helper class for asynchronous region locator.
  */
@@ -55,9 +58,9 @@ final class AsyncRegionLocatorHelper {
   }
 
   static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
-      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
-      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
-      MetricsConnection metrics) {
+    Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+    Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
+    Optional<MetricsConnection> metrics) {
     HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -85,9 +88,7 @@ final class AsyncRegionLocatorHelper {
       addToCache.accept(newLoc);
     } else {
       LOG.debug("Try removing {} from cache", loc);
-      if (metrics != null) {
-        metrics.incrCacheDroppingExceptions(exception);
-      }
+      metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
       removeFromCache.accept(loc);
     }
   }
@@ -146,4 +147,33 @@ final class AsyncRegionLocatorHelper {
     HRegionLocation loc = locs.getRegionLocation(replicaId);
     return loc != null && loc.getServerName() != null;
   }
+
+  static boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+    HRegionLocation[] locArr1 = locs1.getRegionLocations();
+    HRegionLocation[] locArr2 = locs2.getRegionLocations();
+    if (locArr1.length != locArr2.length) {
+      return false;
+    }
+    for (int i = 0; i < locArr1.length; i++) {
+      // do not need to compare region info
+      HRegionLocation loc1 = locArr1[i];
+      HRegionLocation loc2 = locArr2[i];
+      if (loc1 == null) {
+        if (loc2 != null) {
+          return false;
+        }
+      } else {
+        if (loc2 == null) {
+          return false;
+        }
+        if (loc1.getSeqNum() != loc2.getSeqNum()) {
+          return false;
+        }
+        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index 96e3ec4..5af3283 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -104,8 +104,8 @@ public interface AsyncTableRegionLocator {
   /**
    * Retrieves all of the regions associated with this table.
    * <p/>
-   * Usually we will go to meta table directly in this method so there is no {@code reload}
-   * parameter.
+   * We will go to the meta table directly in this method, so there is no {@code reload} parameter.
+   * Please use it with caution as it could put significant load on a cluster.
    * <p/>
    * Notice that the location for region replicas other than the default replica are also returned.
    * @return a {@link List} of all regions associated with this table.
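
A usage sketch for this contract (assuming an open AsyncConnection named conn, a table named test_table, and the usual client imports): since every call scans hbase:meta, fetch the list once and reuse it rather than calling it per request.

    AsyncTableRegionLocator locator = conn.getRegionLocator(TableName.valueOf("test_table"));
    List<HRegionLocation> locations = locator.getAllRegionLocations().get();
    locations.forEach(l -> System.out.println(l.getRegion().getEncodedName() +
      " replica=" + l.getRegion().getReplicaId() + " on " + l.getServerName()));
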
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 35bf0e0..2adf4df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -56,14 +55,8 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
 
   @Override
   public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
-    return tracedFuture(() -> {
-      if (TableName.isMetaTableName(tableName)) {
-        return conn.registry.getMetaRegionLocations()
-          .thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
-      }
-      return ClientMetaTableAccessor
-        .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
-    }, getClass().getSimpleName() + ".getAllRegionLocations");
+    return tracedFuture(() -> conn.getLocator().getAllRegionLocations(tableName, true),
+      getClass().getSimpleName() + ".getAllRegionLocations");
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
index cd22d78..569d728 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -32,11 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 interface ConnectionRegistry extends Closeable {
 
   /**
-   * Get the location of meta region(s).
-   */
-  CompletableFuture<RegionLocations> getMetaRegionLocations();
-
-  /**
    * Should only be called once.
    * <p>
    * The upper layer should store this value somewhere as it will not be change any more.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 70312aa..d74e4aa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -172,24 +172,6 @@ public final class ConnectionUtils {
     return Arrays.copyOf(row, row.length + 1);
   }
 
-  /**
-   * Create a row before the specified row and very close to the specified row.
-   */
-  static byte[] createCloseRowBefore(byte[] row) {
-    if (row.length == 0) {
-      return MAX_BYTE_ARRAY;
-    }
-    if (row[row.length - 1] == 0) {
-      return Arrays.copyOf(row, row.length - 1);
-    } else {
-      byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
-      System.arraycopy(row, 0, nextRow, 0, row.length - 1);
-      nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
-      System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
-      return nextRow;
-    }
-  }
-
   static boolean isEmptyStartRow(byte[] row) {
     return Bytes.equals(row, EMPTY_START_ROW);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 9223935..6caa8d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -266,7 +266,7 @@ public class MasterRegistry implements ConnectionRegistry {
     return new RegionLocations(regionLocations);
   }
 
-  @Override
+  // keep the method here just for testing compatibility
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
     return tracedFuture(
       () -> this
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 1cbcf10..e691bb7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -745,8 +745,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
     if (TableName.isMetaTableName(tableName)) {
-      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
-        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
+      return getTableHRegionLocations(tableName).thenApply(
+        locs -> locs.stream().allMatch(loc -> loc != null && loc.getServerName() != null));
     }
     CompletableFuture<Boolean> future = new CompletableFuture<>();
     addListener(isTableEnabled(tableName), (enabled, error) -> {
@@ -762,7 +762,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.complete(false);
       } else {
         addListener(
-          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
+          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true),
           (locations, error1) -> {
             if (error1 != null) {
               future.completeExceptionally(error1);
@@ -882,15 +882,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
-    if (tableName.equals(META_TABLE_NAME)) {
-      return connection.registry.getMetaRegionLocations()
-        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
-          .collect(Collectors.toList()));
-    } else {
-      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
-        .thenApply(
-          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
-    }
+    return getTableHRegionLocations(tableName).thenApply(
+      locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
   }
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
@@ -1129,23 +1122,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
    * List all region locations for the specific table.
    */
   private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else if (metaRegions == null || metaRegions.isEmpty() ||
-          metaRegions.getDefaultRegionLocation() == null) {
-          future.completeExceptionally(new IOException("meta region does not found"));
-        } else {
-          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-        }
-      });
-      return future;
-    } else {
-      // For non-meta table, we fetch all locations by scanning hbase:meta table
-      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
-    }
+    return connection.getRegionLocator(tableName).getAllRegionLocations();
   }
 
   /**
@@ -2394,9 +2371,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
       if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
         // old format encodedName, should be meta region
-        future = connection.registry.getMetaRegionLocations()
-          .thenApply(locs -> Stream.of(locs.getRegionLocations())
-            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
+        future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+          .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
       } else {
         future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
           regionNameOrEncodedRegionName);
@@ -2413,10 +2389,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       }
 
       if (regionInfo.isMetaRegion()) {
-        future = connection.registry.getMetaRegionLocations()
-          .thenApply(locs -> Stream.of(locs.getRegionLocations())
-            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
-            .findFirst());
+        future = getTableHRegionLocations(META_TABLE_NAME).thenApply(locs -> locs.stream()
+          .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
+          .findFirst());
       } else {
         future =
           ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
index 950123c..2380fd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocateType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -27,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * <li>{@link #AFTER} locate the region which contains the row after the given row.</li>
  * </ul>
  */
-@InterfaceAudience.Private
-enum RegionLocateType {
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+public enum RegionLocateType {
   BEFORE, CURRENT, AFTER
 }
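
Since the enum is now visible to coprocessors, a worked example of the three locate types may help (the region boundaries are illustrative only):

    // For a table with two regions, R1 = ["", "m") and R2 = ["m", ""):
    //   locate("m", CURRENT) -> R2  (region containing row "m")
    //   locate("m", BEFORE)  -> R1  (region containing the closest row before "m";
    //                                used by reverse scans)
    //   locate("m", AFTER)   -> R2  (rewritten internally to CURRENT on the closest row
    //                                after "m", i.e. "m\x00")
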
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
new file mode 100644
index 0000000..0745796
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableRegionLocationCache.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isEqual;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The location cache for regions of a table.
+ */
+@InterfaceAudience.Private
+class TableRegionLocationCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRegionLocationCache.class);
+
+  private final Optional<MetricsConnection> metrics;
+
+  private final ConcurrentNavigableMap<byte[], RegionLocations> cache;
+
+  TableRegionLocationCache(Comparator<byte[]> comparator, Optional<MetricsConnection> metrics) {
+    this.metrics = metrics;
+    this.cache = new ConcurrentSkipListMap<>(comparator);
+  }
+
+  private void recordCacheHit() {
+    metrics.ifPresent(MetricsConnection::incrMetaCacheHit);
+  }
+
+  private void recordCacheMiss() {
+    metrics.ifPresent(MetricsConnection::incrMetaCacheMiss);
+  }
+
+  private void recordClearRegionCache() {
+    metrics.ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
+  }
+
+  private RegionLocations locateRow(TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      recordCacheMiss();
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      recordCacheMiss();
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      recordCacheHit();
+      return locs;
+    } else {
+      recordCacheMiss();
+      return null;
+    }
+  }
+
+  private RegionLocations locateRowBefore(TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      recordCacheMiss();
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      recordCacheMiss();
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      recordCacheHit();
+      return locs;
+    } else {
+      recordCacheMiss();
+      return null;
+    }
+  }
+
+  RegionLocations locate(TableName tableName, byte[] row, int replicaId,
+    RegionLocateType locateType) {
+    return locateType.equals(RegionLocateType.BEFORE) ? locateRowBefore(tableName, row, replicaId) :
+      locateRow(tableName, row, replicaId);
+  }
+
+  // If we successfully add the locations to the cache, return them; otherwise return the existing
+  // locations that prevented the add. The upper layer can use this value to complete pending
+  // requests.
+  RegionLocations add(RegionLocations locs) {
+    LOG.trace("Try adding {} to cache", locs);
+    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
+      if (oldLocs == null) {
+        return locs;
+      }
+      // check whether the regions are the same; this usually happens when the table is
+      // split/merged, or deleted and recreated.
+      RegionInfo region = locs.getRegionLocation().getRegion();
+      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+        if (isEqual(mergedLocs, oldLocs)) {
+          // the merged one is the same with the old one, give up
+          LOG.trace("Will not add {} to cache because the old value {} " +
+            " is newer than us or has the same server name." +
+            " Maybe it is updated before we replace it", locs, oldLocs);
+          return oldLocs;
+        }
+        if (cache.replace(startKey, oldLocs, mergedLocs)) {
+          return mergedLocs;
+        }
+      } else {
+        // The region is different, so here we trust the one we just fetched. This may be wrong,
+        // but eventually the upper layer can detect it and trigger removal of the wrong locations.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
+            " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
+        }
+        if (cache.replace(startKey, oldLocs, locs)) {
+          return locs;
+        }
+      }
+    }
+  }
+
+  // Notice that this is not a constant time operation, so do not call it on a critical path.
+  int size() {
+    return cache.size();
+  }
+
+  void clearCache(ServerName serverName) {
+    for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
+      byte[] regionName = entry.getKey();
+      RegionLocations locs = entry.getValue();
+      RegionLocations newLocs = locs.removeByServer(serverName);
+      if (locs == newLocs) {
+        continue;
+      }
+      if (newLocs.isEmpty()) {
+        cache.remove(regionName, locs);
+      } else {
+        cache.replace(regionName, locs, newLocs);
+      }
+    }
+  }
+
+  void removeLocationFromCache(HRegionLocation loc) {
+    byte[] startKey = loc.getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = cache.get(startKey);
+      if (oldLocs == null) {
+        return;
+      }
+      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+      if (!canUpdateOnError(loc, oldLoc)) {
+        return;
+      }
+      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+      if (newLocs == null) {
+        if (cache.remove(startKey, oldLocs)) {
+          recordClearRegionCache();
+          return;
+        }
+      } else {
+        if (cache.replace(startKey, oldLocs, newLocs)) {
+          recordClearRegionCache();
+          return;
+        }
+      }
+    }
+  }
+
+  RegionLocations get(byte[] key) {
+    return cache.get(key);
+  }
+
+  // only used for testing whether we have cached the location for a table.
+  @RestrictedApi(explanation = "Should only be called in AbstractAsyncTableRegionLocator",
+    link = "", allowedOnPath = ".*/AbstractAsyncTableRegionLocator.java")
+  int getNumberOfCachedRegionLocations() {
+    return cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
+  }
+}
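
A standalone sketch (not part of this patch) of the lookup idea behind locateRow() above: regions are keyed by start key in a ConcurrentSkipListMap, floorEntry(row) returns the entry with the largest start key less than or equal to the row, and the end-key check confirms containment.

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.hadoop.hbase.util.Bytes;

    ConcurrentSkipListMap<byte[], byte[]> startToEndKey =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
    startToEndKey.put(Bytes.toBytes(""), Bytes.toBytes("m"));  // region ["", "m")
    startToEndKey.put(Bytes.toBytes("m"), Bytes.toBytes(""));  // region ["m", "")

    byte[] row = Bytes.toBytes("k");
    Map.Entry<byte[], byte[]> entry = startToEndKey.floorEntry(row);  // -> region ["", "m")
    byte[] endKey = entry.getValue();
    // An empty end key means the last region; otherwise the row must sort before the end key.
    boolean contains = endKey.length == 0 || Bytes.compareTo(row, endKey) < 0;  // true for "k"
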
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
index 3918dbc..bf93776 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
@@ -115,7 +115,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
   }
 
   private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
-      CompletableFuture<RegionLocations> future) {
+    CompletableFuture<RegionLocations> future) {
     remaining.decrement();
     if (remaining.intValue() > 0) {
       return;
@@ -123,8 +123,8 @@ class ZKConnectionRegistry implements ConnectionRegistry {
     future.complete(new RegionLocations(locs));
   }
 
-  private Pair<RegionState.State, ServerName> getStateAndServerName(
-      ZooKeeperProtos.MetaRegionServer proto) {
+  private Pair<RegionState.State, ServerName>
+    getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
     RegionState.State state;
     if (proto.hasState()) {
       state = RegionState.State.convert(proto.getState());
@@ -137,7 +137,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
   }
 
   private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
-      List<String> metaReplicaZNodes) {
+    List<String> metaReplicaZNodes) {
     if (metaReplicaZNodes.isEmpty()) {
       future.completeExceptionally(new IOException("No meta znode available"));
     }
@@ -193,7 +193,7 @@ class ZKConnectionRegistry implements ConnectionRegistry {
     }
   }
 
-  @Override
+  // keep the method here just for testing compatibility
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
     return tracedFuture(() -> {
       CompletableFuture<RegionLocations> future = new CompletableFuture<>();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 329894c..b6918ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionLoadStats;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
 import org.apache.hadoop.hbase.client.Result;
@@ -3826,6 +3827,7 @@ public final class ProtobufUtil {
       .build();
   }
 
+
   public static HBaseProtos.LogRequest toBalancerRejectionRequest(int limit) {
     MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest =
       MasterProtos.BalancerRejectionsRequest.newBuilder().setLimit(limit).build();
@@ -3835,4 +3837,29 @@ public final class ProtobufUtil {
       .build();
   }
 
+  public static MasterProtos.RegionLocateType toProtoRegionLocateType(RegionLocateType pojo) {
+    switch (pojo) {
+      case BEFORE:
+        return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_BEFORE;
+      case CURRENT:
+        return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_CURRENT;
+      case AFTER:
+        return MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_AFTER;
+      default:
+        throw new IllegalArgumentException("Unknown RegionLocateType: " + pojo);
+    }
+  }
+
+  public static RegionLocateType toRegionLocateType(MasterProtos.RegionLocateType proto) {
+    switch (proto) {
+      case REGION_LOCATE_TYPE_BEFORE:
+        return RegionLocateType.BEFORE;
+      case REGION_LOCATE_TYPE_CURRENT:
+        return RegionLocateType.CURRENT;
+      case REGION_LOCATE_TYPE_AFTER:
+        return RegionLocateType.AFTER;
+      default:
+        throw new IllegalArgumentException("Unknown proto RegionLocateType: " + proto);
+    }
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
index 4bd66877..64ded7f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -33,11 +32,6 @@ class DoNothingConnectionRegistry implements ConnectionRegistry {
   }
 
   @Override
-  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-    return CompletableFuture.completedFuture(null);
-  }
-
-  @Override
   public CompletableFuture<String> getClusterId() {
     return CompletableFuture.completedFuture(null);
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
deleted file mode 100644
index b306500..0000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ClientTests.class, SmallTests.class })
-public class TestAsyncMetaRegionLocatorFailFast {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAsyncMetaRegionLocatorFailFast.class);
-
-  private static Configuration CONF = HBaseConfiguration.create();
-
-  private static AsyncMetaRegionLocator LOCATOR;
-
-  private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry {
-
-    public FaultyConnectionRegistry(Configuration conf) {
-      super(conf);
-    }
-
-    @Override
-    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-      return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error"));
-    }
-  }
-
-  @BeforeClass
-  public static void setUp() {
-    LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF));
-  }
-
-  @Test(expected = DoNotRetryIOException.class)
-  public void test() throws IOException {
-    FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false));
-  }
-}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
index 15b00f6..180d294 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
@@ -23,6 +23,7 @@ import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +36,9 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,12 +46,22 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
 
 @Category({ ClientTests.class, MediumTests.class })
 public class TestAsyncRegionLocatorTracing {
@@ -60,14 +74,63 @@ public class TestAsyncRegionLocatorTracing {
 
   private AsyncConnectionImpl conn;
 
-  private RegionLocations locs;
+  private static RegionLocations locs;
 
   @Rule
   public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
 
+  public static final class RpcClientForTest implements RpcClient {
+
+    public RpcClientForTest(Configuration configuration, String clusterId,
+      SocketAddress localAddress, MetricsConnection metrics) {
+    }
+
+    @Override
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+
+    @Override
+    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
+      return new RpcChannel() {
+
+        @Override
+        public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+          Message responsePrototype, RpcCallback<Message> done) {
+          LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
+          for (HRegionLocation loc : locs) {
+            if (loc != null) {
+              builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+            }
+          }
+          done.run(builder.build());
+        }
+      };
+    }
+
+    @Override
+    public void cancelConnections(ServerName sn) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean hasCellBlockSupport() {
+      return false;
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    CONF.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientForTest.class,
+      RpcClient.class);
+  }
+
   @Before
   public void setUp() throws IOException {
-    RegionInfo metaRegionInfo = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build();
+    RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO;
     locs = new RegionLocations(
       new HRegionLocation(metaRegionInfo,
         ServerName.valueOf("127.0.0.1", 12345, EnvironmentEdgeManager.currentTime())),
@@ -78,8 +141,9 @@ public class TestAsyncRegionLocatorTracing {
     conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) {
 
       @Override
-      public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-        return CompletableFuture.completedFuture(locs);
+      public CompletableFuture<ServerName> getActiveMaster() {
+        return CompletableFuture.completedFuture(
+          ServerName.valueOf("127.0.0.1", 12345, EnvironmentEdgeManager.currentTime()));
       }
     }, "test", null, UserProvider.instantiate(CONF).getCurrent());
   }
@@ -104,8 +168,7 @@ public class TestAsyncRegionLocatorTracing {
 
   @Test
   public void testClearCacheServerName() {
-    ServerName sn = ServerName.valueOf("127.0.0.1", 12345,
-      EnvironmentEdgeManager.currentTime());
+    ServerName sn = ServerName.valueOf("127.0.0.1", 12345, EnvironmentEdgeManager.currentTime());
     conn.getLocator().clearCache(sn);
     SpanData span = waitSpan("AsyncRegionLocator.clearCache");
     assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
index 4c18cfe..1620d9f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/MetaCellComparator.java
@@ -71,7 +71,7 @@ public class MetaCellComparator extends CellComparatorImpl {
     return ignoreSequenceid ? diff : Longs.compare(b.getSequenceId(), a.getSequenceId());
   }
 
-  private static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
+  public static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
       int rlength) {
     int leftDelimiter = Bytes.searchDelimiterIndex(left, loffset, llength, HConstants.DELIMITER);
     int rightDelimiter = Bytes.searchDelimiterIndex(right, roffset, rlength, HConstants.DELIMITER);
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 3d265dd..5302d51 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -1331,6 +1331,29 @@ message GetMetaRegionLocationsResponse {
   repeated RegionLocation meta_locations = 1;
 }
 
+enum RegionLocateType {
+  REGION_LOCATE_TYPE_BEFORE = 1;
+  REGION_LOCATE_TYPE_CURRENT = 2;
+  REGION_LOCATE_TYPE_AFTER = 3;
+}
+
+message LocateMetaRegionRequest {
+  required bytes row = 1;
+  required RegionLocateType locateType = 2;
+}
+
+message LocateMetaRegionResponse {
+  repeated RegionLocation meta_locations = 1;
+}
+
+message GetAllMetaRegionLocationsRequest {
+  required bool exclude_offlined_split_parents = 1;
+}
+
+message GetAllMetaRegionLocationsResponse {
+  repeated RegionLocation meta_locations = 1;
+}
+
 /**
  * Implements all the RPCs needed by clients to look up cluster meta information needed for
  * connection establishment.
@@ -1356,4 +1379,16 @@ service ClientMetaService {
    * Get current meta replicas' region locations.
    */
   rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
+
+  /**
+   * Get meta region locations for a given row.
+   */
+  rpc LocateMetaRegion(LocateMetaRegionRequest)
+    returns(LocateMetaRegionResponse);
+
+  /**
+   * Get all meta region locations.
+   */
+  rpc GetAllMetaRegionLocations(GetAllMetaRegionLocationsRequest)
+    returns(GetAllMetaRegionLocationsResponse);
 }
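
For illustration, a minimal sketch of the request messages a caller might build from these
definitions; it assumes the standard protobuf-generated builder names (setRow, setLocateType,
setExcludeOfflinedSplitParents), and the wrapper class and row key are invented for the example:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;

    public class MetaLocateRequests {
      // Ask for the meta region covering the given row.
      static MasterProtos.LocateMetaRegionRequest locateCurrent(byte[] row) {
        return MasterProtos.LocateMetaRegionRequest.newBuilder()
          .setRow(ByteString.copyFrom(row))
          .setLocateType(MasterProtos.RegionLocateType.REGION_LOCATE_TYPE_CURRENT)
          .build();
      }

      // Ask for every meta region location, skipping offlined split parents.
      static MasterProtos.GetAllMetaRegionLocationsRequest allLocations() {
        return MasterProtos.GetAllMetaRegionLocationsRequest.newBuilder()
          .setExcludeOfflinedSplitParents(true)
          .build();
      }

      public static void main(String[] args) {
        System.out.println(locateCurrent(Bytes.toBytes("r1")));
        System.out.println(allLocations());
      }
    }
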
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ac35caa..8ca8972 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.RegionPlan;
@@ -1763,7 +1765,7 @@ public interface MasterObserver {
       throws IOException {
   }
 
-  /*
+  /**
    * Called before checking if user has permissions.
    * @param ctx the coprocessor instance's environment
    * @param userName the user name
@@ -1782,4 +1784,44 @@ public interface MasterObserver {
   default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String userName, List<Permission> permissions) throws IOException {
   }
+
+  /**
+   * Called before locating the meta region.
+   * @param ctx the coprocessor instance's environment
+   * @param row the row key to locate
+   * @param locateType the direction of the locate operation
+   */
+  default void preLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+    RegionLocateType locateType) throws IOException {
+  }
+
+  /**
+   * Called after locating the meta region.
+   * @param ctx the coprocessor instance's environment
+   * @param row the row key to locate
+   * @param locateType the direction of the locate operation
+   * @param locs the locations of the given meta region, including meta replicas if any.
+   */
+  default void postLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx, byte[] row,
+    RegionLocateType locateType, List<HRegionLocation> locs) throws IOException {
+  }
+
+  /**
+   * Called before getting all locations for meta regions.
+   * @param ctx the coprocessor instance's environment
+   * @param excludeOfflinedSplitParents if true, offlined split parents are not returned
+   */
+  default void preGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    boolean excludeOfflinedSplitParents) {
+  }
+
+  /**
+   * Called after getting all locations for meta regions.
+   * @param ctx the coprocessor instance's environment
+   * @param excludeOfflinedSplitParents if true, offlined split parents are not returned
+   * @param locs the locations of all meta regions, including meta replicas if any.
+   */
+  default void postGetAllMetaRegionLocations(ObserverContext<MasterCoprocessorEnvironment> ctx,
+    boolean excludeOfflinedSplitParents, List<HRegionLocation> locs) {
+  }
 }
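
For illustration, a hedged sketch of a coprocessor that hooks the two new locate callbacks and
simply logs them; the class name and log messages are invented, and it would still need to be
registered on the master (for example through hbase.coprocessor.master.classes):

    import java.io.IOException;
    import java.util.List;
    import java.util.Optional;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.RegionLocateType;
    import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.MasterObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MetaLocateLoggingObserver implements MasterCoprocessor, MasterObserver {

      private static final Logger LOG = LoggerFactory.getLogger(MetaLocateLoggingObserver.class);

      @Override
      public Optional<MasterObserver> getMasterObserver() {
        return Optional.of(this);
      }

      @Override
      public void preLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx,
        byte[] row, RegionLocateType locateType) throws IOException {
        // Log every meta locate request before the master serves it.
        LOG.info("Locating meta for row={}, locateType={}", Bytes.toStringBinary(row), locateType);
      }

      @Override
      public void postLocateMetaRegion(ObserverContext<MasterCoprocessorEnvironment> ctx,
        byte[] row, RegionLocateType locateType, List<HRegionLocation> locs) throws IOException {
        // Log how many locations (including replicas) were returned.
        LOG.info("Located {} meta location(s) for row={}", locs.size(), Bytes.toStringBinary(row));
      }
    }
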
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 961c929..171966f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED
 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.CatalogFamilyFormat;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -67,12 +67,14 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.PleaseRestartMasterException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.ServerMetrics;
@@ -88,12 +90,15 @@ import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
@@ -221,7 +226,6 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
 import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -232,6 +236,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@@ -242,6 +247,8 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@@ -773,6 +780,27 @@ public class HMaster extends HRegionServer implements MasterServices {
     return new AssignmentManager(master, masterRegion);
   }
 
+  /**
+   * Load the meta region state from the meta region server ZNode.
+   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
+   * @param replicaId the ID of the replica
+   * @return the meta region state
+   * @throws KeeperException if a ZooKeeper operation fails
+   */
+  private static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
+    throws KeeperException {
+    RegionState regionState = null;
+    try {
+      byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
+      regionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    return regionState;
+  }
+
   private void tryMigrateRootTableFromZooKeeper() throws IOException, KeeperException {
     // try migrate data from zookeeper
     try (RegionScanner scanner =
@@ -794,7 +822,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     StringBuilder info = new StringBuilder("Migrating meta location:");
     for (String metaReplicaNode : metaReplicaNodes) {
       int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZNode(metaReplicaNode);
-      RegionState state = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
+      RegionState state = getMetaRegionState(zooKeeper, replicaId);
       info.append(" ").append(state);
       put.setTimestamp(state.getStamp());
       MetaTableAccessor.addRegionInfo(put, state.getRegion());
@@ -3922,4 +3950,85 @@ public class HMaster extends HRegionServer implements MasterServices {
   public MetaLocationSyncer getMetaLocationSyncer() {
     return metaLocationSyncer;
   }
+
+  public RegionLocations locateMeta(byte[] row, RegionLocateType locateType) throws IOException {
+    if (locateType == RegionLocateType.AFTER) {
+      // we know the exact row right after the given row, so we can just create that row and
+      // use the same algorithm with CURRENT to locate it.
+      row = Arrays.copyOf(row, row.length + 1);
+      locateType = RegionLocateType.CURRENT;
+    }
+    Scan scan =
+      CatalogFamilyFormat.createRegionLocateScan(TableName.META_TABLE_NAME, row, locateType, 1);
+    try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+      boolean moreRows;
+      List<Cell> cells = new ArrayList<>();
+      do {
+        moreRows = scanner.next(cells);
+        if (cells.isEmpty()) {
+          continue;
+        }
+        Result result = Result.create(cells);
+        cells.clear();
+        RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+        if (locs == null || locs.getDefaultRegionLocation() == null) {
+          LOG.warn("No location found when locating meta region with row='{}', locateType={}",
+            Bytes.toStringBinary(row), locateType);
+          return null;
+        }
+        HRegionLocation loc = locs.getDefaultRegionLocation();
+        RegionInfo info = loc.getRegion();
+        if (info == null) {
+          LOG.warn("RegionInfo is null when locating meta region with row='{}', locateType={}",
+            Bytes.toStringBinary(row), locateType);
+          return null;
+        }
+        if (info.isSplitParent()) {
+          continue;
+        }
+        return locs;
+      } while (moreRows);
+      LOG.warn("No location available when locating meta region with row='{}', locateType={}",
+        Bytes.toStringBinary(row), locateType);
+      return null;
+    }
+  }
+
+  public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+    throws IOException {
+    Scan scan = new Scan().addFamily(HConstants.CATALOG_FAMILY);
+    List<RegionLocations> list = new ArrayList<>();
+    try (RegionScanner scanner = masterRegion.getScanner(scan)) {
+      boolean moreRows;
+      List<Cell> cells = new ArrayList<>();
+      do {
+        moreRows = scanner.next(cells);
+        if (cells.isEmpty()) {
+          continue;
+        }
+        Result result = Result.create(cells);
+        cells.clear();
+        RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
+        if (locs == null) {
+          LOG.warn("No locations in {}", result);
+          continue;
+        }
+        HRegionLocation loc = locs.getRegionLocation();
+        if (loc == null) {
+          LOG.warn("No non-null location in {}", result);
+          continue;
+        }
+        RegionInfo info = loc.getRegion();
+        if (info == null) {
+          LOG.warn("No serialized RegionInfo in {}", result);
+          continue;
+        }
+        if (excludeOfflinedSplitParents && info.isSplitParent()) {
+          continue;
+        }
+        list.add(locs);
+      } while (moreRows);
+    }
+    return list;
+  }
 }
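
The AFTER branch of locateMeta() above leans on a byte-ordering fact: appending a single 0x00 byte
produces the smallest row that sorts strictly after the original, so a CURRENT lookup on the
extended row is equivalent. A small standalone sketch of that fact (class name invented for the
example):

    import java.util.Arrays;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowSuccessorDemo {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r1");
        // Appending one trailing 0x00 yields the immediate successor of 'row' in byte order.
        byte[] next = Arrays.copyOf(row, row.length + 1);
        System.out.println(Bytes.compareTo(row, next) < 0); // true
        System.out.println(Bytes.toStringBinary(next));      // r1\x00
      }
    }
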
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 01d1a62..728da5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.SharedConnection;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -2038,4 +2040,42 @@ public class MasterCoprocessorHost
       }
     });
   }
+
+  public void preLocateMetaRegion(byte[] row, RegionLocateType locateType) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preLocateMetaRegion(this, row, locateType);
+      }
+    });
+  }
+
+  public void postLocateMetaRegion(byte[] row, RegionLocateType locateType,
+    List<HRegionLocation> locs) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postLocateMetaRegion(this, row, locateType, locs);
+      }
+    });
+  }
+
+  public void preGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preGetAllMetaRegionLocations(this, excludeOfflinedSplitParents);
+      }
+    });
+  }
+
+  public void postGetAllMetaRegionLocations(boolean excludeOfflinedSplitParents,
+    List<HRegionLocation> locs) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postGetAllMetaRegionLocations(this, excludeOfflinedSplitParents, locs);
+      }
+    });
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index e7bf96d..81ea0c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
@@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocateType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
@@ -75,7 +76,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
 import org.apache.hadoop.hbase.master.janitor.MetaFixer;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -125,7 +125,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -210,6 +209,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaReq
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetAllMetaRegionLocationsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
@@ -268,6 +269,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.LocateMetaRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
@@ -408,10 +411,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
  */
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
-public class MasterRpcServices extends RSRpcServices implements
-    MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
-    LockService.BlockingInterface, HbckService.BlockingInterface,
-    ClientMetaService.BlockingInterface {
+public class MasterRpcServices extends RSRpcServices implements MasterService.BlockingInterface,
+  RegionServerStatusService.BlockingInterface, LockService.BlockingInterface,
+  HbckService.BlockingInterface, ClientMetaService.BlockingInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
   private static final Logger AUDITLOG =
@@ -545,18 +547,17 @@ public class MasterRpcServices extends RSRpcServices implements
   @Override
   protected List<BlockingServiceAndInterface> getServices() {
     List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
-    bssi.add(new BlockingServiceAndInterface(
-        MasterService.newReflectiveBlockingService(this),
-        MasterService.BlockingInterface.class));
-    bssi.add(new BlockingServiceAndInterface(
-        RegionServerStatusService.newReflectiveBlockingService(this),
+    bssi.add(new BlockingServiceAndInterface(MasterService.newReflectiveBlockingService(this),
+      MasterService.BlockingInterface.class));
+    bssi.add(
+      new BlockingServiceAndInterface(RegionServerStatusService.newReflectiveBlockingService(this),
         RegionServerStatusService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this),
-        LockService.BlockingInterface.class));
+      LockService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this),
-        HbckService.BlockingInterface.class));
+      HbckService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
-        ClientMetaService.BlockingInterface.class));
+      ClientMetaService.BlockingInterface.class));
     bssi.addAll(super.getServices());
     return bssi;
   }
@@ -1718,39 +1719,31 @@ public class MasterRpcServices extends RSRpcServices implements
   }
 
   @Override
-  public UnassignRegionResponse unassignRegion(RpcController controller,
-      UnassignRegionRequest req) throws ServiceException {
+  public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
+    throws ServiceException {
     try {
-      final byte [] regionName = req.getRegion().getValue().toByteArray();
+      final byte[] regionName = req.getRegion().getValue().toByteArray();
       RegionSpecifierType type = req.getRegion().getType();
       UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
 
       master.checkInitialized();
       if (type != RegionSpecifierType.REGION_NAME) {
-        LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
-          + " actual: " + type);
+        LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME +
+          " actual: " + type);
       }
-      Pair<RegionInfo, ServerName> pair =
-        MetaTableAccessor.getRegion(master.getConnection(), regionName);
-      if (Bytes.equals(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName(), regionName)) {
-        pair = new Pair<>(RegionInfoBuilder.FIRST_META_REGIONINFO,
-          MetaTableLocator.getMetaRegionLocation(master.getZooKeeper()));
-      }
-      if (pair == null) {
-        throw new UnknownRegionException(Bytes.toString(regionName));
+      final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
+      if (regionInfo == null) {
+        throw new UnknownRegionException(Bytes.toStringBinary(regionName));
       }
-
-      RegionInfo hri = pair.getFirst();
       if (master.cpHost != null) {
-        master.cpHost.preUnassign(hri);
+        master.cpHost.preUnassign(regionInfo);
       }
-      LOG.debug(master.getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
+      LOG.debug(master.getClientIdAuditPrefix() + " unassign " + regionInfo.getRegionNameAsString()
           + " in current location if it is online");
-      master.getAssignmentManager().unassign(hri);
+      master.getAssignmentManager().unassign(regionInfo);
       if (master.cpHost != null) {
-        master.cpHost.postUnassign(hri);
+        master.cpHost.postUnassign(regionInfo);
       }
-
       return urr;
     } catch (IOException ioe) {
       throw new ServiceException(ioe);
@@ -3460,4 +3453,66 @@ public class MasterRpcServices extends RSRpcServices implements
       .addAllBalancerRejection(balancerRejections).build();
   }
 
+  @Override
+  public LocateMetaRegionResponse locateMetaRegion(RpcController controller,
+    LocateMetaRegionRequest request) throws ServiceException {
+    byte[] row = request.getRow().toByteArray();
+    RegionLocateType locateType = ProtobufUtil.toRegionLocateType(request.getLocateType());
+    try {
+      master.checkServiceStarted();
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().preLocateMetaRegion(row, locateType);
+      }
+      RegionLocations locs = master.locateMeta(row, locateType);
+      List<HRegionLocation> list = new ArrayList<>();
+      LocateMetaRegionResponse.Builder builder = LocateMetaRegionResponse.newBuilder();
+      if (locs != null) {
+        for (HRegionLocation loc : locs) {
+          if (loc != null) {
+            builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+            list.add(loc);
+          }
+        }
+      }
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().postLocateMetaRegion(row, locateType, list);
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetAllMetaRegionLocationsResponse getAllMetaRegionLocations(RpcController controller,
+    GetAllMetaRegionLocationsRequest request) throws ServiceException {
+    boolean excludeOfflinedSplitParents = request.getExcludeOfflinedSplitParents();
+    try {
+      master.checkServiceStarted();
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().preGetAllMetaRegionLocations(excludeOfflinedSplitParents);
+      }
+      List<RegionLocations> locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
+      List<HRegionLocation> list = new ArrayList<>();
+      GetAllMetaRegionLocationsResponse.Builder builder =
+        GetAllMetaRegionLocationsResponse.newBuilder();
+      if (locs != null) {
+        for (RegionLocations ls : locs) {
+          for (HRegionLocation loc : ls) {
+            if (loc != null) {
+              builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
+              list.add(loc);
+            }
+          }
+        }
+      }
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().postGetAllMetaRegionLocations(excludeOfflinedSplitParents,
+          list);
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index f24ecd4..1cee59f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -577,4 +578,12 @@ public interface MasterServices extends Server {
    * We need to get this in MTP to tell the syncer the new meta replica count.
    */
   MetaLocationSyncer getMetaLocationSyncer();
+
+  /**
+   * Get locations for all meta regions.
+   * @param excludeOfflinedSplitParents if true, offlined split parents are not returned
+   * @return the locations of all the meta regions
+   */
+  List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+    throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
index 07512d1..b192a67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
@@ -39,11 +39,14 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
- * A cache of meta region location metadata. Registers a listener on ZK to track changes to the
- * meta table znodes. Clients are expected to retry if the meta information is stale. This class
- * is thread-safe (a single instance of this class can be shared by multiple threads without race
+ * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
+ * table znodes. Clients are expected to retry if the meta information is stale. This class is
+ * thread-safe (a single instance of this class can be shared by multiple threads without race
  * conditions).
+ * @deprecated The meta location is now stored in the master local store, so get it from the
+ *             active master instead of ZooKeeper; this class is kept only for compatibility.
  */
+@Deprecated
 @InterfaceAudience.Private
 public class MetaRegionLocationCache extends ZKListener {
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 87c04da..cce7a81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -191,8 +191,8 @@ public class RegionStateStore {
     final Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
     MetaTableAccessor.addRegionInfo(put, regionInfo);
     final StringBuilder info =
-      new StringBuilder("pid=").append(pid).append(" updating hbase:meta row=")
-        .append(regionInfo.getEncodedName()).append(", regionState=").append(state);
+      new StringBuilder("pid=").append(pid).append(" updating catalog row=")
+        .append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state);
     if (openSeqNum >= 0) {
       Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
         "Open region should be on a server");
@@ -228,7 +228,7 @@ public class RegionStateStore {
   }
 
   public void mirrorMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
-      throws IOException {
+    throws IOException {
     try {
       MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(),
         state);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java
index 51790aa..3d00e49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/http/MasterStatusServlet.java
@@ -27,11 +27,11 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.tmpl.master.MasterStatusTmpl;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -82,7 +82,8 @@ public class MasterStatusServlet extends HttpServlet {
   }
 
   private ServerName getMetaLocationOrNull(HMaster master) {
-    return MetaTableLocator.getMetaRegionLocation(master.getZooKeeper());
+    return master.getAssignmentManager().getRegionStates()
+      .getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO).getServerName();
   }
 
   private Map<String, Integer> getFragmentationInfo(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 2313e70..30c3458 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -368,8 +368,6 @@ public class CreateTableProcedure
     final TableDescriptor tableDescriptor, final List<RegionInfo> regions) throws IOException {
     assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions;
 
-    ProcedureSyncWait.waitMetaRegions(env);
-
     // Add replicas if needed
     // we need to create regions with replicaIds starting from 1
     List<RegionInfo> newRegions =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 46621da..b28c95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -222,18 +220,6 @@ public final class ProcedureSyncWait {
     throw new TimeoutIOException("Timed out while waiting on " + purpose);
   }
 
-  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
-    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
-    try {
-      if (MetaTableLocator.waitMetaRegionLocation(env.getMasterServices().getZooKeeper(),
-        timeout) == null) {
-        throw new NotAllMetaRegionsOnlineException();
-      }
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-    }
-  }
-
   protected static void waitRegionInTransition(final MasterProcedureEnv env,
       final List<RegionInfo> regions) throws IOException {
     final RegionStates states = env.getAssignmentManager().getRegionStates();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
index 23d0263..507058b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
@@ -21,8 +21,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,7 +39,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -159,7 +161,9 @@ public final class MasterSnapshotVerifier {
   private void verifyRegions(final SnapshotManifest manifest) throws IOException {
     List<RegionInfo> regions;
     if (TableName.META_TABLE_NAME.equals(tableName)) {
-      regions = MetaTableLocator.getMetaRegions(services.getZooKeeper());
+      regions = services.getAllMetaRegionLocations(false).stream()
+        .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+        .map(HRegionLocation::getRegion).collect(Collectors.toList());
     } else {
       regions = MetaTableAccessor.getTableRegions(services.getConnection(), tableName);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index 5ff8a49..108f0fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -22,6 +22,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +51,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -197,12 +198,15 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       monitor.rethrowException();
 
       List<Pair<RegionInfo, ServerName>> regionsAndLocations;
+
       if (TableName.META_TABLE_NAME.equals(snapshotTable)) {
-        regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
-          server.getZooKeeper());
+        regionsAndLocations = master.getAllMetaRegionLocations(false).stream()
+          .flatMap(locs -> Stream.of(locs.getRegionLocations())).filter(l -> l != null)
+          .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+          .collect(Collectors.toList());
       } else {
-        regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
-          server.getConnection(), snapshotTable, false);
+        regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(server.getConnection(),
+          snapshotTable, false);
       }
 
       // run the snapshot
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index c6a3b92..2b28886 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -24,13 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -44,7 +45,6 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -131,20 +131,16 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
     // Each region server will get its own online regions for the table.
     // We may still miss regions that need to be flushed.
     List<Pair<RegionInfo, ServerName>> regionsAndLocations;
-
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
-        master.getZooKeeper());
-    } else {
-      regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
-        master.getConnection(), tableName, false);
+    try (RegionLocator locator =
+      master.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      regionsAndLocations = locator.getAllRegionLocations().stream()
+        .map(loc -> Pair.newPair(loc.getRegion(), loc.getServerName()))
+        .collect(Collectors.toList());
     }
 
     Set<String> regionServers = new HashSet<>(regionsAndLocations.size());
     for (Pair<RegionInfo, ServerName> region : regionsAndLocations) {
       if (region != null && region.getFirst() != null && region.getSecond() != null) {
-        RegionInfo hri = region.getFirst();
-        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
         regionServers.add(region.getSecond().toString());
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c00a8b7..ae602a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
-import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
@@ -178,7 +177,6 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -2416,8 +2414,8 @@ public class HRegionServer extends Thread implements
   }
 
   /**
-   * Helper method for use in tests. Skip the region transition report when there's no master
-   * around to receive it.
+   * Helper method for use in tests. Skip the region transition report when there's no master around
+   * to receive it.
    */
   private boolean skipReportingTransition(final RegionStateTransitionContext context) {
     final TransitionCode code = context.getCode();
@@ -2428,17 +2426,13 @@ public class HRegionServer extends Thread implements
     if (code == TransitionCode.OPENED) {
       Preconditions.checkArgument(hris != null && hris.length == 1);
       if (hris[0].isMetaRegion()) {
-        try {
-          MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
-              hris[0].getReplicaId(), RegionState.State.OPEN);
-        } catch (KeeperException e) {
-          LOG.info("Failed to update meta location", e);
-          return false;
-        }
+        LOG.warn(
+          "meta table location is stored in master local store, so we can not skip reporting");
+        return false;
       } else {
         try {
           MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
-              serverName, openSeqNum, masterSystemTime);
+            serverName, openSeqNum, masterSystemTime);
         } catch (IOException e) {
           LOG.info("Failed to update meta", e);
           return false;
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
index f6753df..29913b5 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
@@ -68,7 +68,6 @@
 <%@ page import="org.apache.hadoop.hbase.quotas.ThrottleSettings" %>
 <%@ page import="org.apache.hadoop.hbase.util.Bytes" %>
 <%@ page import="org.apache.hadoop.hbase.util.FSUtils" %>
-<%@ page import="org.apache.hadoop.hbase.zookeeper.MetaTableLocator" %>
 <%@ page import="org.apache.hadoop.util.StringUtils" %>
 <%@ page import="org.apache.hbase.thirdparty.com.google.protobuf.ByteString" %>
 <%@ page import="org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos" %>
@@ -314,14 +313,9 @@
           for (int j = 0; j < numMetaReplicas; j++) {
             RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
                                     RegionInfoBuilder.FIRST_META_REGIONINFO, j);
-            //If a metaLocation is null, All of its info would be empty here to be displayed.
-            ServerName metaLocation = null;
-            try {
-              metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
-            } catch (NotAllMetaRegionsOnlineException e) {
-              //Region in transition state here throw a NotAllMetaRegionsOnlineException causes
-              //the UI crash.
-            }
+            RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+            // If a metaLocation is null, All of its info would be empty here to be displayed.
+            ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
             for (int i = 0; i < 1; i++) {
               //If metaLocation is null, default value below would be displayed in UI.
               String hostAndPort = "";
@@ -387,14 +381,9 @@
            for (int j = 0; j < numMetaReplicas; j++) {
              RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
                                      RegionInfoBuilder.FIRST_META_REGIONINFO, j);
-             //If a metaLocation is null, All of its info would be empty here to be displayed.
-             ServerName metaLocation = null;
-             try {
-               metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
-             } catch (NotAllMetaRegionsOnlineException e) {
-               //Region in transition state here throw a NotAllMetaRegionsOnlineException causes
-               //the UI crash.
-             }
+             RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+             // If a metaLocation is null, All of its info would be empty here to be displayed.
+             ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
              for (int i = 0; i < 1; i++) {
                //If metaLocation is null, default value below would be displayed in UI.
                String hostAndPort = "";
@@ -443,14 +432,9 @@
           for (int j = 0; j < numMetaReplicas; j++) {
             RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
                                     RegionInfoBuilder.FIRST_META_REGIONINFO, j);
-            //If a metaLocation is null, All of its info would be empty here to be displayed.
-            ServerName metaLocation = null;
-            try {
-              metaLocation = MetaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
-            } catch (NotAllMetaRegionsOnlineException e) {
-              //Region in transition state here throw a NotAllMetaRegionsOnlineException causes
-              //the UI crash.
-            }
+            RegionState regionState = master.getAssignmentManager().getRegionStates().getRegionState(meta);
+            // If a metaLocation is null, All of its info would be empty here to be displayed.
+            ServerName metaLocation = regionState != null ? regionState.getServerName() : null;
             for (int i = 0; i < 1; i++) {
               //If metaLocation is null, default value below would be displayed in UI.
               String hostAndPort = "";
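
The table.jsp hunks above replace the ZooKeeper wait (which could throw NotAllMetaRegionsOnlineException and crash the UI) with a null-safe lookup of each meta replica's RegionState from the master's AssignmentManager. A small sketch of that lookup follows, assuming access to a live HMaster instance; the helper class and method names are illustrative and not taken from the patch.

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;

public final class MetaReplicaLocationHelper {
  private MetaReplicaLocationHelper() {
  }

  /** Returns the server hosting the given hbase:meta replica, or null if no state is recorded. */
  public static ServerName metaReplicaLocation(HMaster master, int replicaId) {
    RegionInfo meta = RegionReplicaUtil.getRegionInfoForReplica(
      RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId);
    RegionState state = master.getAssignmentManager().getRegionStates().getRegionState(meta);
    // Mirrors the JSP change: fall back to null so the page renders empty fields
    // instead of failing when the replica is still in transition.
    return state != null ? state.getServerName() : null;
  }
}
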
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 922da6f..9169c22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -243,13 +242,6 @@ public class TestMetaTableAccessor {
   }
 
   @Test
-  public void testGetRegionsFromMetaTable() throws IOException, InterruptedException {
-    List<RegionInfo> regions = MetaTableLocator.getMetaRegions(UTIL.getZooKeeperWatcher());
-    assertTrue(regions.size() >= 1);
-    assertTrue(MetaTableLocator.getMetaRegionsAndLocations(UTIL.getZooKeeperWatcher()).size() >= 1);
-  }
-
-  @Test
   public void testGetRegion() throws IOException, InterruptedException {
     final String name = this.name.getMethodName();
     LOG.info("Started " + name);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
deleted file mode 100644
index 3bea0a7..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
-
-/**
- * Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}
- */
-@Category({ MiscTests.class, MediumTests.class })
-public class TestMetaTableLocator {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMetaTableLocator.class);
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableLocator.class);
-  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
-  private static final ServerName SN =
-    ServerName.valueOf("example.org", 1234, EnvironmentEdgeManager.currentTime());
-  private ZKWatcher watcher;
-  private Abortable abortable;
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    // Set this down so tests run quicker
-    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
-    UTIL.startMiniZKCluster();
-  }
-
-  @AfterClass
-  public static void afterClass() throws IOException {
-    UTIL.getZkCluster().shutdown();
-  }
-
-  @Before
-  public void before() throws IOException {
-    this.abortable = new Abortable() {
-      @Override
-      public void abort(String why, Throwable e) {
-        LOG.info(why, e);
-      }
-
-      @Override
-      public boolean isAborted() {
-        return false;
-      }
-    };
-    this.watcher =
-      new ZKWatcher(UTIL.getConfiguration(), this.getClass().getSimpleName(), this.abortable, true);
-  }
-
-  @After
-  public void after() {
-    try {
-      // Clean out meta location or later tests will be confused... they presume
-      // start fresh in zk.
-      MetaTableLocator.deleteMetaLocation(this.watcher);
-    } catch (KeeperException e) {
-      LOG.warn("Unable to delete hbase:meta location", e);
-    }
-
-    this.watcher.close();
-  }
-
-  /**
-   * Test normal operations
-   */
-  @Test
-  public void testMetaLookup()
-      throws IOException, InterruptedException, ServiceException, KeeperException {
-    final ClientProtos.ClientService.BlockingInterface client =
-      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-
-    Mockito.when(client.get((RpcController) Mockito.any(), (GetRequest) Mockito.any()))
-      .thenReturn(GetResponse.newBuilder().build());
-
-    assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
-    for (RegionState.State state : RegionState.State.values()) {
-      if (state.equals(RegionState.State.OPEN)) {
-        continue;
-      }
-      MetaTableLocator.setMetaLocation(this.watcher, SN, state);
-      assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
-      assertEquals(state, MetaTableLocator.getMetaRegionState(this.watcher).getState());
-    }
-    MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
-    assertEquals(SN, MetaTableLocator.getMetaRegionLocation(this.watcher));
-    assertEquals(RegionState.State.OPEN,
-      MetaTableLocator.getMetaRegionState(this.watcher).getState());
-
-    MetaTableLocator.deleteMetaLocation(this.watcher);
-    assertNull(MetaTableLocator.getMetaRegionState(this.watcher).getServerName());
-    assertEquals(RegionState.State.OFFLINE,
-      MetaTableLocator.getMetaRegionState(this.watcher).getState());
-    assertNull(MetaTableLocator.getMetaRegionLocation(this.watcher));
-  }
-
-  @Test(expected = NotAllMetaRegionsOnlineException.class)
-  public void testTimeoutWaitForMeta() throws IOException, InterruptedException {
-    MetaTableLocator.waitMetaRegionLocation(watcher, 100);
-  }
-
-  /**
-   * Test waiting on meat w/ no timeout specified.
-   */
-  @Test
-  public void testNoTimeoutWaitForMeta() throws IOException, InterruptedException, KeeperException {
-    ServerName hsa = MetaTableLocator.getMetaRegionLocation(watcher);
-    assertNull(hsa);
-
-    // Now test waiting on meta location getting set.
-    Thread t = new WaitOnMetaThread();
-    startWaitAliveThenWaitItLives(t, 1);
-    // Set a meta location.
-    MetaTableLocator.setMetaLocation(this.watcher, SN, RegionState.State.OPEN);
-    hsa = SN;
-    // Join the thread... should exit shortly.
-    t.join();
-    // Now meta is available.
-    assertTrue(MetaTableLocator.getMetaRegionLocation(watcher).equals(hsa));
-  }
-
-  private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
-    t.start();
-    UTIL.waitFor(2000, t::isAlive);
-    // Wait one second.
-    Threads.sleep(ms);
-    assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
-  }
-
-  /**
-   * Wait on META.
-   */
-  class WaitOnMetaThread extends Thread {
-
-    WaitOnMetaThread() {
-      super("WaitOnMeta");
-    }
-
-    @Override
-    public void run() {
-      try {
-        doWaiting();
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Failed wait", e);
-      }
-      LOG.info("Exiting " + getName());
-    }
-
-    void doWaiting() throws InterruptedException {
-      try {
-        for (;;) {
-          if (MetaTableLocator.waitMetaRegionLocation(watcher, 10000) != null) {
-            break;
-          }
-        }
-      } catch (NotAllMetaRegionsOnlineException e) {
-        // Ignore
-      }
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index f14faf7..277f7bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -58,10 +58,7 @@ public abstract class AbstractTestRegionLocator {
     }
     UTIL.getAdmin().createTable(td, SPLIT_KEYS);
     UTIL.waitTableAvailable(TABLE_NAME);
-    try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
-    }
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
     UTIL.getAdmin().balancerSwitch(false, true);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index c9d67f4..ea1122c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 
 /**
@@ -32,11 +31,6 @@ public class DummyConnectionRegistry implements ConnectionRegistry {
       HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
 
   @Override
-  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-    return null;
-  }
-
-  @Override
   public CompletableFuture<String> getClusterId() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
index d34b419..3b0fbe8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/MetaWithReplicasTestBase.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.junit.AfterClass;
 import org.junit.Rule;
 import org.slf4j.Logger;
@@ -65,8 +65,11 @@ public class MetaWithReplicasTestBase {
     HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3);
     AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
     Set<ServerName> sns = new HashSet<ServerName>();
-    ServerName hbaseMetaServerName =
-      MetaTableLocator.getMetaRegionLocation(TEST_UTIL.getZooKeeperWatcher());
+    ServerName hbaseMetaServerName;
+    try (RegionLocator locator =
+      TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      hbaseMetaServerName = locator.getRegionLocation(HConstants.EMPTY_START_ROW).getServerName();
+    }
     LOG.info("HBASE:META DEPLOY: on " + hbaseMetaServerName);
     sns.add(hbaseMetaServerName);
     for (int replicaId = 1; replicaId < 3; replicaId++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index abb0c11..05a1051 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -43,8 +43,7 @@ public final class RegionReplicaTestHelper {
   }
 
   // waits for all replicas to have region location
-  static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtil util,
-    ConnectionRegistry registry) throws IOException {
+  static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtil util) throws IOException {
     Configuration conf = util.getConfiguration();
     int regionReplicaCount =
       util.getAdmin().getDescriptor(TableName.META_TABLE_NAME).getRegionReplication();
@@ -58,16 +57,20 @@ public final class RegionReplicaTestHelper {
         @Override
         public boolean evaluate() {
           try {
-            RegionLocations locs = registry.getMetaRegionLocations().get();
+            List<HRegionLocation> locs;
+            try (RegionLocator locator =
+              util.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+              locs = locator.getAllRegionLocations();
+            }
             if (locs.size() < regionReplicaCount) {
               return false;
             }
             for (int i = 0; i < regionReplicaCount; i++) {
-              HRegionLocation loc = locs.getRegionLocation(i);
+              HRegionLocation loc = locs.get(i);
               // Wait until the replica is served by a region server. There could be delay between
               // the replica being available to the connection and region server opening it.
               Optional<ServerName> rsCarryingReplica =
-                  getRSCarryingReplica(util, loc.getRegion().getTable(), i);
+                getRSCarryingReplica(util, loc.getRegion().getTable(), i);
               if (!rsCarryingReplica.isPresent()) {
                 return false;
               }
@@ -120,7 +123,7 @@ public final class RegionReplicaTestHelper {
 
   interface Locator {
     RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
-        throws Exception;
+      throws Exception;
 
     void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index 81dafae..13e13bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -54,10 +54,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
   public static void setUpBeforeClass() throws Exception {
     TestAsyncAdminBase.setUpBeforeClass();
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
-    }
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
   }
 
   private void testMoveNonDefaultReplica(TableName tableName)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 480d797..6a06f32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -44,24 +45,25 @@ public class TestAsyncMetaRegionLocator {
 
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
 
-  private static ConnectionRegistry REGISTRY;
+  private static AsyncConnectionImpl CONN;
 
-  private static AsyncMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
     TEST_UTIL.waitUntilNoRegionsInTransition();
-    REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+    CONN = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
-    LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    Closeables.close(REGISTRY, true);
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -71,14 +73,15 @@ public class TestAsyncMetaRegionLocator {
 
       @Override
       public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
-          throws Exception {
+        throws Exception {
         LOCATOR.updateCachedLocationOnError(loc, error);
       }
 
       @Override
       public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
-          throws Exception {
-        return LOCATOR.getRegionLocations(replicaId, reload).get();
+        throws Exception {
+        return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+          RegionLocateType.CURRENT, reload).get();
       }
     });
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 44b954e..9d9090e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -59,8 +59,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
@@ -72,43 +70,34 @@ public class TestAsyncNonMetaRegionLocator {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
-
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
 
   private static TableName TABLE_NAME = TableName.valueOf("async");
 
   private static byte[] FAMILY = Bytes.toBytes("cf");
-  private static final int META_STOREFILE_REFRESH_PERIOD = 100;
   private static final int NB_SERVERS = 4;
   private static int numOfMetaReplica = NB_SERVERS - 1;
+  private static byte[][] SPLIT_KEYS;
 
   private static AsyncConnectionImpl CONN;
 
-  private static AsyncNonMetaRegionLocator LOCATOR;
-  private static ConnectionRegistry registry;
-
-  private static byte[][] SPLIT_KEYS;
-  private CatalogReplicaMode metaReplicaMode;
+  private AsyncRegionLocator LOCATOR;
+  private ConnectionRegistry registry;
 
   @BeforeClass
   public static void setUp() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
     // Enable hbase:meta replication.
     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
-    conf.setLong("replication.source.sleepforretries", 10);    // 10 ms
-
+    conf.setLong("replication.source.sleepforretries", 10); // 10 ms
     TEST_UTIL.startMiniCluster(NB_SERVERS);
     Admin admin = TEST_UTIL.getAdmin();
     admin.balancerSwitch(false, true);
 
     // Enable hbase:meta replication.
     HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica);
-    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions(
-      TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
-
-    registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-
+    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster()
+      .getRegions(TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
       SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@@ -146,12 +135,11 @@ public class TestAsyncNonMetaRegionLocator {
     // Enable meta replica LoadBalance mode for this connection.
     if (clientMetaReplicaMode != null) {
       c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
-      metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
     }
-
+    registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(c, registry,
       registry.getClusterId().get(), null, User.getCurrent());
-    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
   }
 
   private void createSingleRegionTable() throws IOException, InterruptedException {
@@ -160,7 +148,7 @@ public class TestAsyncNonMetaRegionLocator {
   }
 
   private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
-      byte[] row, RegionLocateType locateType, boolean reload) {
+    byte[] row, RegionLocateType locateType, boolean reload) {
     return LOCATOR
       .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
       .thenApply(RegionLocations::getDefaultRegionLocation);
@@ -191,7 +179,7 @@ public class TestAsyncNonMetaRegionLocator {
   }
 
   private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
-      HRegionLocation loc) {
+    HRegionLocation loc) {
     RegionInfo info = loc.getRegion();
     assertEquals(TABLE_NAME, info.getTable());
     assertArrayEquals(startKey, info.getStartKey());
@@ -418,7 +406,7 @@ public class TestAsyncNonMetaRegionLocator {
   // Testcase for HBASE-20822
   @Test
   public void testLocateBeforeLastRegion()
-      throws IOException, InterruptedException, ExecutionException {
+    throws IOException, InterruptedException, ExecutionException {
     createMultiRegionTable();
     getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
     HRegionLocation loc =
@@ -436,13 +424,13 @@ public class TestAsyncNonMetaRegionLocator {
 
       @Override
       public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
-          throws Exception {
+        throws Exception {
         LOCATOR.updateCachedLocationOnError(loc, error);
       }
 
       @Override
       public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
-          throws Exception {
+        throws Exception {
         return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
           RegionLocateType.CURRENT, reload).get();
       }
@@ -464,9 +452,8 @@ public class TestAsyncNonMetaRegionLocator {
   public void testConcurrentUpdateCachedLocationOnError() throws Exception {
     createSingleRegionTable();
     HRegionLocation loc =
-        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false)
-            .get();
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
     IntStream.range(0, 100).parallel()
-        .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
+      .forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
index c9d47dc..7fadef3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi2.java
@@ -51,8 +51,8 @@ import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous region admin operations.
- * @see TestAsyncRegionAdminApi This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncRegionAdminApi This test and it used to be joined it was taking longer than our ten
+ *      minute timeout so they were split.
  */
 @RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
@@ -60,7 +60,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
+    HBaseClassTestRule.forClass(TestAsyncRegionAdminApi2.class);
 
   @Test
   public void testGetRegionLocation() throws Exception {
@@ -79,13 +79,13 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
   @Test
   public void testSplitSwitch() throws Exception {
     createTableWithDefaultConf(tableName);
-    byte[][] families = {FAMILY};
+    byte[][] families = { FAMILY };
     final int rows = 10000;
     TestAsyncRegionAdminApi.loadData(tableName, families, rows);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations =
-        ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     int originalCount = regionLocations.size();
 
     initSplitMergeSwitch();
@@ -93,7 +93,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     try {
       admin.split(tableName, Bytes.toBytes(rows / 2)).join();
     } catch (Exception e) {
-      //Expected
+      // Expected
     }
     int count = admin.getRegions(tableName).get().size();
     assertTrue(originalCount == count);
@@ -111,12 +111,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
   // It was ignored in TestSplitOrMergeStatus, too
   public void testMergeSwitch() throws Exception {
     createTableWithDefaultConf(tableName);
-    byte[][] families = {FAMILY};
+    byte[][] families = { FAMILY };
     TestAsyncRegionAdminApi.loadData(tableName, families, 1000);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations =
-        ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).get();
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     int originalCount = regionLocations.size();
 
     initSplitMergeSwitch();
@@ -126,7 +126,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
       Threads.sleep(100);
     }
     assertTrue("originalCount=" + originalCount + ", postSplitCount=" + postSplitCount,
-        originalCount != postSplitCount);
+      originalCount != postSplitCount);
 
     // Merge switch is off so merge should NOT succeed.
     assertTrue(admin.mergeSwitch(false).get());
@@ -156,12 +156,12 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
 
   @Test
   public void testMergeRegions() throws Exception {
-    byte[][] splitRows = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("6")};
+    byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
     createTableWithDefaultConf(tableName, splitRows);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     RegionInfo regionA;
     RegionInfo regionB;
     RegionInfo regionC;
@@ -175,7 +175,7 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
 
     regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
 
     assertEquals(2, regionLocations.size());
     for (HRegionLocation rl : regionLocations) {
@@ -195,11 +195,10 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
       Thread.sleep(200);
     }
     // merge with encoded name
-    admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(),
-      false).get();
+    admin.mergeRegions(regionC.getRegionName(), mergedChildRegion.getRegionName(), false).get();
 
-    regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals(1, regionLocations.size());
   }
 
@@ -233,18 +232,18 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
     splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
     splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
-    splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
+    splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true,
+      Bytes.toBytes("3"));
   }
 
-  private void
-  splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
-      throws Exception {
+  private void splitTest(TableName tableName, int rowCount, boolean isSplitRegion,
+    byte[] splitPoint) throws Exception {
     // create table
     createTableWithDefaultConf(tableName);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals(1, regionLocations.size());
 
     AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
@@ -273,8 +272,8 @@ public class TestAsyncRegionAdminApi2 extends TestAsyncAdminBase {
     int count = 0;
     for (int i = 0; i < 45; i++) {
       try {
-        regionLocations = ClientMetaTableAccessor
-          .getTableHRegionLocations(metaTable, tableName).get();
+        regionLocations =
+          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
         count = regionLocations.size();
         if (count >= 2) {
           break;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
similarity index 90%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
index 690a384..c2d001a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorConcurrenyLimit.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
@@ -57,11 +57,11 @@ import org.junit.experimental.categories.Category;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
 @Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
+public class TestAsyncRegionLocatorConcurrenyLimit {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+    HBaseClassTestRule.forClass(TestAsyncRegionLocatorConcurrenyLimit.class);
 
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
 
@@ -71,7 +71,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   private static AsyncConnectionImpl CONN;
 
-  private static AsyncNonMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   private static byte[][] SPLIT_KEYS;
 
@@ -90,7 +90,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
     @Override
     public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
-        InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
       if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
         int concurrency = CONCURRENCY.incrementAndGet();
         for (;;) {
@@ -109,7 +109,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
     @Override
     public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
-        InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
       if (c.getEnvironment().getRegionInfo().isMetaRegion()) {
         CONCURRENCY.decrementAndGet();
       }
@@ -125,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     ConnectionRegistry registry =
-        ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
-    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
       .toArray(byte[][]::new);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
@@ -142,7 +142,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
   }
 
   private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
-      throws InterruptedException, ExecutionException {
+    throws InterruptedException, ExecutionException {
     assertEquals(256, futures.size());
     for (int i = 0; i < futures.size(); i++) {
       HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index 572a1d5..42d1919 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -45,8 +45,8 @@ import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- *     ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our ten
+ *      minute timeout so they were split.
  * @see TestAsyncTableAdminApi3 Another split out from this class so each runs under ten minutes.
  */
 @RunWith(Parameterized.class)
@@ -55,7 +55,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
+    HBaseClassTestRule.forClass(TestAsyncTableAdminApi.class);
 
   @Test
   public void testCreateTable() throws Exception {
@@ -65,13 +65,13 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     tables = admin.listTableDescriptors().get();
     assertEquals(numTables + 1, tables.size());
     assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+      .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
     assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
   }
 
   static TableState.State getStateFromMeta(TableName table) throws Exception {
     Optional<TableState> state = ClientMetaTableAccessor
-        .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
+      .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get();
     assertTrue(state.isPresent());
     return state.get().getState();
   }
@@ -82,19 +82,21 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
     createTableWithDefaultConf(tableName);
     List<HRegionLocation> regionLocations = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals("Table should have only 1 region", 1, regionLocations.size());
 
     final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
     createTableWithDefaultConf(tableName2, new byte[][] { new byte[] { 42 } });
-    regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
     assertEquals("Table should have only 2 region", 2, regionLocations.size());
 
     final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
     TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName3);
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), Bytes.toBytes("a"), Bytes.toBytes("z"), 3).join();
-    regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
     assertEquals("Table should have only 3 region", 3, regionLocations.size());
 
     final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
@@ -111,7 +113,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     builder = TableDescriptorBuilder.newBuilder(tableName5);
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join();
-    regionLocations = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5).get();
+    regionLocations =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName5, true).get();
     assertEquals("Table should have 16 region", 16, regionLocations.size());
   }
 
@@ -128,7 +131,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
     List<HRegionLocation> regions = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+      .getTableHRegionLocations(metaTable, tableName, true).get();
     Iterator<HRegionLocation> hris = regions.iterator();
 
     assertEquals(
@@ -183,7 +186,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
 
-    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2).get();
+    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName2, true).get();
     assertEquals(
       "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
       expectedRegions, regions.size());
@@ -231,8 +234,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
     admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
 
-    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3)
-      .get();
+    regions = ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName3, true).get();
     assertEquals(
       "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
       expectedRegions, regions.size());
@@ -296,7 +298,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
   }
 
   private void testTruncateTable(final TableName tableName, boolean preserveSplits)
-      throws Exception {
+    throws Exception {
     byte[][] splitKeys = new byte[2][];
     splitKeys[0] = Bytes.toBytes(4);
     splitKeys[1] = Bytes.toBytes(8);
@@ -337,8 +339,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     testCloneTableSchema(tableName, newTableName, true);
   }
 
-  private void testCloneTableSchema(final TableName tableName,
-      final TableName newTableName, boolean preserveSplits) throws Exception {
+  private void testCloneTableSchema(final TableName tableName, final TableName newTableName,
+    boolean preserveSplits) throws Exception {
     byte[][] splitKeys = new byte[2][];
     splitKeys[0] = Bytes.toBytes(4);
     splitKeys[1] = Bytes.toBytes(8);
@@ -349,20 +351,16 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     boolean BLOCK_CACHE = false;
 
     // Create the table
-    TableDescriptor tableDesc = TableDescriptorBuilder
-        .newBuilder(tableName)
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
-        .setColumnFamily(ColumnFamilyDescriptorBuilder
-            .newBuilder(FAMILY_1)
-            .setBlocksize(BLOCK_SIZE)
-            .setBlockCacheEnabled(BLOCK_CACHE)
-            .setTimeToLive(TTL)
-            .build()).build();
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_0))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).setBlocksize(BLOCK_SIZE)
+        .setBlockCacheEnabled(BLOCK_CACHE).setTimeToLive(TTL).build())
+      .build();
     admin.createTable(tableDesc, splitKeys).join();
 
     assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
     assertTrue("Table should be created with splitKyes + 1 rows in META",
-        admin.isTableAvailable(tableName).get());
+      admin.isTableAvailable(tableName).get());
 
     // Clone & Verify
     admin.cloneTableSchema(tableName, newTableName, preserveSplits).join();
@@ -377,7 +375,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     if (preserveSplits) {
       assertEquals(NUM_REGIONS, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
       assertTrue("New table should be created with splitKyes + 1 rows in META",
-          admin.isTableAvailable(newTableName).get());
+        admin.isTableAvailable(newTableName).get());
     } else {
       assertEquals(1, TEST_UTIL.getHBaseCluster().getRegions(newTableName).size());
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
index 4a71baf..793d159 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi3.java
@@ -46,15 +46,15 @@ import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous table admin operations.
- * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our
- * ten minute timeout so they were split.
+ * @see TestAsyncTableAdminApi2 This test and it used to be joined it was taking longer than our ten
+ *      minute timeout so they were split.
  */
 @RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
+    HBaseClassTestRule.forClass(TestAsyncTableAdminApi3.class);
 
   @Test
   public void testTableExist() throws Exception {
@@ -122,7 +122,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     assertEquals(tables.length + 1, size);
     for (int i = 0, j = 0; i < tables.length && j < size; i++, j++) {
       assertTrue("tableName should be equal in order",
-          tableDescs.get(j).getTableName().equals(tables[i]));
+        tableDescs.get(j).getTableName().equals(tables[i]));
     }
     assertTrue(tableDescs.get(size - 1).getTableName().equals(TableName.META_TABLE_NAME));
 
@@ -166,7 +166,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
 
     this.admin.disableTable(tableName).join();
     assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
+      .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
     assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
 
     // Test that table is disabled
@@ -188,7 +188,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     assertTrue(ok);
     this.admin.enableTable(tableName).join();
     assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
+      .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
     assertEquals(TableState.State.ENABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName));
 
     // Test that table is enabled
@@ -230,7 +230,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     table2.get(get).get();
 
     admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
-        .forEach(t -> admin.disableTable(t).join());
+      .forEach(t -> admin.disableTable(t).join());
 
     // Test that tables are disabled
     get = new Get(row);
@@ -254,7 +254,7 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     assertEquals(TableState.State.DISABLED, TestAsyncTableAdminApi.getStateFromMeta(tableName2));
 
     admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get()
-        .forEach(t -> admin.enableTable(t).join());
+      .forEach(t -> admin.enableTable(t).join());
 
     // Test that tables are enabled
     try {
@@ -281,8 +281,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     createTableWithDefaultConf(tableName, splitKeys);
 
     AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME);
-    List<HRegionLocation> regions = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+    List<HRegionLocation> regions =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     assertEquals(
       "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
       expectedRegions, regions.size());
@@ -292,8 +292,8 @@ public class TestAsyncTableAdminApi3 extends TestAsyncAdminBase {
     // Enable table, use retain assignment to assign regions.
     admin.enableTable(tableName).join();
 
-    List<HRegionLocation> regions2 = ClientMetaTableAccessor
-      .getTableHRegionLocations(metaTable, tableName).get();
+    List<HRegionLocation> regions2 =
+      ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName, true).get();
     // Check the assignment.
     assertEquals(regions.size(), regions2.size());
     assertTrue(regions2.containsAll(regions));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 245d755..ff186f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertNotNull;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -47,18 +48,19 @@ public class TestAsyncTableLocatePrefetch {
 
   private static byte[] FAMILY = Bytes.toBytes("cf");
 
-  private static AsyncConnection CONN;
+  private static AsyncConnectionImpl CONN;
 
-  private static AsyncNonMetaRegionLocator LOCATOR;
+  private static AsyncRegionLocator LOCATOR;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
+    TEST_UTIL.getConfiguration().setInt(AsyncRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
-    LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+    CONN = (AsyncConnectionImpl) ConnectionFactory
+      .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    LOCATOR = new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER);
   }
 
   @AfterClass
@@ -70,7 +72,7 @@ public class TestAsyncTableLocatePrefetch {
   @Test
   public void test() throws InterruptedException, ExecutionException {
     assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
-      RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
+      RegionLocateType.CURRENT, false, TimeUnit.MINUTES.toNanos(1)).get());
     // we finish the request before adding the remaining results to the cache, so sleep a bit here
     Thread.sleep(1000);
     // confirm that the locations of all the regions have been cached.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
index 6c538f5..77ee820 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java
@@ -84,8 +84,7 @@ public class TestAsyncTableRSCrashPublish {
   public void test() throws IOException, ExecutionException, InterruptedException {
     Configuration conf = UTIL.getHBaseCluster().getMaster().getConfiguration();
     try (AsyncConnection connection = ConnectionFactory.createAsyncConnection(conf).get()) {
-      AsyncNonMetaRegionLocator locator =
-        ((AsyncConnectionImpl) connection).getLocator().getNonMetaRegionLocator();
+      AsyncRegionLocator locator = ((AsyncConnectionImpl) connection).getLocator();
       connection.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
       ServerName serverName =
         locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 61bb163..6a32ff8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -92,9 +92,7 @@ public class TestAsyncTableUseMetaReplicas {
       FailPrimaryMetaScanCp.class.getName());
     UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
-      RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
-    }
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL);
     try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
index bebc843..4143d4a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
@@ -23,10 +23,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.security.User;
@@ -96,8 +99,9 @@ public class TestCatalogReplicaLoadBalanceSimpleSelector {
       .createSelector(replicaSelectorClass, META_TABLE_NAME, CONN, () -> {
         int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
         try {
-          RegionLocations metaLocations = CONN.registry.getMetaRegionLocations().get
-            (CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
+          List<HRegionLocation> metaLocations = CONN.getRegionLocator(TableName.META_TABLE_NAME)
+            .getRegionLocations(HConstants.EMPTY_START_ROW, true)
+            .get(CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
           numOfReplicas = metaLocations.size();
         } catch (Exception e) {
           LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
@@ -119,8 +123,9 @@ public class TestCatalogReplicaLoadBalanceSimpleSelector {
         replicaSelectorClass, META_TABLE_NAME, CONN, () -> {
         int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
         try {
-          RegionLocations metaLocations = CONN.registry.getMetaRegionLocations().get(
-            CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
+          List<HRegionLocation> metaLocations = CONN.getRegionLocator(TableName.META_TABLE_NAME)
+            .getRegionLocations(HConstants.EMPTY_START_ROW, true)
+            .get(CONN.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
           numOfReplicas = metaLocations.size();
         } catch (Exception e) {
           LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index e4bdff9..1044d2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -119,7 +119,7 @@ public class TestMasterRegistry {
       try (MasterRegistry registry = new MasterRegistry(conf)) {
         // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
         // because not all replicas had made it up before test started.
-        RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
+        RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
         assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
         assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
         List<HRegionLocation> metaLocations =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index 2197a21..d79f8ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -49,11 +49,11 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
-@Category({SmallTests.class, MasterTests.class })
+@Category({ SmallTests.class, MasterTests.class })
 public class TestMetaRegionLocationCache {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
+    HBaseClassTestRule.forClass(TestMetaRegionLocationCache.class);
 
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   private static ConnectionRegistry REGISTRY;
@@ -63,7 +63,7 @@ public class TestMetaRegionLocationCache {
     TEST_UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
     REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
   }
 
@@ -75,7 +75,7 @@ public class TestMetaRegionLocationCache {
 
   private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Exception {
     List<HRegionLocation> result = new ArrayList<>();
-    for (String znode: zk.getMetaReplicaNodes()) {
+    for (String znode : zk.getMetaReplicaNodes()) {
       String path = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, znode);
       int replicaId = zk.getZNodePaths().getMetaReplicaIdFromPath(path);
       RegionState state = MetaTableLocator.getMetaRegionState(zk, replicaId);
@@ -95,7 +95,7 @@ public class TestMetaRegionLocationCache {
       }
     }
     List<HRegionLocation> metaHRLs =
-        master.getMetaRegionLocationCache().getMetaRegionLocations().get();
+      master.getMetaRegionLocationCache().getMetaRegionLocations().get();
     assertFalse(metaHRLs.isEmpty());
     ZKWatcher zk = master.getZooKeeper();
     List<String> metaZnodes = zk.getMetaReplicaNodes();
@@ -115,11 +115,13 @@ public class TestMetaRegionLocationCache {
     assertEquals(actualHRLs, metaHRLs);
   }
 
-  @Test public void testInitialMetaLocations() throws Exception {
+  @Test
+  public void testInitialMetaLocations() throws Exception {
     verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
   }
 
-  @Test public void testStandByMetaLocations() throws Exception {
+  @Test
+  public void testStandByMetaLocations() throws Exception {
     HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
     standBy.isInitialized();
     verifyCachedMetaLocations(standBy);
@@ -128,16 +130,17 @@ public class TestMetaRegionLocationCache {
   /*
    * Shuffles the meta region replicas around the cluster and makes sure the cache is not stale.
    */
-  @Test public void testMetaLocationsChange() throws Exception {
+  @Test
+  public void testMetaLocationsChange() throws Exception {
     List<HRegionLocation> currentMetaLocs =
-        getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
+      getCurrentMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper());
     // Move these replicas to random servers.
-    for (HRegionLocation location: currentMetaLocs) {
+    for (HRegionLocation location : currentMetaLocs) {
       RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
     }
-    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
-    for (JVMClusterUtil.MasterThread masterThread:
-        TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
+    for (JVMClusterUtil.MasterThread masterThread : TEST_UTIL.getMiniHBaseCluster()
+      .getMasterThreads()) {
       verifyCachedMetaLocations(masterThread.getMaster());
     }
   }
@@ -146,7 +149,8 @@ public class TestMetaRegionLocationCache {
    * Tests MetaRegionLocationCache's init procedure to make sure that it correctly watches the base
    * znode for notifications.
    */
-  @Test public void testMetaRegionLocationCache() throws Exception {
+  @Test
+  public void testMetaRegionLocationCache() throws Exception {
     final String parentZnodeName = "/randomznodename";
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parentZnodeName);
@@ -156,7 +160,8 @@ public class TestMetaRegionLocationCache {
       // some ZK activity in the background.
       MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
       ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
-        @Override public void doAnAction() throws Exception {
+        @Override
+        public void doAnAction() throws Exception {
           final String testZnode = parentZnodeName + "/child";
           ZKUtil.createNodeIfNotExistsAndWatch(zkWatcher, testZnode, testZnode.getBytes());
           ZKUtil.deleteNode(zkWatcher, testZnode);
@@ -176,8 +181,8 @@ public class TestMetaRegionLocationCache {
         // Wait until the meta cache is populated.
         int iters = 0;
         while (iters++ < 10) {
-          if (metaCache.getMetaRegionLocations().isPresent()
-            && metaCache.getMetaRegionLocations().get().size() == 3) {
+          if (metaCache.getMetaRegionLocations().isPresent() &&
+            metaCache.getMetaRegionLocations().get().size() == 3) {
             break;
           }
           Thread.sleep(1000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
index e7c872d..ba5c9d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicasShutdownHandling.java
@@ -118,6 +118,10 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
           Thread.sleep(
             conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30000) * 3);
         }
+        // cache the location for all the meta regions.
+        try (RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
+          locator.getAllRegionLocations();
+        }
         // Ensure all metas are not on same hbase:meta replica=0 server!
 
         master = util.getHBaseClusterInterface().getClusterMetrics().getMasterName();
@@ -131,7 +135,6 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
           util.getHBaseClusterInterface().killRegionServer(primary);
           util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
         }
-        c.clearRegionLocationCache();
       }
       LOG.info("Running GETs");
       try (Table htable = c.getTable(TABLE)) {
@@ -150,15 +153,15 @@ public class TestMetaWithReplicasShutdownHandling extends MetaWithReplicasTestBa
         util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
         util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
         LOG.info("Master active!");
-        c.clearRegionLocationCache();
       }
     }
     conf.setBoolean(HConstants.USE_META_REPLICAS, false);
     LOG.info("Running GETs no replicas");
-    try (Connection c = ConnectionFactory.createConnection(conf);
-      Table htable = c.getTable(TABLE)) {
-      Result r = htable.get(new Get(row));
-      assertArrayEquals(row, r.getRow());
+    try (Connection c = ConnectionFactory.createConnection(conf)) {
+      try (Table htable = c.getTable(TABLE)) {
+        Result r = htable.get(new Get(row));
+        assertArrayEquals(r.getRow(), row);
+      }
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 4828cea..6bf61ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -210,7 +210,7 @@ public class TestReplicasClient {
 
     // No master
     LOG.info("Master is going to be stopped");
-    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
     Configuration c = new Configuration(HTU.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     LOG.info("Master has stopped");
@@ -224,7 +224,9 @@ public class TestReplicasClient {
 
   @Before
   public void before() throws IOException {
-    HTU.getConnection().clearRegionLocationCache();
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+      locator.clearRegionLocationCache();
+    }
     try {
       openRegion(hriPrimary);
     } catch (Exception ignored) {
@@ -246,7 +248,6 @@ public class TestReplicasClient {
       closeRegion(hriPrimary);
     } catch (Exception ignored) {
     }
-    HTU.getConnection().clearRegionLocationCache();
   }
 
   private HRegionServer getRS() {
@@ -325,16 +326,15 @@ public class TestReplicasClient {
     byte[] b1 = Bytes.toBytes("testLocations");
     openRegion(hriSecondary);
 
-    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
-        RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
-      conn.clearRegionLocationCache();
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
+      locator.clearRegionLocationCache();
       List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
       Assert.assertEquals(2, rl.size());
 
       rl = locator.getRegionLocations(b1, false);
       Assert.assertEquals(2, rl.size());
 
-      conn.clearRegionLocationCache();
+      locator.clearRegionLocationCache();
       rl = locator.getRegionLocations(b1, false);
       Assert.assertEquals(2, rl.size());
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index 4894c52..c00dd39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -83,8 +83,7 @@ public class TestZKConnectionRegistry {
       clusterId);
     assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
       REGISTRY.getActiveMaster().get());
-    RegionReplicaTestHelper
-      .waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL);
     RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
     assertEquals(3, locs.getRegionLocations().length);
     IntStream.range(0, 3).forEach(i -> {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 933addf..21dd128 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
@@ -520,4 +521,9 @@ public class MockNoopMasterServices implements MasterServices {
   public MetaLocationSyncer getMetaLocationSyncer() {
     return null;
   }
+
+  public List<RegionLocations> getAllMetaRegionLocations(boolean excludeOfflinedSplitParents)
+    throws IOException {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 9e0333b..d7b4eda 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.ClusterMetrics;
@@ -28,12 +29,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -166,54 +164,5 @@ public class TestMasterFailover {
       TEST_UTIL.shutdownMiniCluster();
     }
   }
-
-  /**
-   * Test meta in transition when master failover.
-   * This test used to manipulate region state up in zk. That is not allowed any more in hbase2
-   * so I removed that messing. That makes this test anemic.
-   */
-  @Test
-  public void testMetaInTransitionWhenMasterFailover() throws Exception {
-    // Start the cluster
-    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
-    TEST_UTIL.startMiniCluster();
-    try {
-      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      LOG.info("Cluster started");
-
-      HMaster activeMaster = cluster.getMaster();
-      ServerName metaServerName = cluster.getServerHoldingMeta();
-      HRegionServer hrs = cluster.getRegionServer(metaServerName);
-
-      // Now kill master, meta should remain on rs, where we placed it before.
-      LOG.info("Aborting master");
-      activeMaster.abort("test-kill");
-      cluster.waitForMasterToStop(activeMaster.getServerName(), 30000);
-      LOG.info("Master has aborted");
-
-      // meta should remain where it was
-      RegionState metaState = MetaTableLocator.getMetaRegionState(hrs.getZooKeeper());
-      assertEquals("hbase:meta should be online on RS",
-          metaState.getServerName(), metaServerName);
-      assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
-      // Start up a new master
-      LOG.info("Starting up a new master");
-      activeMaster = cluster.startMaster().getMaster();
-      LOG.info("Waiting for master to be ready");
-      cluster.waitForActiveAndReadyMaster();
-      LOG.info("Master is ready");
-
-      // ensure meta is still deployed on RS
-      metaState = MetaTableLocator.getMetaRegionState(activeMaster.getZooKeeper());
-      assertEquals("hbase:meta should be online on RS",
-          metaState.getServerName(), metaServerName);
-      assertEquals("hbase:meta should be online on RS", State.OPEN, metaState.getState());
-
-      // Done, shutdown the cluster
-    } finally {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
 }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
index 6ad4f08..b216eb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaAssignmentWithStopMaster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.ServerName;
@@ -90,7 +91,20 @@ public class TestMetaAssignmentWithStopMaster {
         }
       }
 
-      ServerName newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+      ServerName newMetaServer;
+      startTime = System.currentTimeMillis();
+      for (;;) {
+        try {
+          newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
+          break;
+        } catch (IOException e) {
+          LOG.warn("failed to get all locations, retry...", e);
+        }
+        Thread.sleep(3000);
+        if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
+          fail("Wait too long for getting the new meta location");
+        }
+      }
       assertTrue("The new meta server " + newMetaServer + " should be same with" +
         " the old meta server " + oldMetaServer, newMetaServer.equals(oldMetaServer));
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
index cf35ae2..29bafe3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.zookeeper.KeeperException;
@@ -100,8 +98,6 @@ public class TestMetaShutdownHandler {
       metaServerName =
         regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
     }
-    RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
-    assertEquals("Wrong state for meta!", RegionState.State.OPEN, metaState.getState());
     assertNotEquals("Meta is on master!", metaServerName, master.getServerName());
     HRegionServer metaRegionServer = cluster.getRegionServer(metaServerName);
 
@@ -129,11 +125,9 @@ public class TestMetaShutdownHandler {
     assertTrue("Meta should be assigned",
       regionStates.isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO));
     // Now, make sure meta is registered in zk
-    metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper());
-    assertEquals("Meta should not be in transition", RegionState.State.OPEN, metaState.getState());
-    assertEquals("Meta should be assigned", metaState.getServerName(),
-      regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO));
-    assertNotEquals("Meta should be assigned on a different server", metaState.getServerName(),
+    ServerName newMetaServerName =
+      regionStates.getRegionServerOfRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    assertNotEquals("Meta should be assigned on a different server", newMetaServerName,
       metaServerName);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
index f308a71..b1f5491 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionReplicaSplit.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.master.assignment;
 
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -50,15 +50,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestRegionReplicaSplit {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestRegionReplicaSplit.class);
-  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaSplit.class);
 
   private static final int NB_SERVERS = 4;
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
index a4a3f86..0ea4f75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -165,7 +165,7 @@ public class TestCompactionLifeCycleTracker {
                         .setValue(Bytes.toBytes(i))
                         .build()));
       }
-      UTIL.getAdmin().flush(NAME);
+      UTIL.flush(NAME);
       for (int i = 100; i < 200; i++) {
         byte[] row = Bytes.toBytes(i);
         table.put(new Put(row)
@@ -178,7 +178,7 @@ public class TestCompactionLifeCycleTracker {
                         .setValue(Bytes.toBytes(i))
                         .build()));
       }
-      UTIL.getAdmin().flush(NAME);
+      UTIL.flush(NAME);
     }
     region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
     assertEquals(2, region.getStore(CF1).getStorefilesCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 61f8fe8..f7af83b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -111,7 +111,7 @@ public class TestRegionReplicas {
     hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
 
     // No master
-    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 4260b1d..71e92fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -35,10 +34,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -90,15 +88,24 @@ public class TestRegionServerNoMaster {
     }
     regionName = hri.getRegionName();
 
-    stopMasterAndAssignMeta(HTU);
+    stopMasterAndCacheMetaLocation(HTU);
   }
 
-  public static void stopMasterAndAssignMeta(HBaseTestingUtil HTU)
-      throws IOException, InterruptedException {
+  public static void stopMasterAndCacheMetaLocation(HBaseTestingUtil HTU)
+    throws IOException, InterruptedException {
+    // cache the meta location so we will not go to the master to look up the meta region location
+    for (JVMClusterUtil.RegionServerThread t : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+      try (RegionLocator locator =
+        t.getRegionServer().getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+        locator.getAllRegionLocations();
+      }
+    }
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      locator.getAllRegionLocations();
+    }
     // Stop master
     HMaster master = HTU.getHBaseCluster().getMaster();
     Thread masterThread = HTU.getHBaseCluster().getMasterThread();
-    ServerName masterAddr = master.getServerName();
     master.stopMaster();
 
     LOG.info("Waiting until master thread exits");
@@ -107,27 +114,6 @@ public class TestRegionServerNoMaster {
     }
 
     HRegionServer.TEST_SKIP_REPORTING_TRANSITION = true;
-    // Master is down, so is the meta. We need to assign it somewhere
-    // so that regions can be assigned during the mocking phase.
-    HRegionServer hrs = HTU.getHBaseCluster()
-      .getLiveRegionServerThreads().get(0).getRegionServer();
-    ZKWatcher zkw = hrs.getZooKeeper();
-    ServerName sn = MetaTableLocator.getMetaRegionLocation(zkw);
-    if (sn != null && !masterAddr.equals(sn)) {
-      return;
-    }
-
-    ProtobufUtil.openRegion(null, hrs.getRSRpcServices(),
-      hrs.getServerName(), RegionInfoBuilder.FIRST_META_REGIONINFO);
-    while (true) {
-      sn = MetaTableLocator.getMetaRegionLocation(zkw);
-      if (sn != null && sn.equals(hrs.getServerName())
-          && hrs.getOnlineRegions().containsKey(
-            RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName())) {
-        break;
-      }
-      Thread.sleep(100);
-    }
   }
 
   /** Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush() */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 74de90b..7f56095 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -133,8 +131,12 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     // mock a secondary region info to open
     hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
 
+    // cache the location for meta regions
+    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
+      locator.getAllRegionLocations();
+    }
     // No master
-    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
     rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
     rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
   }
@@ -186,11 +188,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     HTU.loadNumericRows(table, f, 0, 1000);
 
     Assert.assertEquals(1000, entries.size());
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-    }
+    AsyncClusterConnection conn = HTU.getAsyncConnection();
+    // replay the edits to the secondary using replay callable
+    replicateUsingCallable(conn, entries);
 
     Region region = rs0.getRegion(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
@@ -216,36 +216,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
   public void testReplayCallableWithRegionMove() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly while
     // the region is moved to another location.It tests handling of RME.
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      openRegion(HTU, rs0, hriSecondary);
-      // load some data to primary
-      HTU.loadNumericRows(table, f, 0, 1000);
+    AsyncClusterConnection conn = HTU.getAsyncConnection();
+    openRegion(HTU, rs0, hriSecondary);
+    // load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
 
-      Assert.assertEquals(1000, entries.size());
+    Assert.assertEquals(1000, entries.size());
 
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
+    // replay the edits to the secondary using replay callable
+    replicateUsingCallable(conn, entries);
 
-      Region region = rs0.getRegion(hriSecondary.getEncodedName());
-      HTU.verifyNumericRows(region, f, 0, 1000);
+    Region region = rs0.getRegion(hriSecondary.getEncodedName());
+    HTU.verifyNumericRows(region, f, 0, 1000);
 
-      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
 
-      // move the secondary region from RS0 to RS1
-      closeRegion(HTU, rs0, hriSecondary);
-      openRegion(HTU, rs1, hriSecondary);
+    // move the secondary region from RS0 to RS1
+    closeRegion(HTU, rs0, hriSecondary);
+    openRegion(HTU, rs1, hriSecondary);
 
-      // replicate the new data
-      replicateUsingCallable(conn, entries);
+    // replicate the new data
+    replicateUsingCallable(conn, entries);
 
-      region = rs1.getRegion(hriSecondary.getEncodedName());
-      // verify the new data. old data may or may not be there
-      HTU.verifyNumericRows(region, f, 1000, 2000);
+    region = rs1.getRegion(hriSecondary.getEncodedName());
+    // verify the new data. old data may or may not be there
+    HTU.verifyNumericRows(region, f, 1000, 2000);
 
-      HTU.deleteNumericRows(table, f, 0, 2000);
-      closeRegion(HTU, rs1, hriSecondary);
-    }
+    HTU.deleteNumericRows(table, f, 0, 2000);
+    closeRegion(HTU, rs1, hriSecondary);
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
index 90fe71e..59ca67d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckEncryption.java
@@ -129,7 +129,7 @@ public class TestHBaseFsckEncryption {
       table.close();
     }
     // Flush it
-    TEST_UTIL.getAdmin().flush(tableDescriptor.getTableName());
+    TEST_UTIL.flush(tableDescriptor.getTableName());
 
     // Verify we have encrypted store files on disk
     final List<Path> paths = findStorefilePaths(tableDescriptor.getTableName());
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index 807bf6f..82d4621 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -17,24 +17,16 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
 
@@ -47,9 +39,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaReg
  * <p/>
  * Meta region location is set by <code>RegionServerServices</code>. This class doesn't use ZK
  * watchers, rather accesses ZK directly.
- * <p/>
- * TODO: rewrite using RPC calls to master to find out about hbase:meta.
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. The meta location is now stored in the
+ *             master's local store; the location on zk is only a mirror of the first meta region,
+ *             kept for compatibility.
  */
+@Deprecated
 @InterfaceAudience.Private
 public final class MetaTableLocator {
   private static final Logger LOG = LoggerFactory.getLogger(MetaTableLocator.class);
@@ -58,166 +52,16 @@ public final class MetaTableLocator {
   }
 
   /**
-   * @param zkw ZooKeeper watcher to be used
-   * @return meta table regions and their locations.
-   */
-  public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
-    return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  /**
-   * Gets the meta regions and their locations for the given path and replica ID.
-   *
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @return meta table regions and their locations.
-   */
-  public static List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
-      int replicaId) {
-    ServerName serverName = getMetaRegionLocation(zkw, replicaId);
-    List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
-    list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
-        RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
-    return list;
-  }
-
-  /**
-   * Gets the meta regions for the given path with the default replica ID.
-   *
-   * @param zkw ZooKeeper watcher to be used
-   * @return List of meta regions
-   */
-  public static List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
-    return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  /**
-   * Gets the meta regions for the given path and replica ID.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @return List of meta regions
-   */
-  public static List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
-    List<Pair<RegionInfo, ServerName>> result;
-    result = getMetaRegionsAndLocations(zkw, replicaId);
-    return getListOfRegionInfos(result);
-  }
-
-  private static List<RegionInfo> getListOfRegionInfos(
-      final List<Pair<RegionInfo, ServerName>> pairs) {
-    if (pairs == null || pairs.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    List<RegionInfo> result = new ArrayList<>(pairs.size());
-    for (Pair<RegionInfo, ServerName> pair : pairs) {
-      result.add(pair.getFirst());
-    }
-    return result;
-  }
-
-  /**
-   * Gets the meta region location, if available.  Does not block.
-   * @param zkw zookeeper connection to use
-   * @return server name or null if we failed to get the data.
-   */
-  public static ServerName getMetaRegionLocation(final ZKWatcher zkw) {
-    try {
-      RegionState state = getMetaRegionState(zkw);
-      return state.isOpened() ? state.getServerName() : null;
-    } catch (KeeperException ke) {
-      return null;
-    }
-  }
-
-  /**
-   * Gets the meta region location, if available.  Does not block.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @return server name
-   */
-  public static ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
-    try {
-      RegionState state = getMetaRegionState(zkw, replicaId);
-      return state.isOpened() ? state.getServerName() : null;
-    } catch (KeeperException ke) {
-      return null;
-    }
-  }
-
-  /**
-   * Gets the meta region location, if available, and waits for up to the specified timeout if not
-   * immediately available. Given the zookeeper notification could be delayed, we will try to get
-   * the latest data.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param timeout maximum time to wait, in millis
-   * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
-   *         if none available
-   * @throws InterruptedException if interrupted while waiting
-   * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
-   */
-  public static ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
-      throws InterruptedException, NotAllMetaRegionsOnlineException {
-    return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
-  }
-
-  /**
-   * Gets the meta region location, if available, and waits for up to the specified timeout if not
-   * immediately available. Given the zookeeper notification could be delayed, we will try to get
-   * the latest data.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param replicaId the ID of the replica
-   * @param timeout maximum time to wait, in millis
-   * @return server name for server hosting meta region formatted as per {@link ServerName}, or null
-   *         if none available
-   * @throws InterruptedException if waiting for the socket operation fails
-   * @throws NotAllMetaRegionsOnlineException if a meta or root region is not online
-   */
-  public static ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
-      throws InterruptedException, NotAllMetaRegionsOnlineException {
-    try {
-      if (ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) == -1) {
-        String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. " +
-          "There could be a mismatch with the one configured in the master.";
-        LOG.error(errorMsg);
-        throw new IllegalArgumentException(errorMsg);
-      }
-    } catch (KeeperException e) {
-      throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
-    }
-    ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);
-
-    if (sn == null) {
-      throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
-    }
-
-    return sn;
-  }
-
-  /**
-   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
-   * specified server address.
-   * @param zookeeper zookeeper reference
-   * @param serverName The server hosting <code>hbase:meta</code>
-   * @param state The region transition state
-   * @throws KeeperException unexpected zookeeper exception
-   */
-  public static void setMetaLocation(ZKWatcher zookeeper,
-      ServerName serverName, RegionState.State state) throws KeeperException {
-    setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
-  }
-
-  /**
    * Sets the location of <code>hbase:meta</code> in ZooKeeper to the specified server address.
    * @param zookeeper reference to the {@link ZKWatcher} which also contains configuration and
-   *                  operation
+   *          operation
    * @param serverName the name of the server
    * @param replicaId the ID of the replica
    * @param state the state of the region
    * @throws KeeperException if a ZooKeeper operation fails
    */
   public static void setMetaLocation(ZKWatcher zookeeper, ServerName serverName, int replicaId,
-      RegionState.State state) throws KeeperException {
+    RegionState.State state) throws KeeperException {
     if (serverName == null) {
       LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
       return;
@@ -226,15 +70,13 @@ public final class MetaTableLocator {
       serverName, state);
     // Make the MetaRegionServer pb and then get its bytes and save this as
     // the znode content.
-    MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
-      .setServer(ProtobufUtil.toServerName(serverName))
-      .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
-      .setState(state.convert()).build();
+    MetaRegionServer pbrsr =
+      MetaRegionServer.newBuilder().setServer(ProtobufUtil.toServerName(serverName))
+        .setRpcVersion(HConstants.RPC_CURRENT_VERSION).setState(state.convert()).build();
     byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
     try {
-      ZKUtil.setData(zookeeper,
-          zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
-    } catch(KeeperException.NoNodeException nne) {
+      ZKUtil.setData(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
+    } catch (KeeperException.NoNodeException nne) {
       if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
         LOG.debug("hbase:meta region location doesn't exist, create it");
       } else {
@@ -242,27 +84,19 @@ public final class MetaTableLocator {
             ", create it");
       }
       ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),
-              data);
+        data);
     }
   }
 
   /**
-   * Load the meta region state from the meta server ZNode.
-   */
-  public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
-    return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  /**
    * Load the meta region state from the meta region server ZNode.
-   *
    * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
    * @param replicaId the ID of the replica
    * @return regionstate
    * @throws KeeperException if a ZooKeeper operation fails
    */
   public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
-      throws KeeperException {
+    throws KeeperException {
     RegionState regionState = null;
     try {
       byte[] data = ZKUtil.getData(zkw, zkw.getZNodePaths().getZNodeForReplica(replicaId));
@@ -274,110 +108,4 @@ public final class MetaTableLocator {
     }
     return regionState;
   }
-
-  /**
-   * Deletes the location of <code>hbase:meta</code> in ZooKeeper.
-   * @param zookeeper zookeeper reference
-   * @throws KeeperException unexpected zookeeper exception
-   */
-  public static void deleteMetaLocation(ZKWatcher zookeeper)
-    throws KeeperException {
-    deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
-  }
-
-  public static void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
-    throws KeeperException {
-    if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
-      LOG.info("Deleting hbase:meta region location in ZooKeeper");
-    } else {
-      LOG.info("Deleting hbase:meta for {} region location in ZooKeeper", replicaId);
-    }
-    try {
-      // Just delete the node.  Don't need any watches.
-      ZKUtil.deleteNode(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId));
-    } catch(KeeperException.NoNodeException nne) {
-      // Has already been deleted
-    }
-  }
-  /**
-   * Wait until the primary meta region is available. Get the secondary locations as well but don't
-   * block for those.
-   *
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
-   * @param timeout maximum time to wait in millis
-   * @param conf the {@link Configuration} to use
-   * @return ServerName or null if we timed out.
-   * @throws InterruptedException if waiting for the socket operation fails
-   */
-  public static List<ServerName> blockUntilAvailable(final ZKWatcher zkw, final long timeout,
-      Configuration conf) throws InterruptedException {
-    int numReplicasConfigured = 1;
-
-    List<ServerName> servers = new ArrayList<>();
-    // Make the blocking call first so that we do the wait to know
-    // the znodes are all in place or timeout.
-    ServerName server = blockUntilAvailable(zkw, timeout);
-
-    if (server == null) {
-      return null;
-    }
-
-    servers.add(server);
-
-    try {
-      List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
-      numReplicasConfigured = metaReplicaNodes.size();
-    } catch (KeeperException e) {
-      LOG.warn("Got ZK exception {}", e);
-    }
-    for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
-      // return all replica locations for the meta
-      servers.add(getMetaRegionLocation(zkw, replicaId));
-    }
-    return servers;
-  }
-
-  /**
-   * Wait until the meta region is available and is not in transition.
-   * @param zkw zookeeper connection to use
-   * @param timeout maximum time to wait, in millis
-   * @return ServerName or null if we timed out.
-   * @throws InterruptedException if waiting for the socket operation fails
-   */
-  public static ServerName blockUntilAvailable(final ZKWatcher zkw, final long timeout)
-      throws InterruptedException {
-    return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
-  }
-
-  /**
-   * Wait until the meta region is available and is not in transition.
-   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and constants
-   * @param replicaId the ID of the replica
-   * @param timeout maximum time to wait in millis
-   * @return ServerName or null if we timed out.
-   * @throws InterruptedException if waiting for the socket operation fails
-   */
-  public static ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
-      final long timeout) throws InterruptedException {
-    if (timeout < 0) {
-      throw new IllegalArgumentException();
-    }
-
-    if (zkw == null) {
-      throw new IllegalArgumentException();
-    }
-
-    long startTime = EnvironmentEdgeManager.currentTime();
-    ServerName sn = null;
-    while (true) {
-      sn = getMetaRegionLocation(zkw, replicaId);
-      if (sn != null ||
-        (EnvironmentEdgeManager.currentTime() - startTime) >
-          timeout - HConstants.SOCKET_RETRY_WAIT_MS) {
-        break;
-      }
-      Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
-    }
-    return sn;
-  }
 }
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index d124a9a..5bfb8bf 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1857,13 +1857,6 @@ public final class ZKUtil {
           sb.append("\n ").append(child);
         }
       }
-      sb.append("\nRegion server holding hbase:meta:");
-      sb.append("\n ").append(MetaTableLocator.getMetaRegionLocation(zkw));
-      int numMetaReplicas = zkw.getMetaReplicaNodes().size();
-      for (int i = 1; i < numMetaReplicas; i++) {
-        sb.append("\n replica" + i + ": "
-          + MetaTableLocator.getMetaRegionLocation(zkw, i));
-      }
       sb.append("\nRegion servers:");
       final List<String> rsChildrenNoWatchList =
               listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);