You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:34 UTC

[08/15] hbase git commit: HBASE-17356 Add replica get support

HBASE-17356 Add replica get support


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/db66e6cc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/db66e6cc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/db66e6cc

Branch: refs/heads/HBASE-21512
Commit: db66e6cc9e1c6ea027631388aba688cb623b7d0a
Parents: e4b6b4a
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 1 21:59:37 2019 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Jan 3 08:38:20 2019 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/RegionLocations.java    |   30 +-
 .../client/AsyncBatchRpcRetryingCaller.java     |  114 +-
 .../client/AsyncConnectionConfiguration.java    |   12 +
 .../hbase/client/AsyncConnectionImpl.java       |    1 -
 .../hbase/client/AsyncMetaRegionLocator.java    |  125 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java |  291 +--
 .../hadoop/hbase/client/AsyncRegionLocator.java |  129 +-
 .../hbase/client/AsyncRegionLocatorHelper.java  |  147 ++
 .../hbase/client/AsyncRpcRetryingCaller.java    |   15 +-
 .../client/AsyncRpcRetryingCallerFactory.java   |   55 +-
 .../AsyncSingleRequestRpcRetryingCaller.java    |   71 +-
 .../hbase/client/AsyncTableRegionLocator.java   |   28 +-
 .../client/AsyncTableRegionLocatorImpl.java     |    6 +-
 .../hbase/client/ConnectionConfiguration.java   |    5 +-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 2033 +++++++++---------
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  208 +-
 .../apache/hadoop/hbase/util/FutureUtils.java   |   60 +
 .../hbase/client/RegionReplicaTestHelper.java   |  161 ++
 .../client/TestAsyncMetaRegionLocator.java      |   55 +-
 .../client/TestAsyncNonMetaRegionLocator.java   |  126 +-
 ...syncNonMetaRegionLocatorConcurrenyLimit.java |   20 +-
 ...TestAsyncSingleRequestRpcRetryingCaller.java |   56 +-
 .../client/TestAsyncTableLocatePrefetch.java    |    4 +-
 .../client/TestAsyncTableRegionReplicasGet.java |  204 ++
 .../hbase/client/TestZKAsyncRegistry.java       |   44 +-
 25 files changed, 2366 insertions(+), 1634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index fd6f3c7..f98bf03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -56,8 +56,8 @@ public class RegionLocations {
     int index = 0;
     for (HRegionLocation loc : locations) {
       if (loc != null) {
-        if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
-          maxReplicaId = loc.getRegionInfo().getReplicaId();
+        if (loc.getRegion().getReplicaId() >= maxReplicaId) {
+          maxReplicaId = loc.getRegion().getReplicaId();
           maxReplicaIdIndex = index;
         }
       }
@@ -72,7 +72,7 @@ public class RegionLocations {
       this.locations = new HRegionLocation[maxReplicaId + 1];
       for (HRegionLocation loc : locations) {
         if (loc != null) {
-          this.locations[loc.getRegionInfo().getReplicaId()] = loc;
+          this.locations[loc.getRegion().getReplicaId()] = loc;
         }
       }
     }
@@ -146,7 +146,7 @@ public class RegionLocations {
   public RegionLocations remove(HRegionLocation location) {
     if (location == null) return this;
     if (location.getRegion() == null) return this;
-    int replicaId = location.getRegionInfo().getReplicaId();
+    int replicaId = location.getRegion().getReplicaId();
     if (replicaId >= locations.length) return this;
 
     // check whether something to remove. HRL.compareTo() compares ONLY the
@@ -203,14 +203,14 @@ public class RegionLocations {
     // in case of region replication going down, we might have a leak here.
     int max = other.locations.length;
 
-    HRegionInfo regionInfo = null;
+    RegionInfo regionInfo = null;
     for (int i = 0; i < max; i++) {
       HRegionLocation thisLoc = this.getRegionLocation(i);
       HRegionLocation otherLoc = other.getRegionLocation(i);
-      if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) {
+      if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) {
         // regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that
         // all replica region infos belong to the same region with same region id.
-        regionInfo = otherLoc.getRegionInfo();
+        regionInfo = otherLoc.getRegion();
       }
 
       HRegionLocation selectedLoc = selectRegionLocation(thisLoc,
@@ -232,7 +232,7 @@ public class RegionLocations {
       for (int i=0; i < newLocations.length; i++) {
         if (newLocations[i] != null) {
           if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo,
-            newLocations[i].getRegionInfo())) {
+            newLocations[i].getRegion())) {
             newLocations[i] = null;
           }
         }
@@ -273,9 +273,9 @@ public class RegionLocations {
       boolean checkForEquals, boolean force) {
     assert location != null;
 
-    int replicaId = location.getRegionInfo().getReplicaId();
+    int replicaId = location.getRegion().getReplicaId();
 
-    HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId());
+    HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId());
     HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location,
       checkForEquals, force);
 
@@ -288,8 +288,8 @@ public class RegionLocations {
     // ensure that all replicas share the same start code. Otherwise delete them
     for (int i=0; i < newLocations.length; i++) {
       if (newLocations[i] != null) {
-        if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(),
-          newLocations[i].getRegionInfo())) {
+        if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(),
+          newLocations[i].getRegion())) {
           newLocations[i] = null;
         }
       }
@@ -317,8 +317,8 @@ public class RegionLocations {
   public HRegionLocation getRegionLocationByRegionName(byte[] regionName) {
     for (HRegionLocation loc : locations) {
       if (loc != null) {
-        if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName)
-            || Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) {
+        if (Bytes.equals(loc.getRegion().getRegionName(), regionName)
+            || Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) {
           return loc;
         }
       }
@@ -331,7 +331,7 @@ public class RegionLocations {
   }
 
   public HRegionLocation getDefaultRegionLocation() {
-    return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+    return locations[RegionInfo.DEFAULT_REPLICA_ID];
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 51b89a9..e268b2e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -43,24 +42,26 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Retry caller for batch.
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller<T> {
   private static final class ServerRequest {
 
     public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
-        new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
 
     public void addAction(HRegionLocation loc, Action action) {
-      computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
         () -> new RegionRequest(loc)).actions.add(action);
     }
   }
@@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller<T> {
       Throwable error, ServerName serverName) {
     if (tries > startLogErrorsCnt) {
       String regions =
-          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
-              .collect(Collectors.joining(",", "[", "]"));
-      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
-          + " failed, tries=" + tries,
-        error);
+        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
+          .collect(Collectors.joining(",", "[", "]"));
+      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
+        " failed, tries=" + tries, error);
     }
   }
 
@@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
     }
     errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
-        getExtraContextForError(serverName)));
+      getExtraContextForError(serverName)));
   }
 
   private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
@@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       return;
     }
     ThrowableWithExtraContext errorWithCtx =
-        new ThrowableWithExtraContext(error, currentTime, extras);
+      new ThrowableWithExtraContext(error, currentTime, extras);
     List<ThrowableWithExtraContext> errors = removeErrors(action);
     if (errors == null) {
       errors = Collections.singletonList(errorWithCtx);
@@ -227,7 +227,7 @@ class AsyncBatchRpcRetryingCaller<T> {
         return;
       }
       future.completeExceptionally(new RetriesExhaustedException(tries,
-          Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
+        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
     });
   }
 
@@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller<T> {
       // multiRequestBuilder will be populated with region actions.
       // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
       // action list.
-      RequestConverter.buildNoDataRegionActions(entry.getKey(),
-        entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
-        mutationBuilder, nonceGroup, rowMutationsIndexMap);
+      RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
+        multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
+        rowMutationsIndexMap);
     }
     return multiRequestBuilder.build();
   }
@@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller<T> {
       RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
     Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
     if (result == null) {
-      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
-          + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
-          + regionReq.loc.getRegionInfo().getRegionNameAsString());
+      LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
+        Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
+        regionReq.loc.getRegion().getRegionNameAsString());
       addError(action, new RuntimeException("Invalid response"), serverName);
       failedActions.add(action);
     } else if (result instanceof Throwable) {
       Throwable error = translateException((Throwable) result);
       logException(tries, () -> Stream.of(regionReq), error, serverName);
-      conn.getLocator().updateCachedLocation(regionReq.loc, error);
+      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
       if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
         failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
           getExtraContextForError(serverName));
@@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller<T> {
       RegionResult regionResult = resp.getResults().get(rn);
       Throwable regionException = resp.getException(rn);
       if (regionResult != null) {
-        regionReq.actions.forEach(
-          action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
-            regionException));
+        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
+          regionResult, failedActions, regionException));
       } else {
         Throwable error;
         if (regionException == null) {
-          LOG.error(
-            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          LOG
+            .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
           error = new RuntimeException("Invalid response");
         } else {
           error = translateException(regionException);
         }
         logException(tries, () -> Stream.of(regionReq), error, serverName);
-        conn.getLocator().updateCachedLocation(regionReq.loc, error);
+        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
         if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
           failAll(regionReq.actions.stream(), tries, error, serverName);
           return;
@@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       remainingNs = remainingTimeNs();
       if (remainingNs <= 0) {
         failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
-            .flatMap(r -> r.actions.stream()),
-          tries);
+          .flatMap(r -> r.actions.stream()), tries);
         return;
       }
     } else {
@@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller<T> {
       ServerName serverName) {
     Throwable error = translateException(t);
     logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
-    actionsByRegion
-        .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
+    actionsByRegion.forEach(
+      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
     if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
       failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
         serverName);
       return;
     }
     List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
-        .collect(Collectors.toList());
+      .collect(Collectors.toList());
     addError(copiedActions, error, serverName);
     tryResubmit(copiedActions.stream(), tries);
   }
@@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
     ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
     ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
-    CompletableFuture.allOf(actions
-        .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
-          RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
-            if (error != null) {
-              error = translateException(error);
-              if (error instanceof DoNotRetryIOException) {
-                failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
-                return;
-              }
-              addError(action, error, null);
-              locateFailed.add(action);
-            } else {
-              computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
-                  .addAction(loc, action);
+    addListener(CompletableFuture.allOf(actions
+      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
+          if (error != null) {
+            error = translateException(error);
+            if (error instanceof DoNotRetryIOException) {
+              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+              return;
             }
-          }))
-        .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
-          if (!actionsByServer.isEmpty()) {
-            send(actionsByServer, tries);
-          }
-          if (!locateFailed.isEmpty()) {
-            tryResubmit(locateFailed.stream(), tries);
+            addError(action, error, null);
+            locateFailed.add(action);
+          } else {
+            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
+              action);
           }
-        });
+        }))
+      .toArray(CompletableFuture[]::new)), (v, r) -> {
+        if (!actionsByServer.isEmpty()) {
+          send(actionsByServer, tries);
+        }
+        if (!locateFailed.isEmpty()) {
+          tryResubmit(locateFailed.stream(), tries);
+        }
+      });
   }
 
   public List<CompletableFuture<T>> call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 915e9dd..fa051a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -94,6 +96,10 @@ class AsyncConnectionConfiguration {
 
   private final long writeBufferPeriodicFlushTimeoutNs;
 
+  // This supports region replica get: if the primary does not finish within this
+  // timeout, we will send requests to the secondaries.
+  private final long primaryCallTimeoutNs;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -124,6 +130,8 @@ class AsyncConnectionConfiguration {
     this.writeBufferPeriodicFlushTimeoutNs =
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
         WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
+    this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
+      conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
   }
 
   long getMetaOperationTimeoutNs() {
@@ -181,4 +189,8 @@ class AsyncConnectionConfiguration {
   long getWriteBufferPeriodicFlushTimeoutNs() {
     return writeBufferPeriodicFlushTimeoutNs;
   }
+
+  long getPrimaryCallTimeoutNs() {
+    return primaryCallTimeoutNs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 078395b..361d5b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -152,7 +152,6 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   // we will override this method for testing retry caller, so do not remove this method.
-  @VisibleForTesting
   AsyncRegionLocator getLocator() {
     return locator;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 06b5b57..9fef15d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,43 +41,43 @@ class AsyncMetaRegionLocator {
 
   private final AsyncRegistry registry;
 
-  private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+  private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
 
-  private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
-      new AtomicReference<>();
+  private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
+    new AtomicReference<>();
 
   AsyncMetaRegionLocator(AsyncRegistry registry) {
     this.registry = registry;
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) {
+  /**
+   * Get the region locations for meta region. If the location for the given replica is not
+   * available in the cached locations, then fetch from the HBase cluster.
+   * <p/>
+   * The <code>replicaId</code> parameter is important. If the region replication config for meta
+   * region is changed, then the cached region locations may not have the locations for new
+   * replicas. If we do not check the location for the given replica, we will always return the
+   * cached region locations and cause an infinite loop.
+   */
+  CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
     for (;;) {
       if (!reload) {
-        HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
-        if (metaRegionLocation != null) {
-          return CompletableFuture.completedFuture(metaRegionLocation);
+        RegionLocations locs = this.metaRegionLocations.get();
+        if (isGood(locs, replicaId)) {
+          return CompletableFuture.completedFuture(locs);
         }
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Meta region location cache is null, try fetching from registry.");
-      }
+      LOG.trace("Meta region location cache is null, try fetching from registry.");
       if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Start fetching meta region location from registry.");
-        }
-        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        LOG.debug("Start fetching meta region location from registry.");
+        CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
         registry.getMetaRegionLocation().whenComplete((locs, error) -> {
           if (error != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Failed to fetch meta region location from registry", error);
-            }
+            LOG.debug("Failed to fetch meta region location from registry", error);
             metaRelocateFuture.getAndSet(null).completeExceptionally(error);
             return;
           }
-          HRegionLocation loc = locs.getDefaultRegionLocation();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The fetched meta region location is " + loc);
-          }
+          LOG.debug("The fetched meta region location is {}" + locs);
           // Here we update cache before reset future, so it is possible that someone can get a
           // stale value. Consider this:
           // 1. update cache
@@ -82,12 +87,12 @@ class AsyncMetaRegionLocator {
           // cleared in step 2.
           // But we do not think it is a big deal as it rarely happens, and even if it happens, the
           // caller will retry again later, no correctness problems.
-          this.metaRegionLocation.set(loc);
+          this.metaRegionLocations.set(locs);
           metaRelocateFuture.set(null);
-          future.complete(loc);
+          future.complete(locs);
         });
       } else {
-        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
         if (future != null) {
           return future;
         }
@@ -95,30 +100,56 @@ class AsyncMetaRegionLocator {
     }
   }
 
-  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-    AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(),
-      newLoc -> {
-        for (;;) {
-          HRegionLocation oldLoc = metaRegionLocation.get();
-          if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() ||
-              oldLoc.getServerName().equals(newLoc.getServerName()))) {
-            return;
-          }
-          if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
-            return;
-          }
-        }
-      }, l -> {
-        for (;;) {
-          HRegionLocation oldLoc = metaRegionLocation.get();
-          if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
-            return;
-          }
+  private HRegionLocation getCacheLocation(HRegionLocation loc) {
+    RegionLocations locs = metaRegionLocations.get();
+    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+  }
+
+  private void addLocationToCache(HRegionLocation loc) {
+    for (;;) {
+      int replicaId = loc.getRegion().getReplicaId();
+      RegionLocations oldLocs = metaRegionLocations.get();
+      if (oldLocs == null) {
+        RegionLocations newLocs = createRegionLocations(loc);
+        if (metaRegionLocations.compareAndSet(null, newLocs)) {
+          return;
         }
-      });
+      }
+      HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
+      if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
+        oldLoc.getServerName().equals(loc.getServerName()))) {
+        return;
+      }
+      RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
+      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+        return;
+      }
+    }
+  }
+
+  private void removeLocationFromCache(HRegionLocation loc) {
+    for (;;) {
+      RegionLocations oldLocs = metaRegionLocations.get();
+      if (oldLocs == null) {
+        return;
+      }
+      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+      if (!canUpdateOnError(loc, oldLoc)) {
+        return;
+      }
+      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+        return;
+      }
+    }
+  }
+
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
+      this::addLocationToCache, this::removeLocationFromCache);
   }
 
   void clearCache() {
-    metaRegionLocation.set(null);
+    metaRegionLocations.set(null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 7e3d56c..1fcfbb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.HConstants.NINES;
 import static org.apache.hadoop.hbase.HConstants.ZEROES;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
@@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -53,6 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
 
 /**
  * The asynchronous locator for regions other than meta.
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator {
 
   private static final class LocateRequest {
 
-    public final byte[] row;
+    private final byte[] row;
 
-    public final RegionLocateType locateType;
+    private final RegionLocateType locateType;
 
     public LocateRequest(byte[] row, RegionLocateType locateType) {
       this.row = row;
@@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator {
 
   private static final class TableCache {
 
-    public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
+    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
       new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
 
-    public final Set<LocateRequest> pendingRequests = new HashSet<>();
+    private final Set<LocateRequest> pendingRequests = new HashSet<>();
 
-    public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
+    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
       new LinkedHashMap<>();
 
     public boolean hasQuota(int max) {
@@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator {
       return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
     }
 
-    public void clearCompletedRequests(Optional<HRegionLocation> location) {
-      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+    public void clearCompletedRequests(Optional<RegionLocations> locations) {
+      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
         allRequests.entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
-        if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+        if (tryComplete(entry.getKey(), entry.getValue(), locations)) { // done or just completed
           iter.remove();
         }
       }
     }
 
-    private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
-        Optional<HRegionLocation> location) {
+    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+        Optional<RegionLocations> locations) {
       if (future.isDone()) {
         return true;
       }
-      if (!location.isPresent()) {
+      if (!locations.isPresent()) {
         return false;
       }
-      HRegionLocation loc = location.get();
+      RegionLocations locs = locations.get();
+      HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
+      // we should at least have one location available, otherwise the request should fail and
+      // should not arrive here
+      assert loc != null;
       boolean completed;
       if (req.locateType.equals(RegionLocateType.BEFORE)) {
         // for locating the row before current row, the common case is to find the previous region
@@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator {
         completed = loc.getRegion().containsRow(req.row);
       }
       if (completed) {
-        future.complete(loc);
+        future.complete(locs);
         return true;
       } else {
         return false;
@@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator {
     return computeIfAbsent(cache, tableName, TableCache::new);
   }
 
-  private void removeFromCache(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return;
+  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+    HRegionLocation[] locArr1 = locs1.getRegionLocations();
+    HRegionLocation[] locArr2 = locs2.getRegionLocations();
+    if (locArr1.length != locArr2.length) {
+      return false;
     }
-    tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> {
-      if (oldLoc.getSeqNum() > loc.getSeqNum() ||
-        !oldLoc.getServerName().equals(loc.getServerName())) {
-        return oldLoc;
+    for (int i = 0; i < locArr1.length; i++) {
+      // region info is not compared; only seqNum and server name per replica slot
+      HRegionLocation loc1 = locArr1[i];
+      HRegionLocation loc2 = locArr2[i];
+      if (loc1 == null) {
+        if (loc2 != null) {
+          return false;
+        }
+      } else {
+        if (loc2 == null) {
+          return false;
+        }
+        if (loc1.getSeqNum() != loc2.getSeqNum()) {
+          return false;
+        }
+        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+          return false;
+        }
       }
-      return null;
-    });
+    }
+    return true;
   }
 
   // return whether we add this loc to cache
-  private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try adding " + loc + " to cache");
-    }
-    byte[] startKey = loc.getRegion().getStartKey();
-    HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
-    if (oldLoc == null) {
-      return true;
-    }
-    if (oldLoc.getSeqNum() > loc.getSeqNum() ||
-      oldLoc.getServerName().equals(loc.getServerName())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
-          " is newer than us or has the same server name");
-      }
-      return false;
-    }
-    return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
-      if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
-        return loc;
+  private boolean addToCache(TableCache tableCache, RegionLocations locs) {
+    LOG.trace("Try adding {} to cache", locs);
+    byte[] startKey = ObjectUtils.firstNonNull(locs.getRegionLocations()).getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
+      if (oldLocs == null) {
+        return true;
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
+      RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
+      if (isEqual(mergedLocs, oldLocs)) {
+        // merging does not change the cached value, so there is nothing to add
+        LOG.trace("Will not add {} to cache because the old value {}" +
           " is newer than us or has the same server name." +
-          " Maybe it is updated before we replace it");
+          " Maybe it is updated before we replace it", locs, oldLocs);
+        return false;
       }
-      return oldValue;
-    });
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-      justification = "Called by lambda expression")
-  private void addToCache(HRegionLocation loc) {
-    addToCache(getTableCache(loc.getRegion().getTable()), loc);
-    LOG.trace("Try adding {} to cache", loc);
+      if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
+        return true;
+      }
+    }
   }
 
-  private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
+  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
       Throwable error) {
     if (error != null) {
       LOG.warn("Failed to locate region in '" + tableName + "', row='" +
@@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator {
     }
     Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
-    if (loc != null) {
-      if (!addToCache(tableCache, loc)) {
+    if (locs != null) {
+      if (!addToCache(tableCache, locs)) {
         // someone is ahead of us.
         synchronized (tableCache) {
           tableCache.pendingRequests.remove(req);
@@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator {
           future.completeExceptionally(error);
         }
       }
-      tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+      tableCache.clearCompletedRequests(Optional.ofNullable(locs));
       // Remove a complete locate request in a synchronized block, so the table cache must have
       // quota to send a candidate request.
       toSend = tableCache.getCandidate();
@@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator {
         Bytes.toStringBinary(req.row), req.locateType, locs);
     }
 
+    // the default region location should always be presented when fetching from meta, otherwise
+    // let's fail the request.
     if (locs == null || locs.getDefaultRegionLocation() == null) {
       complete(tableName, req, null,
-        new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
+        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
           tableName, Bytes.toStringBinary(req.row), req.locateType)));
       return true;
     }
@@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator {
     RegionInfo info = loc.getRegion();
     if (info == null) {
       complete(tableName, req, null,
-        new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
           tableName, Bytes.toStringBinary(req.row), req.locateType)));
       return true;
     }
     if (info.isSplitParent()) {
       return false;
     }
-    if (loc.getServerName() == null) {
-      complete(tableName, req, null,
-        new IOException(
-          String.format("No server address listed for region '%s', row='%s', locateType=%s",
-            info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
-      return true;
-    }
-    complete(tableName, req, loc, null);
+    complete(tableName, req, locs, null);
     return true;
   }
 
-  private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
-    Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
+  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
+      int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
     if (entry == null) {
       return null;
     }
-    HRegionLocation loc = entry.getValue();
+    RegionLocations locs = entry.getValue(); // region with the greatest start key <= row
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) { // nothing cached for this replica
+      return null;
+    }
     byte[] endKey = loc.getRegion().getEndKey();
     if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
-          Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
       }
-      return loc;
+      return locs; // all cached replicas, not only replicaId
     } else {
       return null;
     }
   }
 
-  private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
-      byte[] row) {
+  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
+      byte[] row, int replicaId) {
     boolean isEmptyStopRow = isEmptyStopRow(row);
-    Map.Entry<byte[], HRegionLocation> entry =
-        isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
     if (entry == null) {
       return null;
     }
-    HRegionLocation loc = entry.getValue();
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
     if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
       (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
-          Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
       }
-      return loc;
+      return locs;
     } else {
       return null;
     }
@@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator {
             if (tableNotFound) {
               complete(tableName, req, null, new TableNotFoundException(tableName));
             } else if (!completeNormally) {
-              complete(tableName, req, null, new IOException(
-                "Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName));
+              complete(tableName, req, null, new IOException("Unable to find region for '" +
+                Bytes.toStringBinary(req.row) + "' in " + tableName));
             }
           }
 
@@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator {
                   continue;
                 }
                 RegionInfo info = loc.getRegion();
-                if (info == null || info.isOffline() || info.isSplitParent() ||
-                  loc.getServerName() == null) {
+                if (info == null || info.isOffline() || info.isSplitParent()) {
                   continue;
                 }
-                if (addToCache(tableCache, loc)) {
+                if (addToCache(tableCache, locs)) {
                   synchronized (tableCache) {
-                    tableCache.clearCompletedRequests(Optional.of(loc));
+                    tableCache.clearCompletedRequests(Optional.of(locs));
                   }
                 }
               }
@@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator {
         });
   }
 
-  private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
-      RegionLocateType locateType) {
+  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
+      int replicaId, RegionLocateType locateType) { // non-BEFORE types use the CURRENT lookup
     return locateType.equals(RegionLocateType.BEFORE)
-      ? locateRowBeforeInCache(tableCache, tableName, row)
-      : locateRowInCache(tableCache, tableName, row);
+      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
+      : locateRowInCache(tableCache, tableName, row, replicaId);
   }
 
   // locateToPrevious is true means we will use the start key of a region to locate the region
   // placed before it. Used for reverse scan. See the comment of
   // AsyncRegionLocator.getPreviousRegionLocation.
-  private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
-      byte[] row, RegionLocateType locateType, boolean reload) {
+  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
+      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
     // AFTER should be convert to CURRENT before calling this method
     assert !locateType.equals(RegionLocateType.AFTER);
     TableCache tableCache = getTableCache(tableName);
     if (!reload) {
-      HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
-      if (loc != null) {
-        return CompletableFuture.completedFuture(loc);
+      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+      if (isGood(locs, replicaId)) {
+        return CompletableFuture.completedFuture(locs);
       }
     }
-    CompletableFuture<HRegionLocation> future;
+    CompletableFuture<RegionLocations> future;
     LocateRequest req;
     boolean sendRequest = false;
     synchronized (tableCache) {
       // check again
       if (!reload) {
-        HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
-        if (loc != null) {
-          return CompletableFuture.completedFuture(loc);
+        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+        if (isGood(locs, replicaId)) {
+          return CompletableFuture.completedFuture(locs);
         }
       }
       req = new LocateRequest(row, locateType);
@@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator {
     return future;
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      RegionLocateType locateType, boolean reload) {
-    if (locateType.equals(RegionLocateType.BEFORE)) {
-      return getRegionLocationInternal(tableName, row, locateType, reload);
-    } else {
-      // as we know the exact row after us, so we can just create the new row, and use the same
-      // algorithm to locate it.
-      if (locateType.equals(RegionLocateType.AFTER)) {
-        row = createClosestRowAfter(row);
-      }
-      return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload);
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+      int replicaId, RegionLocateType locateType, boolean reload) {
+    // AFTER is rewritten as CURRENT on the closest row after the given one, so the same
+    // lookup algorithm can be used for it.
+    if (locateType.equals(RegionLocateType.AFTER)) {
+      row = createClosestRowAfter(row);
+      locateType = RegionLocateType.CURRENT;
     }
+    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
   }
 
-  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-    AsyncRegionLocator.updateCachedLocation(loc, exception, l -> {
-      TableCache tableCache = cache.get(l.getRegion().getTable());
-      if (tableCache == null) {
-        return null;
+  private void removeLocationFromCache(HRegionLocation loc) {
+    TableCache tableCache = cache.get(loc.getRegion().getTable());
+    if (tableCache == null) {
+      return;
+    }
+    byte[] startKey = loc.getRegion().getStartKey();
+    int replicaId = loc.getRegion().getReplicaId();
+    for (;;) {
+      RegionLocations oldLocs = tableCache.cache.get(startKey);
+      // entry may already be gone; a null oldLoc makes canUpdateOnError return false
+      HRegionLocation oldLoc = oldLocs != null ? oldLocs.getRegionLocation(replicaId) : null;
+      if (!canUpdateOnError(loc, oldLoc)) {
+        return;
       }
-      return tableCache.cache.get(l.getRegion().getStartKey());
-    }, this::addToCache, this::removeFromCache);
+      RegionLocations newLocs = removeRegionLocation(oldLocs, replicaId);
+      if (newLocs == null) {
+        if (tableCache.cache.remove(startKey, oldLocs)) {
+          return;
+        }
+      } else if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
+        return;
+      }
+    }
+  }
+
+  private void addLocationToCache(HRegionLocation loc) { // creates the TableCache if absent
+    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
+  }
+
+  private HRegionLocation getCachedLocation(HRegionLocation loc) {
+    TableCache tableCache = cache.get(loc.getRegion().getTable());
+    if (tableCache == null) { // unlike getTableCache, does not create one
+      return null;
+    }
+    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); // exact key
+    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+  }
+
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+      this::addLocationToCache, this::removeLocationFromCache); // shared error-handling logic
   }
 
   void clearCache(TableName tableName) {
@@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator {
 
   // only used for testing whether we have cached the location for a region.
   @VisibleForTesting
-  HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
+  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { // default replica
     TableCache tableCache = cache.get(tableName);
     if (tableCache == null) {
       return null;
     }
-    return locateRowInCache(tableCache, tableName, row);
+    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 56228ab..d624974 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -18,26 +18,24 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Supplier;
-
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 
 /**
  * The asynchronous region locator.
@@ -59,8 +57,8 @@ class AsyncRegionLocator {
     this.retryTimer = retryTimer;
   }
 
-  private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
-      long timeoutNs, Supplier<String> timeoutMsg) {
+  private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
+      Supplier<String> timeoutMsg) {
     if (future.isDone() || timeoutNs <= 0) {
       return future;
     }
@@ -78,74 +76,75 @@ class AsyncRegionLocator {
     });
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+  private boolean isMeta(TableName tableName) {
+    return TableName.isMetaTableName(tableName);
+  }
+
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       RegionLocateType type, boolean reload, long timeoutNs) {
+    CompletableFuture<RegionLocations> future = isMeta(tableName)
+      ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
+      : nonMetaRegionLocator.getRegionLocations(tableName, row,
+        RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+    return withTimeout(future, timeoutNs,
+      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+        "ms) waiting for region locations for " + tableName + ", row='" +
+        Bytes.toStringBinary(row) + "'");
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
     // meta region can not be split right now so we always call the same method.
     // Change it later if the meta table can have more than one regions.
-    CompletableFuture<HRegionLocation> future =
-        tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload)
-            : nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload);
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    CompletableFuture<RegionLocations> locsFuture =
+      isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
+        : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+    addListener(locsFuture, (locs, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      HRegionLocation loc = locs.getRegionLocation(replicaId); // may be null for this replica
+      if (loc == null) {
+        future
+          .completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
+            Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
+      } else if (loc.getServerName() == null) { // location cached but has no server
+        future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
+          loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) +
+          "', locateType=" + type + ", replicaId=" + replicaId));
+      } else {
+        future.complete(loc);
+      }
+    });
     return withTimeout(future, timeoutNs,
       () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
-          "ms) waiting for region location for " + tableName + ", row='" +
-          Bytes.toStringBinary(row) + "'");
+        "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
+        "', replicaId=" + replicaId);
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      RegionLocateType type, long timeoutNs) {
-    return getRegionLocation(tableName, row, type, false, timeoutNs);
+      int replicaId, RegionLocateType type, long timeoutNs) {
+    return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs); // reload=false
   }
 
-  static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
-    // Do not need to update if no such location, or the location is newer, or the location is not
-    // same with us
-    return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
-        oldLoc.getServerName().equals(loc.getServerName());
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      RegionLocateType type, boolean reload, long timeoutNs) {
+    return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
+      timeoutNs); // always the default replica
   }
 
-  static void updateCachedLocation(HRegionLocation loc, Throwable exception,
-      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
-      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
-    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
-    }
-    if (!canUpdate(loc, oldLoc)) {
-      return;
-    }
-    Throwable cause = findException(exception);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The actual exception when updating " + loc, cause);
-    }
-    if (cause == null || !isMetaClearingException(cause)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "Will not update " + loc + " because the exception is null or not the one we care about");
-      }
-      return;
-    }
-    if (cause instanceof RegionMovedException) {
-      RegionMovedException rme = (RegionMovedException) cause;
-      HRegionLocation newLoc =
-          new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
-      }
-      addToCache.accept(newLoc);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Try removing " + loc + " from cache");
-      }
-      removeFromCache.accept(loc);
-    }
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      RegionLocateType type, long timeoutNs) {
+    return getRegionLocation(tableName, row, type, false, timeoutNs); // reload=false
   }
 
-  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { // route by table
     if (loc.getRegion().isMetaRegion()) {
-      metaRegionLocator.updateCachedLocation(loc, exception);
+      metaRegionLocator.updateCachedLocationOnError(loc, exception);
     } else {
-      nonMetaRegionLocator.updateCachedLocation(loc, exception);
+      nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
new file mode 100644
index 0000000..5c9c091
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for asynchronous region locator.
+ * <p/>
+ * It contains the shared logic for updating the cached location after a request fails, and
+ * helpers which create new {@link RegionLocations} instead of modifying the cached ones, as the
+ * cached instances may be accessed concurrently.
+ */
+@InterfaceAudience.Private
+final class AsyncRegionLocatorHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class);
+
+  private AsyncRegionLocatorHelper() {
+  }
+
+  /**
+   * Check whether the cached location may be updated because a request to {@code loc} failed.
+   * @param loc the location which was used by the failed request
+   * @param oldLoc the location currently cached for {@code loc}, may be null
+   * @return {@code true} if there is a cached location, it is not newer than {@code loc} and it
+   *         points to the same server as {@code loc}
+   */
+  static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
+    // Do not need to update if no such location, or the location is newer, or the location is not
+    // the same with us. A cached location may not have a server name yet (see isGood), so check
+    // for null first instead of risking a NullPointerException.
+    return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
+      oldLoc.getServerName() != null && oldLoc.getServerName().equals(loc.getServerName());
+  }
+
+  /**
+   * Update the cache after a request to {@code loc} failed with {@code exception}.
+   * <p/>
+   * Nothing is done if {@link #canUpdateOnError(HRegionLocation, HRegionLocation)} returns false
+   * for the cached location, or if the actual cause is null or not a meta clearing exception. For
+   * a {@link RegionMovedException} the location carried by the exception is added to the cache,
+   * otherwise {@code loc} is removed from the cache.
+   * @param loc the location which was used by the failed request
+   * @param exception the error of the failed request
+   * @param cachedLocationSupplier returns the currently cached location for the given location
+   * @param addToCache called with the new location when the region has moved
+   * @param removeFromCache called with {@code loc} when it should be removed from the cache
+   */
+  static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
+      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+    LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception);
+    if (!canUpdateOnError(loc, oldLoc)) {
+      return;
+    }
+    Throwable cause = findException(exception);
+    LOG.debug("The actual exception when updating {}", loc, cause);
+    if (cause == null || !isMetaClearingException(cause)) {
+      LOG.debug("Will not update {} because the exception is null or not the one we care about",
+        loc);
+      return;
+    }
+    if (cause instanceof RegionMovedException) {
+      RegionMovedException rme = (RegionMovedException) cause;
+      HRegionLocation newLoc =
+        new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum());
+      LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme);
+      addToCache.accept(newLoc);
+    } else {
+      LOG.debug("Try removing {} from cache", loc);
+      removeFromCache.accept(loc);
+    }
+  }
+
+  /**
+   * Create a new {@link RegionLocations} which only contains the given {@code loc}, stored at the
+   * index of its replica id, all the other slots are null.
+   */
+  static RegionLocations createRegionLocations(HRegionLocation loc) {
+    int replicaId = loc.getRegion().getReplicaId();
+    HRegionLocation[] locs = new HRegionLocation[replicaId + 1];
+    locs[replicaId] = loc;
+    return new RegionLocations(locs);
+  }
+
+  /**
+   * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and replace the
+   * location for the replica id of the given {@code loc} with {@code loc}. The location array is
+   * extended if the replica id is beyond its current length.
+   * <p/>
+   * All the {@link RegionLocations} in async locator related class are immutable because we want to
+   * access them concurrently, so here we need to create a new one, instead of calling
+   * {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}.
+   */
+  static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) {
+    int replicaId = loc.getRegion().getReplicaId();
+    HRegionLocation[] locs = oldLocs.getRegionLocations();
+    locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length));
+    locs[replicaId] = loc;
+    return new RegionLocations(locs);
+  }
+
+  /**
+   * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and remove the
+   * location for the given {@code replicaId}.
+   * <p/>
+   * All the {@link RegionLocations} in async locator related class are immutable because we want to
+   * access them concurrently, so here we need to create a new one, instead of calling
+   * {@link RegionLocations#remove(int)}.
+   * @return {@code oldLocs} itself if it has no slot for {@code replicaId}, {@code null} if no
+   *         location is left after the removal, otherwise a new {@link RegionLocations}
+   */
+  static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) {
+    HRegionLocation[] locs = oldLocs.getRegionLocations();
+    if (locs.length < replicaId + 1) {
+      // Here we do not modify the oldLocs so it is safe to return it.
+      return oldLocs;
+    }
+    locs = Arrays.copyOf(locs, locs.length);
+    locs[replicaId] = null;
+    if (ObjectUtils.firstNonNull(locs) != null) {
+      return new RegionLocations(locs);
+    } else {
+      // if all the locations are null, just return null
+      return null;
+    }
+  }
+
+  /**
+   * Create a new {@link RegionLocations} which is the merging result for the given two
+   * {@link RegionLocations}.
+   * <p/>
+   * Per its javadoc, {@link RegionLocations#mergeLocations(RegionLocations)} returns the merged
+   * result rather than modifying the receiver, so its return value must be used; discarding it
+   * would silently skip the merge. {@code oldLocs} is the receiver so that, as mergeLocations
+   * expects of its argument, {@code newLocs} decides the number of replicas and wins when the
+   * sequence numbers are equal.
+   * @param newLocs the new locations to merge in
+   * @param oldLocs the old locations to merge with
+   * @return the merged locations, possibly {@code oldLocs} itself if nothing changed
+   */
+  static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) {
+    return oldLocs.mergeLocations(newLocs);
+  }
+
+  /**
+   * Check whether {@code locs} contains a location with a server name for the given replica id.
+   * @return {@code false} if {@code locs} is null, or if it has no location or no server name for
+   *         {@code replicaId}
+   */
+  static boolean isGood(RegionLocations locs, int replicaId) {
+    if (locs == null) {
+      return false;
+    }
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    return loc != null && loc.getServerName() != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index d30012f..e03049a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -88,15 +88,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
   }
 
-  protected long remainingTimeNs() {
+  protected final long remainingTimeNs() {
     return operationTimeoutNs - (System.nanoTime() - startNs);
   }
 
-  protected void completeExceptionally() {
+  protected final void completeExceptionally() {
     future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
   }
 
-  protected void resetCallTimeout() {
+  protected final void resetCallTimeout() {
     long callTimeoutNs;
     if (operationTimeoutNs > 0) {
       callTimeoutNs = remainingTimeNs();
@@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
     resetController(controller, callTimeoutNs);
   }
 
-  protected void onError(Throwable error, Supplier<String> errMsg,
+  protected final void onError(Throwable error, Supplier<String> errMsg,
       Consumer<Throwable> updateCachedLocation) {
+    if (future.isDone()) {
+      // Give up if the future is already done, this is possible if user has already canceled the
+      // future. And for timeline consistent read, we will also cancel some requests if we have
+      // already get one of the responses.
+      LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
+      return;
+    }
     error = translateException(error);
     if (error instanceof DoNotRetryIOException) {
       future.completeExceptionally(error);

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index f80b4e5..a660e74 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,22 +17,22 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
@@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private RegionLocateType locateType = RegionLocateType.CURRENT;
 
+    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+
     public SingleRequestCallerBuilder<T> table(TableName tableName) {
       this.tableName = tableName;
       return this;
@@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
+      this.replicaId = replicaId;
+      return this;
+    }
+
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
+      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
-          checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
-          checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
-          pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
+        checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
+        pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncScanSingleRegionRpcRetryingCaller build() {
       checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
-          checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
-          checkNotNull(resultCache, "resultCache is null"),
-          checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-          checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
-          pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
+        checkNotNull(resultCache, "resultCache is null"),
+        checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
+        checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
+        pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory {
 
     public <T> AsyncBatchRpcRetryingCaller<T> build() {
       return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
-          maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     public <T> List<CompletableFuture<T>> call() {
@@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory {
 
     public AsyncMasterRequestRpcRetryingCaller<T> build() {
       return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
-          checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
-          rpcTimeoutNs, startLogErrorsCnt);
+        checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
+        rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private ServerName serverName;
 
-    public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
+    public AdminRequestCallerBuilder<T> action(
+        AsyncAdminRequestRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
@@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public AdminRequestCallerBuilder<T> serverName(ServerName serverName){
+    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
       this.serverName = serverName;
       return this;
     }
 
     public AsyncAdminRequestRetryingCaller<T> build() {
       return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
-          operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
-            "serverName is null"), checkNotNull(callable, "action is null"));
+        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
     }
 
     public CompletableFuture<T> call() {
@@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory {
     }
   }
 
-  public <T> AdminRequestCallerBuilder<T> adminRequest(){
+  public <T> AdminRequestCallerBuilder<T> adminRequest() {
     return new AdminRequestCallerBuilder<>();
   }
 
@@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory {
 
     public AsyncServerRequestRpcRetryingCaller<T> build() {
       return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
-          operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
-            "serverName is null"), checkNotNull(callable, "action is null"));
+        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
     }
 
     public CompletableFuture<T> call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 56c82fb..1a52e5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,17 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 
 /**
  * Retry caller for a single request, such as get, put, delete, etc.
@@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
 
   private final byte[] row;
 
+  private final int replicaId;
+
   private final RegionLocateType locateType;
 
   private final Callable<T> callable;
 
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
-      int startLogErrorsCnt) {
+      TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
+      Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-        startLogErrorsCnt);
+      startLogErrorsCnt);
     this.tableName = tableName;
     this.row = row;
+    this.replicaId = replicaId;
     this.locateType = locateType;
     this.callable = callable;
   }
@@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
       stub = conn.getRegionServerStub(loc.getServerName());
     } catch (IOException e) {
       onError(e,
-        () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
-            + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-        err -> conn.getLocator().updateCachedLocation(loc, err));
+        () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) +
+          "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
       return;
     }
     resetCallTimeout();
-    callable.call(controller, loc, stub).whenComplete(
-      (result, error) -> {
-        if (error != null) {
-          onError(error,
-            () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
-                + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-            err -> conn.getLocator().updateCachedLocation(loc, err));
-          return;
-        }
-        future.complete(result);
-      });
+    callable.call(controller, loc, stub).whenComplete((result, error) -> {
+      if (error != null) {
+        onError(error,
+          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
+            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+        return;
+      }
+      future.complete(result);
+    });
   }
 
   @Override
@@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
     } else {
       locateTimeoutNs = -1L;
     }
-    conn.getLocator()
-        .getRegionLocation(tableName, row, locateType, locateTimeoutNs)
-        .whenComplete(
-          (loc, error) -> {
-            if (error != null) {
-              onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
-                  + " failed", err -> {
-              });
-              return;
-            }
-            call(loc);
-          });
+    addListener(
+      conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs),
+      (loc, error) -> {
+        if (error != null) {
+          onError(error,
+            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
+            });
+          return;
+        }
+        call(loc);
+      });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index dbfcef5..3bda38e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator {
    * @param row Row to find.
    * @param reload true to reload information or false to use cached information
    */
-  CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
+  default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
+    return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload);
+  }
+
+  /**
+   * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+   * <p>
+   * Returns the location of the region with the given <code>replicaId</code> to which the row
+   * belongs.
+   * @param row Row to find.
+   * @param replicaId the replica id of the region
+   */
+  default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId) {
+    return getRegionLocation(row, replicaId, false);
+  }
+
+  /**
+   * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+   * <p>
+   * Returns the location of the region with the given <code>replicaId</code> to which the row
+   * belongs.
+   * @param row Row to find.
+   * @param replicaId the replica id of the region
+   * @param reload true to reload information or false to use cached information
+   */
+  CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 7d199df..465a411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
   }
 
   @Override
-  public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
-    return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L);
+  public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
+      boolean reload) {
+    return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
+      -1L);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index d996004..55c62e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -38,6 +38,9 @@ public class ConnectionConfiguration {
   public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
   public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
   public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+  public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
+    "hbase.client.primaryCallTimeout.get";
+  public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
 
   private final long writeBufferSize;
   private final long writeBufferPeriodicFlushTimeoutMs;
@@ -86,7 +89,7 @@ public class ConnectionConfiguration {
             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
 
     this.primaryCallTimeoutMicroSecond =
-        conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
+      conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
 
     this.replicaCallTimeoutMicroSecondScan =
         conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms