Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/18 00:53:11 UTC

hbase git commit: HBASE-17282 Reduce the redundant requests to meta table

Repository: hbase
Updated Branches:
  refs/heads/master da356069f -> f041306cd


HBASE-17282 Reduce the redundant requests to meta table


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f041306c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f041306c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f041306c

Branch: refs/heads/master
Commit: f041306cdae701e91b314234b413af98fd1f6b18
Parents: da35606
Author: zhangduo <zh...@apache.org>
Authored: Sat Dec 17 21:01:14 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Dec 18 08:52:06 2016 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncConnectionImpl.java       |   2 +-
 .../hbase/client/AsyncMetaRegionLocator.java    | 121 +++++
 .../hbase/client/AsyncNonMetaRegionLocator.java | 487 +++++++++++++++++++
 .../hadoop/hbase/client/AsyncRegionLocator.java | 377 ++------------
 .../hbase/client/TestAsyncGetMultiThread.java   |   3 +-
 .../client/TestAsyncNonMetaRegionLocator.java   | 240 +++++++++
 ...syncNonMetaRegionLocatorConcurrenyLimit.java | 159 ++++++
 .../hbase/client/TestAsyncRegionLocator.java    | 239 ---------
 .../client/TestAsyncRegionLocatorTimeout.java   |   3 +-
 .../hadoop/hbase/client/TestAsyncTable.java     |   3 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java |   3 +-
 11 files changed, 1048 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
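
The patch splits AsyncRegionLocator into a meta locator and a non-meta locator, and for non-meta tables it deduplicates concurrent lookups: callers that miss the cache are queued per table and only a bounded number of meta scans are in flight at once. Below is a minimal sketch of the underlying "share one pending future per key" idea; it is not part of the patch, and SharedLookup and fetch are hypothetical names.

// Editorial sketch only -- not code from this commit. Callers asking for the
// same key share one pending CompletableFuture instead of each issuing a
// separate lookup.
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SharedLookup<K, V> {

  private final Map<K, CompletableFuture<V>> pending = new ConcurrentHashMap<>();
  private final Function<K, CompletableFuture<V>> fetch;

  SharedLookup(Function<K, CompletableFuture<V>> fetch) {
    this.fetch = fetch;
  }

  CompletableFuture<V> get(K key) {
    CompletableFuture<V> existing = pending.get(key);
    if (existing != null) {
      return existing;                 // join the lookup already in flight
    }
    CompletableFuture<V> created = new CompletableFuture<>();
    CompletableFuture<V> raced = pending.putIfAbsent(key, created);
    if (raced != null) {
      return raced;                    // another caller won the race
    }
    fetch.apply(key).whenComplete((v, e) -> {
      pending.remove(key);             // allow a later call to refresh the value
      if (e != null) {
        created.completeExceptionally(e);
      } else {
        created.complete(v);
      }
    });
    return created;
  }
}

The real AsyncNonMetaRegionLocator adds a per-table quota and re-dispatches queued requests when a scan returns, but the deduplication principle is the same.
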


http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 92785fb..d660b02 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -95,7 +95,6 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.conf = conf;
     this.user = user;
     this.connConf = new AsyncConnectionConfiguration(conf);
-    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
     this.registry = AsyncRegistryFactory.getRegistry(conf);
     this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
       if (LOG.isDebugEnabled()) {
@@ -107,6 +106,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
     this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
     if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
       nonceGenerator = PerClientRandomNonceGenerator.get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
new file mode 100644
index 0000000..5b7a68f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The asynchronous locator for meta region.
+ */
+@InterfaceAudience.Private
+class AsyncMetaRegionLocator {
+
+  private static final Log LOG = LogFactory.getLog(AsyncMetaRegionLocator.class);
+
+  private final AsyncRegistry registry;
+
+  private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
+      new AtomicReference<>();
+
+  AsyncMetaRegionLocator(AsyncRegistry registry) {
+    this.registry = registry;
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation() {
+    for (;;) {
+      HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
+      if (metaRegionLocation != null) {
+        return CompletableFuture.completedFuture(metaRegionLocation);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Meta region location cache is null, try fetching from registry.");
+      }
+      if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Start fetching meta region location from registry.");
+        }
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+          if (error != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Failed to fetch meta region location from registry", error);
+            }
+            metaRelocateFuture.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          HRegionLocation loc = locs.getDefaultRegionLocation();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The fetched meta region location is " + loc);
+          }
+          // Here we update cache before reset future, so it is possible that someone can get a
+          // stale value. Consider this:
+          // 1. update cache
+          // 2. someone clear the cache and relocate again
+          // 3. the metaRelocateFuture is not null so the old future is used.
+          // 4. we clear metaRelocateFuture and complete the future in it with the value being
+          // cleared in step 2.
+          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+          // caller will retry again later, no correctness problems.
+          this.metaRegionLocation.set(loc);
+          metaRelocateFuture.set(null);
+          future.complete(loc);
+        });
+      } else {
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+    updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
+      for (;;) {
+        HRegionLocation oldLoc = metaRegionLocation.get();
+        if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
+            || oldLoc.getServerName().equals(newLoc.getServerName()))) {
+          return;
+        }
+        if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
+          return;
+        }
+      }
+    }, l -> {
+      for (;;) {
+        HRegionLocation oldLoc = metaRegionLocation.get();
+        if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
+          return;
+        }
+      }
+    });
+  }
+
+  void clearCache() {
+    metaRegionLocation.set(null);
+  }
+}
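
getRegionLocation() above relies on a compare-and-set over metaRelocateFuture so that only one registry fetch is in flight at a time; every other caller returns the future installed by the winner, and the cache is re-checked on each loop iteration. A standalone sketch of that single-flight pattern follows (hypothetical names, a generic value type instead of HRegionLocation; not code from the patch).

// Editorial sketch of the CAS "single flight" pattern used above.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class SingleFlight<T> {

  private final AtomicReference<T> cached = new AtomicReference<>();
  private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();

  CompletableFuture<T> get(Supplier<CompletableFuture<T>> loader) {
    for (;;) {
      T value = cached.get();
      if (value != null) {
        return CompletableFuture.completedFuture(value);
      }
      if (inFlight.compareAndSet(null, new CompletableFuture<>())) {
        // we won the race: start the real load and publish the result
        CompletableFuture<T> future = inFlight.get();
        loader.get().whenComplete((v, e) -> {
          if (e != null) {
            inFlight.getAndSet(null).completeExceptionally(e);
            return;
          }
          cached.set(v);
          inFlight.set(null);
          future.complete(v);
        });
      } else {
        CompletableFuture<T> future = inFlight.get();
        if (future != null) {
          return future;               // join the load started by another caller
        }
        // the load finished between our two reads; loop and re-check the cache
      }
    }
  }
}
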

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
new file mode 100644
index 0000000..c22d210
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The asynchronous locator for regions other than meta.
+ */
+@InterfaceAudience.Private
+class AsyncNonMetaRegionLocator {
+
+  private static final Log LOG = LogFactory.getLog(AsyncNonMetaRegionLocator.class);
+
+  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
+      "hbase.client.meta.max.concurrent.locate.per.table";
+
+  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
+
+  private final AsyncConnectionImpl conn;
+
+  private final int maxConcurrentLocateRequestPerTable;
+
+  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
+
+  private static final class LocateRequest {
+
+    public final byte[] row;
+
+    public final boolean locateToPrevious;
+
+    public LocateRequest(byte[] row, boolean locateToPrevious) {
+      this.row = row;
+      this.locateToPrevious = locateToPrevious;
+    }
+
+    @Override
+    public int hashCode() {
+      return Bytes.hashCode(row) ^ Boolean.hashCode(locateToPrevious);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || obj.getClass() != LocateRequest.class) {
+        return false;
+      }
+      LocateRequest that = (LocateRequest) obj;
+      return locateToPrevious == that.locateToPrevious && Bytes.equals(row, that.row);
+    }
+  }
+
+  private static final class TableCache {
+
+    public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
+        new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+
+    public final Set<LocateRequest> pendingRequests = new HashSet<>();
+
+    public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
+        new HashMap<>();
+
+    public boolean hasQuota(int max) {
+      return pendingRequests.size() < max;
+    }
+
+    public boolean isPending(LocateRequest req) {
+      return pendingRequests.contains(req);
+    }
+
+    public void send(LocateRequest req) {
+      pendingRequests.add(req);
+    }
+  }
+
+  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
+    this.conn = conn;
+    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
+      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+  }
+
+  private TableCache getTableCache(TableName tableName) {
+    return computeIfAbsent(cache, tableName, TableCache::new);
+  }
+
+  private void removeFromCache(HRegionLocation loc) {
+    TableCache tableCache = cache.get(loc.getRegionInfo().getTable());
+    if (tableCache == null) {
+      return;
+    }
+    tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
+      if (oldLoc.getSeqNum() > loc.getSeqNum()
+          || !oldLoc.getServerName().equals(loc.getServerName())) {
+        return oldLoc;
+      }
+      return null;
+    });
+  }
+
+  // return whether we add this loc to cache
+  private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try adding " + loc + " to cache");
+    }
+    byte[] startKey = loc.getRegionInfo().getStartKey();
+    HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
+    if (oldLoc == null) {
+      return true;
+    }
+    if (oldLoc.getSeqNum() > loc.getSeqNum()
+        || oldLoc.getServerName().equals(loc.getServerName())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
+            + " is newer than us or has the same server name");
+      }
+      return false;
+    }
+    return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
+      if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
+        return loc;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
+            + " is newer than us or has the same server name."
+            + " Maybe it is updated before we replace it");
+      }
+      return oldValue;
+    });
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+      justification = "Called by lambda expression")
+  private void addToCache(HRegionLocation loc) {
+    addToCache(getTableCache(loc.getRegionInfo().getTable()), loc);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try adding " + loc + " to cache");
+    }
+  }
+
+  private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
+      HRegionLocation loc) {
+    if (future.isDone()) {
+      return true;
+    }
+    boolean completed;
+    if (req.locateToPrevious) {
+      completed = Bytes.equals(loc.getRegionInfo().getEndKey(), req.row);
+    } else {
+      completed = loc.getRegionInfo().containsRow(req.row);
+    }
+    if (completed) {
+      future.complete(loc);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
+      Throwable error, String rowNameInErrorMsg) {
+    if (error != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to locate region in '" + tableName + "', " + rowNameInErrorMsg + "='"
+            + Bytes.toStringBinary(req.row) + "'",
+          error);
+      }
+    }
+    LocateRequest toSend = null;
+    TableCache tableCache = getTableCache(tableName);
+    if (loc != null) {
+      if (!addToCache(tableCache, loc)) {
+        // someone is ahead of us.
+        synchronized (tableCache) {
+          tableCache.pendingRequests.remove(req);
+        }
+        return;
+      }
+    }
+    synchronized (tableCache) {
+      tableCache.pendingRequests.remove(req);
+      if (error instanceof DoNotRetryIOException) {
+        CompletableFuture<?> future = tableCache.allRequests.remove(req);
+        if (future != null) {
+          future.completeExceptionally(error);
+        }
+      }
+      if (loc != null) {
+        for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+            tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
+          Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
+          if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
+            iter.remove();
+          }
+        }
+      }
+      if (!tableCache.allRequests.isEmpty()
+          && tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
+        LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
+            .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
+        if (candidates.length > 0) {
+          // TODO: use a better algorithm to send a request which is more likely to fetch a new
+          // location.
+          toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
+        }
+      }
+    }
+    if (toSend != null) {
+      if (toSend.locateToPrevious) {
+        locatePreviousInMeta(tableName, toSend);
+      } else {
+        locateInMeta(tableName, toSend);
+      }
+    }
+  }
+
+  private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
+      Throwable error, String rowNameInErrorMsg) {
+    if (error != null) {
+      complete(tableName, req, null, error, rowNameInErrorMsg);
+      return;
+    }
+    if (results.isEmpty()) {
+      complete(tableName, req, null, new TableNotFoundException(tableName), rowNameInErrorMsg);
+      return;
+    }
+    RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+          + Bytes.toStringBinary(req.row) + "' is " + locs);
+    }
+    if (locs == null || locs.getDefaultRegionLocation() == null) {
+      complete(tableName, req, null,
+        new IOException(String.format("No location found for '%s', %s='%s'", tableName,
+          rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
+        rowNameInErrorMsg);
+      return;
+    }
+    HRegionLocation loc = locs.getDefaultRegionLocation();
+    HRegionInfo info = loc.getRegionInfo();
+    if (info == null) {
+      complete(tableName, req, null,
+        new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
+          rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
+        rowNameInErrorMsg);
+      return;
+    }
+    if (!info.getTable().equals(tableName)) {
+      complete(tableName, req, null,
+        new TableNotFoundException(
+            "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"),
+        rowNameInErrorMsg);
+      return;
+    }
+    if (info.isSplit()) {
+      complete(tableName, req, null,
+        new RegionOfflineException(
+            "the only available region for the required row is a split parent,"
+                + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"),
+        rowNameInErrorMsg);
+      return;
+    }
+    if (info.isOffline()) {
+      complete(tableName, req, null,
+        new RegionOfflineException("the region is offline, could"
+            + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"),
+        rowNameInErrorMsg);
+      return;
+    }
+    if (loc.getServerName() == null) {
+      complete(tableName, req, null,
+        new NoServerForRegionException(
+            String.format("No server address listed for region '%s', %s='%s'",
+              info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
+        rowNameInErrorMsg);
+      return;
+    }
+    if (req.locateToPrevious && !Bytes.equals(info.getEndKey(), req.row)) {
+      complete(tableName, req, null,
+        new DoNotRetryIOException("The end key of '" + info.getRegionNameAsString() + "' is '"
+            + Bytes.toStringBinary(info.getEndKey()) + "', expected '"
+            + Bytes.toStringBinary(req.row) + "'"),
+        rowNameInErrorMsg);
+      return;
+    }
+    complete(tableName, req, loc, null, rowNameInErrorMsg);
+  }
+
+  private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row) {
+    Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    HRegionLocation loc = entry.getValue();
+    byte[] endKey = loc.getRegionInfo().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
+            + Bytes.toStringBinary(row) + "'");
+      }
+      return loc;
+    } else {
+      return null;
+    }
+  }
+
+  private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    Map.Entry<byte[], HRegionLocation> entry;
+    if (isEmptyStopRow(startRowOfCurrentRegion)) {
+      entry = tableCache.cache.lastEntry();
+    } else {
+      entry = tableCache.cache.lowerEntry(startRowOfCurrentRegion);
+    }
+    if (entry == null) {
+      return null;
+    }
+    HRegionLocation loc = entry.getValue();
+    if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
+            + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
+      }
+      return loc;
+    } else {
+      return null;
+    }
+  }
+
+  private void locateInMeta(TableName tableName, LocateRequest req) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+        "Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + "' in meta");
+    }
+    byte[] metaKey = createRegionName(tableName, req.row, NINES, false);
+    conn.getRawTable(META_TABLE_NAME)
+        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .whenComplete((results, error) -> onScanComplete(tableName, req, results, error, "row"));
+  }
+
+  private void locatePreviousInMeta(TableName tableName, LocateRequest req) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
+          + Bytes.toStringBinary(req.row) + "' in meta");
+    }
+    byte[] metaKey;
+    if (isEmptyStopRow(req.row)) {
+      byte[] binaryTableName = tableName.getName();
+      metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+    } else {
+      metaKey = createRegionName(tableName, req.row, ZEROES, false);
+    }
+    conn.getRawTable(META_TABLE_NAME)
+        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .whenComplete((results, error) -> onScanComplete(tableName, req, results, error,
+          "startRowOfCurrentRegion"));
+  }
+
+  private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
+      boolean locateToPrevious) {
+    return locateToPrevious ? locatePreviousInCache(tableCache, tableName, row)
+        : locateInCache(tableCache, tableName, row);
+  }
+
+  // locateToPrevious is true means we will use the start key of a region to locate the region
+  // placed before it. Used for reverse scan. See the comment of
+  // AsyncRegionLocator.getPreviousRegionLocation.
+  private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      boolean locateToPrevious) {
+    TableCache tableCache = getTableCache(tableName);
+    HRegionLocation loc = locateInCache(tableCache, tableName, row, locateToPrevious);
+    if (loc != null) {
+      return CompletableFuture.completedFuture(loc);
+    }
+    CompletableFuture<HRegionLocation> future;
+    LocateRequest req;
+    boolean sendRequest = false;
+    synchronized (tableCache) {
+      // check again
+      loc = locateInCache(tableCache, tableName, row, locateToPrevious);
+      if (loc != null) {
+        return CompletableFuture.completedFuture(loc);
+      }
+      req = new LocateRequest(row, locateToPrevious);
+      future = tableCache.allRequests.get(req);
+      if (future == null) {
+        future = new CompletableFuture<>();
+        tableCache.allRequests.put(req, future);
+        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
+          tableCache.send(req);
+          sendRequest = true;
+        }
+      }
+    }
+    if (sendRequest) {
+      if (locateToPrevious) {
+        locatePreviousInMeta(tableName, req);
+      } else {
+        locateInMeta(tableName, req);
+      }
+    }
+    return future;
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
+    return getRegionLocation(tableName, row, false);
+  }
+
+  // Used for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation.
+  // TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
+  // of a region.
+  CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    return getRegionLocation(tableName, startRowOfCurrentRegion, true);
+  }
+
+  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+    updateCachedLoation(loc, exception, l -> {
+      TableCache tableCache = cache.get(l.getRegionInfo().getTable());
+      if (tableCache == null) {
+        return null;
+      }
+      return tableCache.cache.get(l.getRegionInfo().getStartKey());
+    }, this::addToCache, this::removeFromCache);
+  }
+
+  void clearCache(TableName tableName) {
+    TableCache tableCache = cache.remove(tableName);
+    if (tableCache == null) {
+      return;
+    }
+    synchronized (tableCache) {
+      if (!tableCache.allRequests.isEmpty()) {
+        IOException error = new IOException("Cache cleared");
+        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
+      }
+    }
+  }
+}
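
AsyncNonMetaRegionLocator keeps a per-table ConcurrentSkipListMap keyed by region start key: locateInCache() uses floorEntry(row) to find the last region starting at or before the row and then checks the end key; on a miss the request is queued in allRequests, at most hbase.client.meta.max.concurrent.locate.per.table (default 8) meta scans are pending per table, and each completed scan is used to satisfy every queued request it covers. A simplified, standalone sketch of the start-key cache lookup is below (String keys instead of byte[], hypothetical class name; not code from the patch).

// Editorial sketch of the floorEntry-based lookup performed by locateInCache.
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class StartKeyCacheSketch {

  // start key -> { end key, server } for the region covering [startKey, endKey)
  private final ConcurrentNavigableMap<String, String[]> cache = new ConcurrentSkipListMap<>();

  void add(String startKey, String endKey, String server) {
    cache.put(startKey, new String[] { endKey, server });
  }

  String locate(String row) {
    Map.Entry<String, String[]> entry = cache.floorEntry(row);
    if (entry == null) {
      return null;                     // nothing cached at or before this row
    }
    String endKey = entry.getValue()[0];
    String server = entry.getValue()[1];
    // an empty end key means "last region of the table"
    if (endKey.isEmpty() || row.compareTo(endKey) < 0) {
      return server;                   // row falls inside [startKey, endKey)
    }
    return null;                       // gap in the cache; must go to meta
  }
}
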

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index ae8f2a2..1c3569a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -17,42 +17,23 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
-import static org.apache.hadoop.hbase.HConstants.NINES;
-import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
 import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -66,346 +47,73 @@ class AsyncRegionLocator {
 
   private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
 
-  private final AsyncConnectionImpl conn;
-
   private final HashedWheelTimer retryTimer;
 
-  private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
-
-  private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
-      new AtomicReference<>();
+  private final AsyncMetaRegionLocator metaRegionLocator;
 
-  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
-      new ConcurrentHashMap<>();
+  private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
 
   AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
-    this.conn = conn;
+    this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
+    this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
     this.retryTimer = retryTimer;
   }
 
-  private CompletableFuture<HRegionLocation> locateMetaRegion() {
-    for (;;) {
-      HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
-      if (metaRegionLocation != null) {
-        return CompletableFuture.completedFuture(metaRegionLocation);
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Meta region location cache is null, try fetching from registry.");
-      }
-      if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Start fetching meta region location from registry.");
-        }
-        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
-        conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> {
-          if (error != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Failed to fetch meta region location from registry", error);
-            }
-            metaRelocateFuture.getAndSet(null).completeExceptionally(error);
-            return;
-          }
-          HRegionLocation loc = locs.getDefaultRegionLocation();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The fetched meta region location is " + loc);
-          }
-          // Here we update cache before reset future, so it is possible that someone can get a
-          // stale value. Consider this:
-          // 1. update cache
-          // 2. someone clear the cache and relocate again
-          // 3. the metaRelocateFuture is not null so the old future is used.
-          // 4. we clear metaRelocateFuture and complete the future in it with the value being
-          // cleared in step 2.
-          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
-          // caller will retry again later, no correctness problems.
-          this.metaRegionLocation.set(loc);
-          metaRelocateFuture.set(null);
-          future.complete(loc);
-        });
-      } else {
-        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
-        if (future != null) {
-          return future;
-        }
-      }
-    }
-  }
-
-  private static ConcurrentNavigableMap<byte[], HRegionLocation> createTableCache() {
-    return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
-  }
-
-  private void removeFromCache(HRegionLocation loc) {
-    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
-        cache.get(loc.getRegionInfo().getTable());
-    if (tableCache == null) {
-      return;
-    }
-    tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
-      if (oldLoc.getSeqNum() > loc.getSeqNum()
-          || !oldLoc.getServerName().equals(loc.getServerName())) {
-        return oldLoc;
-      }
-      return null;
-    });
-  }
-
-  private void addToCache(HRegionLocation loc) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try adding " + loc + " to cache");
-    }
-    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = computeIfAbsent(cache,
-      loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache);
-    byte[] startKey = loc.getRegionInfo().getStartKey();
-    HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc);
-    if (oldLoc == null) {
-      return;
-    }
-    if (oldLoc.getSeqNum() > loc.getSeqNum()
-        || oldLoc.getServerName().equals(loc.getServerName())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
-            + " is newer than us or has the same server name");
-      }
-      return;
-    }
-    tableCache.compute(startKey, (k, oldValue) -> {
-      if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
-        return loc;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
-            + " is newer than us or has the same server name."
-            + " Maybe it is updated before we replace it");
-      }
-      return oldValue;
-    });
-  }
-
-  private HRegionLocation locateInCache(TableName tableName, byte[] row) {
-    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
-    if (tableCache == null) {
-      return null;
-    }
-    Map.Entry<byte[], HRegionLocation> entry = tableCache.floorEntry(row);
-    if (entry == null) {
-      return null;
-    }
-    HRegionLocation loc = entry.getValue();
-    byte[] endKey = loc.getRegionInfo().getEndKey();
-    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
-      return loc;
-    } else {
-      return null;
-    }
-  }
-
-  private void onScanComplete(CompletableFuture<HRegionLocation> future, TableName tableName,
-      byte[] row, List<Result> results, Throwable error, String rowNameInErrorMsg,
-      Consumer<HRegionLocation> otherCheck) {
-    if (error != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='"
-            + Bytes.toStringBinary(row) + "'",
-          error);
-      }
-      future.completeExceptionally(error);
-      return;
-    }
-    if (results.isEmpty()) {
-      future.completeExceptionally(new TableNotFoundException(tableName));
-      return;
-    }
-    RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
-          + Bytes.toStringBinary(row) + "' is " + locs);
-    }
-    if (locs == null || locs.getDefaultRegionLocation() == null) {
-      future.completeExceptionally(
-        new IOException(String.format("No location found for '%s', %s='%s'", tableName,
-          rowNameInErrorMsg, Bytes.toStringBinary(row))));
-      return;
-    }
-    HRegionLocation loc = locs.getDefaultRegionLocation();
-    HRegionInfo info = loc.getRegionInfo();
-    if (info == null) {
-      future.completeExceptionally(
-        new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
-          rowNameInErrorMsg, Bytes.toStringBinary(row))));
-      return;
-    }
-    if (!info.getTable().equals(tableName)) {
-      future.completeExceptionally(new TableNotFoundException(
-          "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
-      return;
-    }
-    if (info.isSplit()) {
-      future.completeExceptionally(new RegionOfflineException(
-          "the only available region for the required row is a split parent,"
-              + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
-      return;
-    }
-    if (info.isOffline()) {
-      future.completeExceptionally(new RegionOfflineException("the region is offline, could"
-          + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
-      return;
-    }
-    if (loc.getServerName() == null) {
-      future.completeExceptionally(new NoServerForRegionException(
-          String.format("No server address listed for region '%s', %s='%s'",
-            info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row))));
-      return;
-    }
-    otherCheck.accept(loc);
-    addToCache(loc);
-    future.complete(loc);
-  }
-
-  private CompletableFuture<HRegionLocation> locateInMeta(TableName tableName, byte[] row) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta");
-    }
-    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    byte[] metaKey = createRegionName(tableName, row, NINES, false);
-    conn.getRawTable(META_TABLE_NAME)
-        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
-        .whenComplete(
-          (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
-          }));
-    return future;
-  }
-
-  private CompletableFuture<HRegionLocation> locateRegion(TableName tableName, byte[] row) {
-    HRegionLocation loc = locateInCache(tableName, row);
-    if (loc != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
-            + Bytes.toStringBinary(row) + "'");
-      }
-      return CompletableFuture.completedFuture(loc);
-    }
-    return locateInMeta(tableName, row);
-  }
-
   private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
       long timeoutNs, Supplier<String> timeoutMsg) {
     if (future.isDone() || timeoutNs <= 0) {
       return future;
     }
-    CompletableFuture<HRegionLocation> timeoutFuture = new CompletableFuture<>();
-    Timeout timeoutTask = retryTimer.newTimeout(
-      t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs,
-      TimeUnit.NANOSECONDS);
-    future.whenComplete((loc, error) -> {
-      timeoutTask.cancel();
-      if (error != null) {
-        timeoutFuture.completeExceptionally(error);
-      } else {
-        timeoutFuture.complete(loc);
+    Timeout timeoutTask = retryTimer.newTimeout(t -> {
+      if (future.isDone()) {
+        return;
+      }
+      future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
+    }, timeoutNs, TimeUnit.NANOSECONDS);
+    return future.whenComplete((loc, error) -> {
+      if (error.getClass() != TimeoutIOException.class) {
+        // cancel timeout task if we are not completed by it.
+        timeoutTask.cancel();
       }
     });
-    return timeoutFuture;
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
       long timeoutNs) {
     CompletableFuture<HRegionLocation> future =
-        tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row);
+        tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
+            : nonMetaRegionLocator.getRegionLocation(tableName, row);
     return withTimeout(future, timeoutNs,
       () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
           + "ms) waiting for region location for " + tableName + ", row='"
           + Bytes.toStringBinary(row) + "'");
   }
 
-  private HRegionLocation locatePreviousInCache(TableName tableName,
-      byte[] startRowOfCurrentRegion) {
-    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
-    if (tableCache == null) {
-      return null;
-    }
-    Map.Entry<byte[], HRegionLocation> entry;
-    if (isEmptyStopRow(startRowOfCurrentRegion)) {
-      entry = tableCache.lastEntry();
-    } else {
-      entry = tableCache.lowerEntry(startRowOfCurrentRegion);
-    }
-    if (entry == null) {
-      return null;
-    }
-    HRegionLocation loc = entry.getValue();
-    if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
-      return loc;
-    } else {
-      return null;
-    }
-  }
-
-  private CompletableFuture<HRegionLocation> locatePreviousInMeta(TableName tableName,
-      byte[] startRowOfCurrentRegion) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
-          + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta");
-    }
-    byte[] metaKey;
-    if (isEmptyStopRow(startRowOfCurrentRegion)) {
-      byte[] binaryTableName = tableName.getName();
-      metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
-    } else {
-      metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
-    }
-    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    conn.getRawTable(META_TABLE_NAME)
-        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
-        .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
-          results, error, "startRowOfCurrentRegion", loc -> {
-            HRegionInfo info = loc.getRegionInfo();
-            if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) {
-              future.completeExceptionally(new IOException("The end key of '"
-                  + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey())
-                  + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"));
-            }
-          }));
-    return future;
-  }
-
-  private CompletableFuture<HRegionLocation> locatePreviousRegion(TableName tableName,
-      byte[] startRowOfCurrentRegion) {
-    HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion);
-    if (loc != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
-            + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
-      }
-      return CompletableFuture.completedFuture(loc);
-    }
-    return locatePreviousInMeta(tableName, startRowOfCurrentRegion);
-  }
-
   /**
-   * Locate the previous region using the current regions start key. Used for reverse scan.
-   * <p>
-   * TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
-   * of a region.
+   * Locate the previous region using the current regions start key. Used for reverse scan as the
+   * end key is not included in a region so we need to treat it differently.
    */
   CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
       byte[] startRowOfCurrentRegion, long timeoutNs) {
-    CompletableFuture<HRegionLocation> future = tableName.equals(META_TABLE_NAME)
-        ? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion);
+    // meta region can not be split right now so we call the same method as getRegionLocation.
+    // Change it later if the meta table can have more than one regions.
+    CompletableFuture<HRegionLocation> future =
+        tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
+            : nonMetaRegionLocator.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
     return withTimeout(future, timeoutNs,
       () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
           + "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
           + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
   }
 
-  private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
+  static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
     // Do not need to update if no such location, or the location is newer, or the location is not
     // same with us
     return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
         && oldLoc.getServerName().equals(loc.getServerName());
   }
 
-  private void updateCachedLoation(HRegionLocation loc, Throwable exception,
+  static void updateCachedLoation(HRegionLocation loc, Throwable exception,
       Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
       Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
     HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
@@ -445,34 +153,9 @@ class AsyncRegionLocator {
 
   void updateCachedLocation(HRegionLocation loc, Throwable exception) {
     if (loc.getRegionInfo().isMetaTable()) {
-      updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
-        for (;;) {
-          HRegionLocation oldLoc = metaRegionLocation.get();
-          if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
-              || oldLoc.getServerName().equals(newLoc.getServerName()))) {
-            return;
-          }
-          if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
-            return;
-          }
-        }
-      }, l -> {
-        for (;;) {
-          HRegionLocation oldLoc = metaRegionLocation.get();
-          if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
-            return;
-          }
-        }
-      });
+      metaRegionLocator.updateCachedLocation(loc, exception);
     } else {
-      updateCachedLoation(loc, exception, l -> {
-        ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
-            cache.get(l.getRegionInfo().getTable());
-        if (tableCache == null) {
-          return null;
-        }
-        return tableCache.get(l.getRegionInfo().getStartKey());
-      }, this::addToCache, this::removeFromCache);
+      nonMetaRegionLocator.updateCachedLocation(loc, exception);
     }
   }
 
@@ -480,6 +163,10 @@ class AsyncRegionLocator {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Clear meta cache for " + tableName);
     }
-    cache.remove(tableName);
+    if (tableName.equals(META_TABLE_NAME)) {
+      metaRegionLocator.clearCache();
+    } else {
+      nonMetaRegionLocator.clearCache(tableName);
+    }
   }
 }
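
The reworked withTimeout() above no longer wraps the location future: it arms a HashedWheelTimer task that completes the same future exceptionally with a TimeoutIOException, and cancels that task when the future completes with anything else. Here is a sketch of the same pattern written against a plain JDK ScheduledExecutorService, an assumption made only to keep the example self-contained; it is not code from the patch.

// Editorial sketch of the "complete the same future on timeout" pattern.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

  private static final ScheduledExecutorService TIMER =
      Executors.newSingleThreadScheduledExecutor();

  static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutMs) {
    if (future.isDone() || timeoutMs <= 0) {
      return future;
    }
    ScheduledFuture<?> task = TIMER.schedule(() -> {
      future.completeExceptionally(new TimeoutException(timeoutMs + "ms elapsed"));
    }, timeoutMs, TimeUnit.MILLISECONDS);
    return future.whenComplete((v, e) -> {
      if (!(e instanceof TimeoutException)) {
        task.cancel(false);            // finished before the deadline; drop the timer task
      }
    });
  }
}
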

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
index fe988aa..d24501d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
@@ -97,7 +98,7 @@ public class TestAsyncGetMultiThread {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    CONN.close();
+    IOUtils.closeQuietly(CONN);
     TEST_UTIL.shutdownMiniCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
new file mode 100644
index 0000000..f3aa26b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncNonMetaRegionLocator {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static AsyncConnectionImpl CONN;
+
+  private static AsyncNonMetaRegionLocator LOCATOR;
+
+  private static byte[][] SPLIT_KEYS;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.tableExists(TABLE_NAME)) {
+      if (admin.isTableEnabled(TABLE_NAME)) {
+        TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+      }
+      TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
+    }
+    LOCATOR.clearCache(TABLE_NAME);
+  }
+
+  private void createSingleRegionTable() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @Test
+  public void testNoTable() throws InterruptedException {
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+    try {
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+  }
+
+  @Test
+  public void testDisableTable() throws IOException, InterruptedException {
+    createSingleRegionTable();
+    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+    try {
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+  }
+
+  private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
+      HRegionLocation loc) {
+    HRegionInfo info = loc.getRegionInfo();
+    assertEquals(TABLE_NAME, info.getTable());
+    assertArrayEquals(startKey, info.getStartKey());
+    assertArrayEquals(endKey, info.getEndKey());
+    assertEquals(serverName, loc.getServerName());
+  }
+
+  @Test
+  public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
+    createSingleRegionTable();
+    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
+    ThreadLocalRandom.current().nextBytes(randKey);
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
+    // Use a key which is not the endKey of a region will cause error
+    try {
+      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+        LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(IOException.class));
+      assertTrue(e.getCause().getMessage().contains("end key of"));
+    }
+  }
+
+  private void createMultiRegionTable() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  private static byte[][] getStartKeys() {
+    byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
+    startKeys[0] = EMPTY_START_ROW;
+    System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
+    return startKeys;
+  }
+
+  private static byte[][] getEndKeys() {
+    byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
+    endKeys[endKeys.length - 1] = EMPTY_END_ROW;
+    return endKeys;
+  }
+
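+  // Locate every region twice by start key, clear the cache, then locate every region twice
+  // again in reverse order by end key, checking each cached answer against the actual
+  // assignments reported by the region servers.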
+  @Test
+  public void testMultiRegionTable() throws IOException, InterruptedException {
+    createMultiRegionTable();
+    byte[][] startKeys = getStartKeys();
+    ServerName[] serverNames = new ServerName[startKeys.length];
+    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+        .forEach(rs -> {
+          rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
+            serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+              Bytes::compareTo)] = rs.getServerName();
+          });
+        });
+    IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
+      try {
+        assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
+          serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }));
+    LOCATOR.clearCache(TABLE_NAME);
+    byte[][] endKeys = getEndKeys();
+    IntStream.range(0, 2).forEach(
+      n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
+        try {
+          assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
+            LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }));
+  }
+
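+  // A cached location should survive a region move until updateCachedLocation is called with
+  // a real error such as NotServingRegionException; a null error must not evict the entry.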
+  @Test
+  public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
+    createSingleRegionTable();
+    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
+    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
+        .findAny().get();
+
+    TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
+      Bytes.toBytes(newServerName.getServerName()));
+    while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
+        .equals(newServerName)) {
+      Thread.sleep(100);
+    }
+    // Should be the same object since the stale location is still in the cache
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    LOCATOR.updateCachedLocation(loc, null);
+    // null error will not trigger a cache cleanup
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
new file mode 100644
index 0000000..e82703b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static AsyncConnectionImpl CONN;
+
+  private static AsyncNonMetaRegionLocator LOCATOR;
+
+  private static byte[][] SPLIT_KEYS;
+
+  private static int MAX_ALLOWED = 2;
+
+  private static AtomicInteger CONCURRENCY = new AtomicInteger(0);
+
+  private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0);
+
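+  // Counts scanner opens against hbase:meta and records the highest number seen at the same
+  // time in MAX_CONCURRENCY; the short sleep in preScannerOpen widens the overlap window.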
+  public static final class CountingRegionObserver extends BaseRegionObserver {
+
+    @Override
+    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+        RegionScanner s) throws IOException {
+      if (e.getEnvironment().getRegionInfo().isMetaTable()) {
+        int concurrency = CONCURRENCY.incrementAndGet();
+        for (;;) {
+          int max = MAX_CONCURRENCY.get();
+          if (concurrency <= max) {
+            break;
+          }
+          if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) {
+            break;
+          }
+        }
+        Threads.sleepWithoutInterrupt(10);
+      }
+      return s;
+    }
+
+    @Override
+    public void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
+        throws IOException {
+      if (e.getEnvironment().getRegionInfo().isMetaTable()) {
+        CONCURRENCY.decrementAndGet();
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName());
+    conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+        .toArray(byte[][]::new);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
+      throws InterruptedException, ExecutionException {
+    assertEquals(256, futures.size());
+    for (int i = 0; i < futures.size(); i++) {
+      HRegionLocation loc = futures.get(i).get();
+      if (i == 0) {
+        assertTrue(isEmptyStartRow(loc.getRegionInfo().getStartKey()));
+      } else {
+        assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegionInfo().getStartKey()));
+      }
+      if (i == futures.size() - 1) {
+        assertTrue(isEmptyStopRow(loc.getRegionInfo().getEndKey()));
+      } else {
+        assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegionInfo().getEndKey()));
+      }
+    }
+  }
+
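+  // Issue 256 locate requests up front (forward lookups for the first half of the regions and
+  // reverse lookups for the rest) and verify that no more than MAX_ALLOWED concurrent scans
+  // ever hit meta.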
+  @Test
+  public void test() throws InterruptedException, ExecutionException {
+    List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 128)
+        .mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+        .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r)).collect(toCollection(ArrayList::new));
+    futures.addAll(IntStream.range(129, 257)
+        .mapToObj(i -> i < 256 ? Bytes.toBytes(String.format("%02x", i)) : EMPTY_START_ROW)
+        .map(r -> LOCATOR.getPreviousRegionLocation(TABLE_NAME, r)).collect(toList()));
+    assertLocs(futures);
+    assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
deleted file mode 100644
index a679192..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncRegionLocator {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("async");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static AsyncConnectionImpl CONN;
-
-  private static AsyncRegionLocator LOCATOR;
-
-  private static byte[][] SPLIT_KEYS;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
-    LOCATOR = CONN.getLocator();
-    SPLIT_KEYS = new byte[8][];
-    for (int i = 111; i < 999; i += 111) {
-      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
-    }
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    CONN.close();
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @After
-  public void tearDownAfterTest() throws IOException {
-    Admin admin = TEST_UTIL.getAdmin();
-    if (admin.tableExists(TABLE_NAME)) {
-      if (admin.isTableEnabled(TABLE_NAME)) {
-        TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
-      }
-      TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
-    }
-    LOCATOR.clearCache(TABLE_NAME);
-  }
-
-  private void createSingleRegionTable() throws IOException, InterruptedException {
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
-    TEST_UTIL.waitTableAvailable(TABLE_NAME);
-  }
-
-  @Test
-  public void testNoTable() throws InterruptedException {
-    try {
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
-    } catch (ExecutionException e) {
-      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
-    }
-    try {
-      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
-    } catch (ExecutionException e) {
-      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
-    }
-  }
-
-  @Test
-  public void testDisableTable() throws IOException, InterruptedException {
-    createSingleRegionTable();
-    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
-    try {
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
-    } catch (ExecutionException e) {
-      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
-    }
-    try {
-      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
-    } catch (ExecutionException e) {
-      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
-    }
-  }
-
-  private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
-      HRegionLocation loc) {
-    HRegionInfo info = loc.getRegionInfo();
-    assertEquals(TABLE_NAME, info.getTable());
-    assertArrayEquals(startKey, info.getStartKey());
-    assertArrayEquals(endKey, info.getEndKey());
-    assertEquals(serverName, loc.getServerName());
-  }
-
-  @Test
-  public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
-    createSingleRegionTable();
-    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
-    byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
-    ThreadLocalRandom.current().nextBytes(randKey);
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-      LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get());
-    // Use a key which is not the endKey of a region will cause error
-    try {
-      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get());
-    } catch (ExecutionException e) {
-      assertThat(e.getCause(), instanceOf(IOException.class));
-      assertTrue(e.getCause().getMessage().contains("end key of"));
-    }
-  }
-
-  private void createMultiRegionTable() throws IOException, InterruptedException {
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
-    TEST_UTIL.waitTableAvailable(TABLE_NAME);
-  }
-
-  private static byte[][] getStartKeys() {
-    byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
-    startKeys[0] = EMPTY_START_ROW;
-    System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
-    return startKeys;
-  }
-
-  private static byte[][] getEndKeys() {
-    byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
-    endKeys[endKeys.length - 1] = EMPTY_START_ROW;
-    return endKeys;
-  }
-
-  @Test
-  public void testMultiRegionTable() throws IOException, InterruptedException {
-    createMultiRegionTable();
-    byte[][] startKeys = getStartKeys();
-    ServerName[] serverNames = new ServerName[startKeys.length];
-    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
-        .forEach(rs -> {
-          rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
-            serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
-              Bytes::compareTo)] = rs.getServerName();
-          });
-        });
-    IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
-      try {
-        assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
-          serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get());
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }));
-    LOCATOR.clearCache(TABLE_NAME);
-    byte[][] endKeys = getEndKeys();
-    IntStream.range(0, 2).forEach(
-      n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
-        try {
-          assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
-            LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get());
-        } catch (InterruptedException | ExecutionException e) {
-          throw new RuntimeException(e);
-        }
-      }));
-  }
-
-  @Test
-  public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
-    createSingleRegionTable();
-    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
-    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
-    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
-
-    TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
-      Bytes.toBytes(newServerName.getServerName()));
-    while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
-        .equals(newServerName)) {
-      Thread.sleep(100);
-    }
-    // Should be same as it is in cache
-    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
-    LOCATOR.updateCachedLocation(loc, null);
-    // null error will not trigger a cache cleanup
-    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
-    LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index 2a902a6..40190cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -90,7 +91,7 @@ public class TestAsyncRegionLocatorTimeout {
 
   @AfterClass
   public static void tearDown() throws Exception {
-    CONN.close();
+    IOUtils.closeQuietly(CONN);
     TEST_UTIL.shutdownMiniCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 7a85727..bb6cc2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -103,7 +104,7 @@ public class TestAsyncTable {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    ASYNC_CONN.close();
+    IOUtils.closeQuietly(ASYNC_CONN);
     TEST_UTIL.shutdownMiniCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f041306c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index c8e1c7a..ea999f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.security.User;
@@ -88,7 +89,7 @@ public class TestAsyncTableNoncedRetry {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    ASYNC_CONN.close();
+    IOUtils.closeQuietly(ASYNC_CONN);
     TEST_UTIL.shutdownMiniCluster();
   }