You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/01 02:20:25 UTC

hbase git commit: HBASE-16945 Implement AsyncRegionLocator

Repository: hbase
Updated Branches:
  refs/heads/master 45a259424 -> 6cf9333e9


HBASE-16945 Implement AsyncRegionLocator


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6cf9333e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6cf9333e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6cf9333e

Branch: refs/heads/master
Commit: 6cf9333e978eb91d54935abdd6ebe9152e4ea183
Parents: 45a2594
Author: zhangduo <zh...@apache.org>
Authored: Mon Oct 31 20:39:43 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Nov 1 10:18:55 2016 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncConnectionImpl.java       |  11 +-
 .../hadoop/hbase/client/AsyncRegionLocator.java | 442 +++++++++++++++++--
 .../client/AsyncRpcRetryingCallerFactory.java   |  13 +-
 .../AsyncSingleRequestRpcRetryingCaller.java    |  40 +-
 .../client/AsyncTableRegionLocatorImpl.java     |   2 +-
 .../hadoop/hbase/util/CollectionUtils.java      |  14 +
 .../hbase/client/TestAsyncGetMultiThread.java   | 148 +++++++
 .../hbase/client/TestAsyncRegionLocator.java    | 239 ++++++++++
 ...TestAsyncSingleRequestRpcRetryingCaller.java |  22 +-
 9 files changed, 849 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 121a16b..6cad6a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -66,7 +66,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final User user;
 
-  private final AsyncRegistry registry;
+  final AsyncRegistry registry;
 
   private final String clusterId;
 
@@ -87,15 +87,11 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
 
   @SuppressWarnings("deprecation")
-  public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
+  public AsyncConnectionImpl(Configuration conf, User user) {
     this.conf = conf;
     this.user = user;
-
     this.connConf = new AsyncConnectionConfiguration(conf);
-
-    this.locator = new AsyncRegionLocator(conf);
-
-    // action below will not throw exception so no need to catch and close.
+    this.locator = new AsyncRegionLocator(this);
     this.registry = ClusterRegistryFactory.getRegistry(conf);
     this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
       if (LOG.isDebugEnabled()) {
@@ -122,7 +118,6 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public void close() {
-    IOUtils.closeQuietly(locator);
     IOUtils.closeQuietly(rpcClient);
     IOUtils.closeQuietly(registry);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 321fd71..ba5a0e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,71 +17,437 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * TODO: reimplement using aync connection when the scan logic is ready. The current implementation
- * is based on the blocking client.
+ * The asynchronous region locator.
  */
 @InterfaceAudience.Private
-class AsyncRegionLocator implements Closeable {
+class AsyncRegionLocator {
 
-  private final ConnectionImplementation conn;
+  private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
 
-  AsyncRegionLocator(Configuration conf) throws IOException {
-    conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
+  private final AsyncConnectionImpl conn;
+
+  private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
+      new AtomicReference<>();
+
+  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
+      new ConcurrentHashMap<>();
+
+  AsyncRegionLocator(AsyncConnectionImpl conn) {
+    this.conn = conn;
+  }
+
+  private CompletableFuture<HRegionLocation> locateMetaRegion() {
+    for (;;) {
+      HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
+      if (metaRegionLocation != null) {
+        return CompletableFuture.completedFuture(metaRegionLocation);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Meta region location cache is null, try fetching from registry.");
+      }
+      if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Start fetching meta region location from registry.");
+        }
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+          if (error != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Failed to fetch meta region location from registry", error);
+            }
+            metaRelocateFuture.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          HRegionLocation loc = locs.getDefaultRegionLocation();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The fetched meta region location is " + loc);
+          }
+          // Here we update the cache before resetting the future, so it is possible that someone
+          // gets a stale value. Consider this:
+          // 1. we update the cache
+          // 2. someone clears the cache and relocates again
+          // 3. metaRelocateFuture is not null, so the old future is reused
+          // 4. we clear metaRelocateFuture and complete the old future with the value that was
+          // cleared in step 2.
+          // We do not think this is a big deal: it rarely happens, and even if it does, the
+          // caller will retry later, so there is no correctness problem.
+          this.metaRegionLocation.set(loc);
+          metaRelocateFuture.set(null);
+          future.complete(loc);
+        });
+      } else {
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  private static ConcurrentNavigableMap<byte[], HRegionLocation> createTableCache() {
+    return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  private void removeFromCache(HRegionLocation loc) {
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
+        cache.get(loc.getRegionInfo().getTable());
+    if (tableCache == null) {
+      return;
+    }
+    tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
+      if (oldLoc.getSeqNum() > loc.getSeqNum()
+          || !oldLoc.getServerName().equals(loc.getServerName())) {
+        return oldLoc;
+      }
+      return null;
+    });
+  }
+
+  private void addToCache(HRegionLocation loc) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try adding " + loc + " to cache");
+    }
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = computeIfAbsent(cache,
+      loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache);
+    byte[] startKey = loc.getRegionInfo().getStartKey();
+    HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc);
+    if (oldLoc == null) {
+      return;
+    }
+    if (oldLoc.getSeqNum() > loc.getSeqNum()
+        || oldLoc.getServerName().equals(loc.getServerName())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
+            + " is newer than us or has the same server name");
+      }
+      return;
+    }
+    tableCache.compute(startKey, (k, oldValue) -> {
+      if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
+        return loc;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
+            + " is newer than us or has the same server name."
+            + " Maybe it is updated before we replace it");
+      }
+      return oldValue;
+    });
+  }
+
+  private HRegionLocation locateInCache(TableName tableName, byte[] row) {
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
+    if (tableCache == null) {
+      return null;
+    }
+    Map.Entry<byte[], HRegionLocation> entry = tableCache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    HRegionLocation loc = entry.getValue();
+    byte[] endKey = loc.getRegionInfo().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      return loc;
+    } else {
+      return null;
+    }
+  }
+
+  private void onScanComplete(CompletableFuture<HRegionLocation> future, TableName tableName,
+      byte[] row, List<Result> results, Throwable error, String rowNameInErrorMsg,
+      Consumer<HRegionLocation> otherCheck) {
+    if (error != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+            + Bytes.toStringBinary(row) + "'",
+          error);
+      }
+      future.completeExceptionally(error);
+      return;
+    }
+    if (results.isEmpty()) {
+      future.completeExceptionally(new TableNotFoundException(tableName));
+      return;
+    }
+    RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+          + Bytes.toStringBinary(row) + "' is " + locs);
+    }
+    if (locs == null || locs.getDefaultRegionLocation() == null) {
+      future.completeExceptionally(
+        new IOException(String.format("No location found for '%s', %s='%s'", tableName,
+          rowNameInErrorMsg, Bytes.toStringBinary(row))));
+      return;
+    }
+    HRegionLocation loc = locs.getDefaultRegionLocation();
+    HRegionInfo info = loc.getRegionInfo();
+    if (info == null) {
+      future.completeExceptionally(
+        new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
+          rowNameInErrorMsg, Bytes.toStringBinary(row))));
+      return;
+    }
+    if (!info.getTable().equals(tableName)) {
+      future.completeExceptionally(new TableNotFoundException(
+          "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
+      return;
+    }
+    if (info.isSplit()) {
+      future.completeExceptionally(new RegionOfflineException(
+          "the only available region for the required row is a split parent,"
+              + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
+      return;
+    }
+    if (info.isOffline()) {
+      future.completeExceptionally(new RegionOfflineException("the region is offline, could"
+          + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
+      return;
+    }
+    if (loc.getServerName() == null) {
+      future.completeExceptionally(new NoServerForRegionException(
+          String.format("No server address listed for region '%s', %s='%s'",
+            info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row))));
+      return;
+    }
+    otherCheck.accept(loc);
+    if (future.isDone()) {
+      return;
+    }
+    addToCache(loc);
+    future.complete(loc);
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      boolean reload) {
+  private CompletableFuture<HRegionLocation> locateInMeta(TableName tableName, byte[] row) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta");
+    }
     CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    try {
-      future.complete(conn.getRegionLocation(tableName, row, reload));
-    } catch (IOException e) {
-      future.completeExceptionally(e);
+    byte[] metaKey = createRegionName(tableName, row, NINES, false);
+    conn.getTable(META_TABLE_NAME)
+        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .whenComplete(
+          (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
+          }));
+    return future;
+  }
+
+  private CompletableFuture<HRegionLocation> locateRegion(TableName tableName, byte[] row) {
+    HRegionLocation loc = locateInCache(tableName, row);
+    if (loc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
+            + Bytes.toStringBinary(row) + "'");
+      }
+      return CompletableFuture.completedFuture(loc);
+    }
+    return locateInMeta(tableName, row);
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
+    if (tableName.equals(META_TABLE_NAME)) {
+      return locateMetaRegion();
+    } else {
+      return locateRegion(tableName, row);
+    }
+  }
+
+  private HRegionLocation locatePreviousInCache(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
+    if (tableCache == null) {
+      return null;
+    }
+    Map.Entry<byte[], HRegionLocation> entry;
+    if (isEmptyStopRow(startRowOfCurrentRegion)) {
+      entry = tableCache.lastEntry();
+    } else {
+      entry = tableCache.lowerEntry(startRowOfCurrentRegion);
+    }
+    if (entry == null) {
+      return null;
     }
+    HRegionLocation loc = entry.getValue();
+    if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
+      return loc;
+    } else {
+      return null;
+    }
+  }
+
+  private CompletableFuture<HRegionLocation> locatePreviousInMeta(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
+          + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta");
+    }
+    byte[] metaKey;
+    if (isEmptyStopRow(startRowOfCurrentRegion)) {
+      byte[] binaryTableName = tableName.getName();
+      metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+    } else {
+      metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
+    }
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    conn.getTable(META_TABLE_NAME)
+        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
+          results, error, "startRowOfCurrentRegion", loc -> {
+            HRegionInfo info = loc.getRegionInfo();
+            if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) {
+              future.completeExceptionally(new IOException("The end key of '"
+                  + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey())
+                  + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"));
+            }
+          }));
     return future;
   }
 
+  private CompletableFuture<HRegionLocation> locatePreviousRegion(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion);
+    if (loc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
+            + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
+      }
+      return CompletableFuture.completedFuture(loc);
+    }
+    return locatePreviousInMeta(tableName, startRowOfCurrentRegion);
+  }
+
+  /**
+   * Locate the previous region using the current region's start key. Used for reverse scans.
+   */
   CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
-      byte[] startRowOfCurrentRegion, boolean reload) {
-    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
-    try {
-      for (;;) {
-        HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
-        byte[] endKey = loc.getRegionInfo().getEndKey();
-        if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
-          future.complete(loc);
-          break;
-        }
-        toLocateRow = endKey;
+      byte[] startRowOfCurrentRegion) {
+    if (tableName.equals(META_TABLE_NAME)) {
+      return locateMetaRegion();
+    } else {
+      return locatePreviousRegion(tableName, startRowOfCurrentRegion);
+    }
+  }
+
+  private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
+    // No need to update if there is no cached location, or the cached location is newer, or the
+    // cached location is on a different server than ours.
+    return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
+        && oldLoc.getServerName().equals(loc.getServerName());
+  }
+
+  private void updateCachedLoation(HRegionLocation loc, Throwable exception,
+      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
+    }
+    if (!canUpdate(loc, oldLoc)) {
+      return;
+    }
+    Throwable cause = findException(exception);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The actual exception when updating " + loc, cause);
+    }
+    if (cause == null || !isMetaClearingException(cause)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Will not update " + loc + " because the exception is null or not the one we care about");
       }
-    } catch (IOException e) {
-      future.completeExceptionally(e);
+      return;
+    }
+    if (cause instanceof RegionMovedException) {
+      RegionMovedException rme = (RegionMovedException) cause;
+      HRegionLocation newLoc =
+          new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
+      }
+      addToCache.accept(newLoc);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Try removing " + loc + " from cache");
+      }
+      removeFromCache.accept(loc);
     }
-    return future;
   }
 
-  void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
-      ServerName source) {
-    conn.updateCachedLocations(tableName, regionName, row, exception, source);
+  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+    if (loc.getRegionInfo().isMetaTable()) {
+      updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
+        for (;;) {
+          HRegionLocation oldLoc = metaRegionLocation.get();
+          if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
+              || oldLoc.getServerName().equals(newLoc.getServerName()))) {
+            return;
+          }
+          if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
+            return;
+          }
+        }
+      }, l -> {
+        for (;;) {
+          HRegionLocation oldLoc = metaRegionLocation.get();
+          if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
+            return;
+          }
+        }
+      });
+    } else {
+      updateCachedLoation(loc, exception, l -> {
+        ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
+            cache.get(l.getRegionInfo().getTable());
+        if (tableCache == null) {
+          return null;
+        }
+        return tableCache.get(l.getRegionInfo().getStartKey());
+      }, this::addToCache, this::removeFromCache);
+    }
   }
 
-  @Override
-  public void close() {
-    IOUtils.closeQuietly(conn);
+  void clearCache(TableName tableName) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Clear meta cache for " + tableName);
+    }
+    cache.remove(tableName);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 9020ce5..0d23c39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -68,8 +68,8 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public SingleRequestCallerBuilder<T>
-        action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+    public SingleRequestCallerBuilder<T> action(
+        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
@@ -92,12 +92,9 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
           checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
-          locateToPreviousRegion
-              ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
-              : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
-          checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
-          conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
-          conn.connConf.getStartLogErrorsCnt());
+          locateToPreviousRegion, checkNotNull(callable, "action is null"),
+          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
+          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 1d0357d..f10c9a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -60,12 +60,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
         ClientService.Interface stub);
   }
 
-  @FunctionalInterface
-  public interface RegionLocator {
-    CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
-        byte[] row, boolean reload);
-  }
-
   private final HashedWheelTimer retryTimer;
 
   private final AsyncConnectionImpl conn;
@@ -74,7 +68,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private final byte[] row;
 
-  private final RegionLocator locator;
+  private final Supplier<CompletableFuture<HRegionLocation>> locate;
 
   private final Callable<T> callable;
 
@@ -97,13 +91,18 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   private final long startNs;
 
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
-      int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      TableName tableName, byte[] row, boolean locateToPreviousRegion, Callable<T> callable,
+      long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
+      int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
     this.row = row;
-    this.locator = locator;
+    if (locateToPreviousRegion) {
+      this.locate = this::locatePrevious;
+    } else {
+      this.locate = this::locate;
+    }
     this.callable = callable;
     this.pauseNs = pauseNs;
     this.maxAttempts = retries2Attempts(maxRetries);
@@ -145,8 +144,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     if (tries > startLogErrorsCnt) {
       LOG.warn(errMsg.get(), error);
     }
-    RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
-        error, EnvironmentEdgeManager.currentTime(), "");
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+        new RetriesExhaustedException.ThrowableWithExtraContext(error,
+            EnvironmentEdgeManager.currentTime(), "");
     exceptions.add(qt);
     if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
       completeExceptionally();
@@ -194,8 +194,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
             + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
             + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
             + elapsedMs() + " ms",
-        err -> conn.getLocator().updateCachedLocations(tableName,
-          loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+        err -> conn.getLocator().updateCachedLocation(loc, err));
       return;
     }
     resetController();
@@ -207,8 +206,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
               + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
               + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
               + elapsedMs() + " ms",
-          err -> conn.getLocator().updateCachedLocations(tableName,
-            loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+          err -> conn.getLocator().updateCachedLocation(loc, err));
         return;
       }
       future.complete(result);
@@ -216,7 +214,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   }
 
   private void locateThenCall() {
-    locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
+    locate.get().whenComplete((loc, error) -> {
       if (error != null) {
         onError(error,
           () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
@@ -231,6 +229,14 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     });
   }
 
+  private CompletableFuture<HRegionLocation> locate() {
+    return conn.getLocator().getRegionLocation(tableName, row);
+  }
+
+  private CompletableFuture<HRegionLocation> locatePrevious() {
+    return conn.getLocator().getPreviousRegionLocation(tableName, row);
+  }
+
   public CompletableFuture<T> call() {
     locateThenCall();
     return future;

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index d715e24..b29f878 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
 
   @Override
   public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
-    return locator.getRegionLocation(tableName, row, reload);
+    return locator.getRegionLocation(tableName, row);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
index 775f8bd..4e19b77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -108,6 +109,19 @@ public class CollectionUtils {
   }
 
   /**
+   * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
+   * value already exists. So here we copy the implementation of
+   * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and
+   * putIfAbsent to implement computeIfAbsent.
+   * <p>
+   * Notice that the implementation does not guarantee that the supplier will only be executed
+   * once: when several threads miss at the same time each of them calls the supplier, but only
+   * one of the created values ends up in the map and every caller gets that one back.
+   * @param map the map to read from and, on a miss, insert into
+   * @param key the key to look up
+   * @param supplier creates the value on a miss; may supply any subtype of {@code V}
+   * @return the value already mapped to {@code key}, otherwise the value that ended up in the
+   *         map, or {@code null} if the supplier returned {@code null} (nothing is inserted then)
+   */
+  public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key,
+      Supplier<? extends V> supplier) {
+    V existing = map.get(key);
+    if (existing != null) {
+      // Fast path, the whole point of this helper: a plain get and no supplier call.
+      return existing;
+    }
+    V created = supplier.get();
+    if (created == null) {
+      // Same as ConcurrentMap.computeIfAbsent: a null result leaves the key unmapped.
+      return null;
+    }
+    // Another thread may have inserted in the meantime, in which case its value wins.
+    V raced = map.putIfAbsent(key, created);
+    return raced != null ? raced : created;
+  }
+
+  /**
    * A supplier that throws IOException when get.
    */
   @FunctionalInterface

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
new file mode 100644
index 0000000..b20e616
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reads random rows through the async client from many threads at once while the table is
+ * split and compacted, the balancer is run and the meta region is moved to another server.
+ */
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncGetMultiThread {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  /** Number of rows written by setUp; the row keys are the zero padded numbers 000 to 999. */
+  private static final int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  /** The split points 111, 222, ..., 888; test() shuffles them and uses one per round. */
+  private static byte[][] SPLIT_KEYS;
+
+  /**
+   * Starts a five node mini cluster, creates the table without any split keys and writes
+   * {@link #COUNT} rows whose value is the int encoding of the row index.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
+    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
+    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
+    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.startMiniCluster(5);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    // The split keys are deliberately not passed here, the splitting happens in test().
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    AsyncTable table = CONN.getTable(TABLE_NAME);
+    List<CompletableFuture<?>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT)
+        .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+            .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
+    // Block until every put has completed so that the readers never see a missing row.
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Reader loop: until {@code stop} is set, gets a random row and asserts that the stored value
+   * equals the row index.
+   */
+  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
+    while (!stop.get()) {
+      int i = ThreadLocalRandom.current().nextInt(COUNT);
+      assertEquals(i,
+        Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+            .get().getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  /**
+   * One round of disturbance: splits the table at {@code splitPoint}, major compacts its regions,
+   * runs the balancer and finally moves the meta region to a server that does not hold it now.
+   */
+  private void splitBalanceAndMoveMeta(Admin admin, byte[] splitPoint)
+      throws IOException, InterruptedException {
+    admin.split(TABLE_NAME, splitPoint);
+    for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+      region.compact(true);
+    }
+    Thread.sleep(5000);
+    admin.balancer(true);
+    Thread.sleep(5000);
+    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+    ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
+        .findAny().get();
+    admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+      Bytes.toBytes(newMetaServer.getServerName()));
+    Thread.sleep(5000);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    int numThreads = 20;
+    AtomicBoolean stop = new AtomicBoolean(false);
+    ExecutorService executor =
+        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    List<Future<?>> futures = new ArrayList<>();
+    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
+      run(stop);
+      return null;
+    })));
+    try {
+      // Arrays.asList is a view on the array, so this shuffles SPLIT_KEYS itself. The fixed seed
+      // keeps the order of the splits the same for every run.
+      Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
+      Admin admin = TEST_UTIL.getAdmin();
+      for (byte[] splitPoint : SPLIT_KEYS) {
+        splitBalanceAndMoveMeta(admin, splitPoint);
+      }
+    } finally {
+      // Stop the readers even when an admin call above failed, otherwise they would keep
+      // reading until the cluster is torn down.
+      stop.set(true);
+      executor.shutdown();
+    }
+    // Surfaces the first assertion failure or error hit by any of the readers.
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
new file mode 100644
index 0000000..2e46d8a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@link AsyncRegionLocator} on a three node mini cluster with the balancer switched off:
+ * lookups for a missing and for a disabled table, for a single and for a multi region table, and
+ * what a cached location looks like after the region has been moved.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncRegionLocator {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static AsyncConnectionImpl CONN;
+
+  private static AsyncRegionLocator LOCATOR;
+
+  /** The split points 111, 222, ..., 888 used for the multi region table. */
+  private static byte[][] SPLIT_KEYS;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    // Regions must only move when a test asks for it.
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    LOCATOR = CONN.getLocator();
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Drops the table if a test created it and clears what the locator cached for it, so that
+   * every test starts without the table.
+   */
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.tableExists(TABLE_NAME)) {
+      if (admin.isTableEnabled(TABLE_NAME)) {
+        admin.disableTable(TABLE_NAME);
+      }
+      admin.deleteTable(TABLE_NAME);
+    }
+    LOCATOR.clearCache(TABLE_NAME);
+  }
+
+  private void createSingleRegionTable() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  /**
+   * Both lookups on a table that was never created are expected to fail with
+   * {@link TableNotFoundException}.
+   * <p>
+   * NOTE(review): there is no fail() after the get() calls, so a lookup that unexpectedly
+   * succeeds lets this test pass silently. Confirm whether that is intended.
+   */
+  @Test
+  public void testNoTable() throws InterruptedException {
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+    try {
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+  }
+
+  /**
+   * Same checks as {@link #testNoTable()}, but on a table that exists and has been disabled.
+   * <p>
+   * NOTE(review): as there is no fail() after the get() calls, this test also passes when the
+   * lookups on a disabled table succeed. Confirm which behaviour is intended.
+   */
+  @Test
+  public void testDisableTable() throws IOException, InterruptedException {
+    createSingleRegionTable();
+    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+    try {
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+  }
+
+  /**
+   * Asserts that {@code loc} is a region of the test table with the given start key and end key
+   * and that it is located on {@code serverName}.
+   */
+  private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
+      HRegionLocation loc) {
+    HRegionInfo info = loc.getRegionInfo();
+    assertEquals(TABLE_NAME, info.getTable());
+    assertArrayEquals(startKey, info.getStartKey());
+    assertArrayEquals(endKey, info.getEndKey());
+    assertEquals(serverName, loc.getServerName());
+  }
+
+  @Test
+  public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
+    createSingleRegionTable();
+    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    // Any row, whatever its length and content, belongs to the only region of the table.
+    byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
+    ThreadLocalRandom.current().nextBytes(randKey);
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
+    // Using a key which is not the end key of a region is expected to cause an error.
+    // NOTE(review): without a fail() a lookup that succeeds is only checked by assertLocEquals.
+    try {
+      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+        LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(IOException.class));
+      assertTrue(e.getCause().getMessage().contains("end key of"));
+    }
+  }
+
+  private void createMultiRegionTable() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  /** Returns the start keys of the multi region table: the empty key, then the split keys. */
+  private static byte[][] getStartKeys() {
+    byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
+    startKeys[0] = EMPTY_START_ROW;
+    System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
+    return startKeys;
+  }
+
+  /** Returns the end keys of the multi region table: the split keys, then the empty key. */
+  private static byte[][] getEndKeys() {
+    byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
+    endKeys[endKeys.length - 1] = EMPTY_END_ROW;
+    return endKeys;
+  }
+
+  @Test
+  public void testMultiRegionTable() throws IOException, InterruptedException {
+    createMultiRegionTable();
+    byte[][] startKeys = getStartKeys();
+    // serverNames[i] is the server that hosts the region starting at startKeys[i].
+    ServerName[] serverNames = new ServerName[startKeys.length];
+    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+        .forEach(rs -> {
+          rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
+            serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+              Bytes::compareTo)] = rs.getServerName();
+          });
+        });
+    // Every lookup is done twice; the second pass presumably is answered from the cache.
+    IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
+      try {
+        assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
+          serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }));
+    LOCATOR.clearCache(TABLE_NAME);
+    byte[][] endKeys = getEndKeys();
+    // Same again through the end keys, walking the regions from the last one to the first one.
+    IntStream.range(0, 2).forEach(
+      n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
+        try {
+          assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
+            LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }));
+  }
+
+  @Test
+  public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
+    createSingleRegionTable();
+    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
+    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
+        .findAny().get();
+
+    TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
+      Bytes.toBytes(newServerName.getServerName()));
+    // Poll until the region is reported on the target server.
+    while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
+        .equals(newServerName)) {
+      Thread.sleep(100);
+    }
+    // Should be same as it is in cache
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    LOCATOR.updateCachedLocation(loc, null);
+    // null error will not trigger a cache cleanup
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    // A NotServingRegionException does, so the next lookup has to return the new server.
+    LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6cf9333e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index fd3938e..67d2661 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -151,11 +150,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     AtomicBoolean errorTriggered = new AtomicBoolean(false);
     AtomicInteger count = new AtomicInteger(0);
     HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
-
-    try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) {
+    AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) {
       @Override
-      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-          boolean reload) {
+      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
         if (tableName.equals(TABLE_NAME)) {
           CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
           if (count.getAndIncrement() == 0) {
@@ -166,17 +163,22 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
           }
           return future;
         } else {
-          return super.getRegionLocation(tableName, row, reload);
+          return super.getRegionLocation(tableName, row);
         }
       }
 
       @Override
-      void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
-          Object exception, ServerName source) {
+      CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+          byte[] startRowOfCurrentRegion) {
+        return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
+      }
+
+      @Override
+      void updateCachedLocation(HRegionLocation loc, Throwable exception) {
       }
     };
-        AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
-            User.getCurrent()) {
+    try (AsyncConnectionImpl mockedConn =
+        new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
 
           @Override
           AsyncRegionLocator getLocator() {