Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:32 UTC

[06/15] hbase git commit: HBASE-17356 Add replica get support

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d705d7c..28db7e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.RpcChannel;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 
 /**
  * The implementation of RawAsyncTable.
- * <p>
+ * <p/>
  * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
  * be finished inside the rpc framework thread, which means that the callbacks registered to the
  * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
@@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
   private final AsyncConnectionImpl conn;
 
   private final TableName tableName;
@@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
     return conn.callerFactory.<T> single().table(tableName).row(row)
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+      .startLogErrorsCnt(startLogErrorsCnt);
   }
 
   private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
     return newCaller(row.getRow(), rpcTimeoutNs);
   }
 
+  private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
+    return this.<Result> newCaller(get, timeoutNs)
+      .action((controller, loc, stub) -> RawAsyncTableImpl
+        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
+          RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
+          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
+      .replicaId(replicaId).call();
+  }
+
+  // Connect the two futures: when the src future is done, mark the dst future as done; and when
+  // the dst future is done, cancel the src future. This is used for timeline consistent read.
+  private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
+    addListener(srcFuture, (r, e) -> {
+      if (e != null) {
+        dstFuture.completeExceptionally(e);
+      } else {
+        dstFuture.complete(r);
+      }
+    });
+    // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
+    // Notice that this is a bit tricky, as the execution chain may be 'complete src -> complete dst
+    // -> cancel src'. For now this is fine, as CompletableFuture uses CAS to set the result, so the
+    // late cancel is simply a no-op. If later this causes problems, we could use whenCompleteAsync
+    // to break the tie.
+    addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
+  }
+
+  private void timelineConsistentGet(Get get, RegionLocations locs,
+      CompletableFuture<Result> future) {
+    if (future.isDone()) {
+      // do not send requests to secondary replicas if the future is done, i.e., the primary request
+      // has already finished.
+      return;
+    }
+    for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
+      CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
+      connect(secondaryFuture, future);
+    }
+  }
+
   @Override
   public CompletableFuture<Result> get(Get get) {
-    return this.<Result> newCaller(get, readRpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl
-            .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
-              RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
-              (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
-        .call();
+    CompletableFuture<Result> primaryFuture =
+      get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
+    if (get.getConsistency() == Consistency.STRONG) {
+      return primaryFuture;
+    }
+    // Timeline consistent read, where we will send requests to other region replicas
+    CompletableFuture<Result> future = new CompletableFuture<>();
+    connect(primaryFuture, future);
+    long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
+    long startNs = System.nanoTime();
+    addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
+      RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
+        if (error != null) {
+          LOG.warn(
+            "Failed to locate all the replicas for table={}, row='{}'," +
+              " give up timeline consistent read",
+            tableName, Bytes.toStringBinary(get.getRow()), error);
+          return;
+        }
+        if (locs.size() <= 1) {
+          LOG.warn(
+            "There are no secondary replicas for region {}," + " give up timeline consistent read",
+            locs.getDefaultRegionLocation().getRegion());
+          return;
+        }
+        long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
+        if (delayNs <= 0) {
+          timelineConsistentGet(get, locs, future);
+        } else {
+          AsyncConnectionImpl.RETRY_TIMER.newTimeout(
+            timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
+        }
+      });
+    return future;
   }
 
   @Override
   public CompletableFuture<Void> put(Put put) {
     return this.<Void> newCaller(put, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
-          put, RequestConverter::buildMutateRequest))
-        .call();
+      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
+        put, RequestConverter::buildMutateRequest))
+      .call();
   }
 
   @Override
   public CompletableFuture<Void> delete(Delete delete) {
     return this.<Void> newCaller(delete, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
-          stub, delete, RequestConverter::buildMutateRequest))
-        .call();
+      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+        stub, delete, RequestConverter::buildMutateRequest))
+      .call();
   }
 
   @Override
   public CompletableFuture<Result> append(Append append) {
     checkHasFamilies(append);
     return this.<Result> newCaller(append, rpcTimeoutNs)
-        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
-          append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
-        .call();
+      .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
+        append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+      .call();
   }
 
   @Override
   public CompletableFuture<Result> increment(Increment increment) {
     checkHasFamilies(increment);
     return this.<Result> newCaller(increment, rpcTimeoutNs)
-        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
-          stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
-        .call();
+      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
+        stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+      .call();
   }
 
   private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -313,36 +386,36 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenPut(Put put) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
-          .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
-            loc, stub, put,
-            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-              new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
-            (c, r) -> r.getProcessed()))
-          .call();
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+          stub, put,
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
+          (c, r) -> r.getProcessed()))
+        .call();
     }
 
     @Override
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
-          .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
-            loc, stub, delete,
-            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-              new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
-            (c, r) -> r.getProcessed()))
-          .call();
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+          loc, stub, delete,
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
+          (c, r) -> r.getProcessed()))
+        .call();
     }
 
     @Override
     public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
-          .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
-            stub, mutation,
-            (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-              new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
-            resp -> resp.getExists()))
-          .call();
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
+          stub, mutation,
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
+          resp -> resp.getExists()))
+        .call();
     }
   }
 
@@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
               if (ex != null) {
                 future.completeExceptionally(ex instanceof IOException ? ex
                   : new IOException(
-                      "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
+                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
               } else {
                 future.complete(respConverter
-                    .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
+                  .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
               }
             } catch (IOException e) {
               future.completeExceptionally(e);
@@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
           regionMutationBuilder.setAtomic(true);
           return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-        }, resp -> null)).call();
+        }, resp -> null))
+      .call();
   }
 
   private Scan setDefaultScanConfig(Scan scan) {
@@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
     new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
-        maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+      maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public ResultScanner getScanner(Scan scan) {
     return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
-        resultSize2CacheSize(
-          scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+      resultSize2CacheSize(
+        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
   }
 
   @Override
@@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
     return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
-        .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
   }
 
   private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
     return conn.callerFactory.batch().table(tableName).actions(actions)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
   }
 
   @Override
@@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
       ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
     RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
-        region, row, rpcTimeoutNs, operationTimeoutNs);
+      region, row, rpcTimeoutNs, operationTimeoutNs);
     S stub = stubMaker.apply(channel);
     CompletableFuture<R> future = new CompletableFuture<>();
     ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
@@ -553,10 +627,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }
 
   private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
-      List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
-      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
-      Throwable error) {
+      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
+      byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
+      AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
     if (error != null) {
       callback.onError(error);
       return;
@@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     if (locateFinished(region, endKey, endKeyInclusive)) {
       locateFinished.set(true);
     } else {
-      conn.getLocator()
-          .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
-            operationTimeoutNs)
-          .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
-            endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+      addListener(
+        conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+          operationTimeoutNs),
+        (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
+          locateFinished, unfinishedRequest, l, e));
     }
     coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
       if (e != null) {
@@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
     @Override
     public void execute() {
-      conn.getLocator().getRegionLocation(tableName, startKey,
-        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
-          .whenComplete(
-            (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
-              endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+      addListener(conn.getLocator().getRegionLocation(tableName, startKey,
+        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
+        (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
+          endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
     }
   }
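
Taken together, the code above sends a timeline consistent Get to the primary replica first, contacts the secondary replicas only after the primary call timeout has elapsed, and lets whichever replica answers first complete the caller's future. Below is a hedged usage sketch of that behaviour from the client side, which also follows the class javadoc's advice to move result processing off the rpc framework thread; the table, executor and printed output are illustrative and not part of this patch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Hedged sketch only: 'table' is assumed to come from AsyncConnection.getTable(...).
void timelineGet(AsyncTable<?> table, ExecutorService appExecutor) {
  Get get = new Get(Bytes.toBytes("row-0"));
  // TIMELINE consistency enables the replica read path added by this patch.
  get.setConsistency(Consistency.TIMELINE);
  CompletableFuture<Result> future = table.get(get);
  // The future completes on the rpc framework thread, so hand the result off
  // to an application-owned executor before doing any heavy work.
  future.thenAcceptAsync(result -> {
    // Result.isStale() is true when a secondary replica served the read.
    System.out.println("stale=" + result.isStale() + ", result=" + result);
  }, appExecutor);
}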
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
new file mode 100644
index 0000000..067e66b
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for processing futures.
+ */
+@InterfaceAudience.Private
+public final class FutureUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
+
+  private FutureUtils() {
+  }
+
+  /**
+   * This method is used when you just want to add a listener to the given future. We will call
+   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
+   * {@code future}. Ignoring the return value of a Future is considered bad practice, as it may
+   * suppress exceptions thrown from the code that completes the future. This method will catch
+   * all the exceptions thrown from the {@code action} to surface possible code bugs.
+   * <p/>
+   * And the error prone check will always report FutureReturnValueIgnored because every method in
+   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
+   * have one future that has not been checked. So we introduce this method and add a suppress
+   * warnings annotation here.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public static <T> void addListener(CompletableFuture<T> future,
+      BiConsumer<? super T, ? super Throwable> action) {
+    future.whenComplete((resp, error) -> {
+      try {
+        action.accept(resp, error);
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing CompletableFuture", t);
+      }
+    });
+  }
+}
\ No newline at end of file
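
The helper above exists so callers can attach listeners without tripping the error prone FutureReturnValueIgnored check and without silently losing exceptions thrown from the listener itself. A minimal sketch of the future-to-future relay that RawAsyncTableImpl builds on top of it for timeline consistent reads is shown below; the method name is illustrative.

import java.util.concurrent.CompletableFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

// Relay the outcome of src into dst, and cancel src once dst is done,
// mirroring the connect() helper in RawAsyncTableImpl above.
static <T> void relay(CompletableFuture<T> src, CompletableFuture<T> dst) {
  addListener(src, (value, error) -> {
    if (error != null) {
      dst.completeExceptionally(error);
    } else {
      dst.complete(value);
    }
  });
  // Best-effort cancellation; it is a no-op when dst was completed by src itself.
  addListener(dst, (value, error) -> src.cancel(false));
}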

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
new file mode 100644
index 0000000..c14f69f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+final class RegionReplicaTestHelper {
+
+  private RegionReplicaTestHelper() {
+  }
+
+  // waits for all replicas to have region location
+  static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
+      int regionReplication) throws IOException {
+    TestZKAsyncRegistry.TEST_UTIL.waitFor(
+      TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
+        .getLong("hbase.client.sync.wait.timeout.msec", 60000),
+      200, true, new ExplainingPredicate<IOException>() {
+        @Override
+        public String explainFailure() throws IOException {
+          return "Not all meta replicas get assigned";
+        }
+
+        @Override
+        public boolean evaluate() throws IOException {
+          try {
+            RegionLocations locs = registry.getMetaRegionLocation().get();
+            if (locs.size() < regionReplication) {
+              return false;
+            }
+            for (int i = 0; i < regionReplication; i++) {
+              if (locs.getRegionLocation(i) == null) {
+                return false;
+              }
+            }
+            return true;
+          } catch (Exception e) {
+            TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e);
+            return false;
+          }
+        }
+      });
+  }
+
+  static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
+      int replicaId) {
+    return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+      .filter(rs -> rs.getRegions(tableName).stream()
+        .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
+      .findAny().map(rs -> rs.getServerName());
+  }
+
+  /**
+   * Return the new location.
+   */
+  static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
+      throws Exception {
+    ServerName serverName = currentLoc.getServerName();
+    RegionInfo regionInfo = currentLoc.getRegion();
+    TableName tableName = regionInfo.getTable();
+    int replicaId = regionInfo.getReplicaId();
+    ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream()
+      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+      .get();
+    util.getAdmin().move(regionInfo.getEncodedNameAsBytes(),
+      Bytes.toBytes(newServerName.getServerName()));
+    util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        Optional<ServerName> newServerName = getRSCarryingReplica(util, tableName, replicaId);
+        return newServerName.isPresent() && !newServerName.get().equals(serverName);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return regionInfo.getRegionNameAsString() + " is still on " + serverName;
+      }
+    });
+    return newServerName;
+  }
+
+  interface Locator {
+    RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+        throws Exception;
+
+    void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
+  }
+
+  static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
+      throws Exception {
+    RegionLocations locs =
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
+    assertEquals(3, locs.size());
+    for (int i = 0; i < 3; i++) {
+      HRegionLocation loc = locs.getRegionLocation(i);
+      assertNotNull(loc);
+      ServerName serverName = getRSCarryingReplica(util, tableName, i).get();
+      assertEquals(serverName, loc.getServerName());
+    }
+    ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation());
+    // The cached location should not be changed
+    assertEquals(locs.getDefaultRegionLocation().getServerName(),
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+        .getDefaultRegionLocation().getServerName());
+    // should get the new location when reload = true
+    assertEquals(newServerName,
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
+        .getDefaultRegionLocation().getServerName());
+    // the cached location should be replaced
+    assertEquals(newServerName,
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+        .getDefaultRegionLocation().getServerName());
+
+    ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1));
+    ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2));
+
+    // The cached location should not be changed
+    assertEquals(locs.getRegionLocation(1).getServerName(),
+      locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+    // clear the cached location for replica 1
+    locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException());
+    // the cached location for replica 2 should not be changed
+    assertEquals(locs.getRegionLocation(2).getServerName(),
+      locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+    // should get the new location as we have cleared the old location
+    assertEquals(newServerName1,
+      locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+    // as the locations for all replicas are fetched at once, we should also get the new location
+    // for replica 2
+    assertEquals(newServerName2,
+      locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 7c08d6d..df1fe08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,20 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 
-import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
+    HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator {
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.waitUntilAllSystemRegionsAssigned();
-    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
     REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
+    TEST_UTIL.getAdmin().balancerSwitch(false, true);
     LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
   }
 
@@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private Optional<ServerName> getRSCarryingMeta() {
-    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer())
-        .filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny()
-        .map(rs -> rs.getServerName());
-  }
-
   @Test
-  public void testReload() throws Exception {
-    ServerName serverName = getRSCarryingMeta().get();
-    assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
-
-    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
-    TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
-      Bytes.toBytes(newServerName.getServerName()));
-    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+  public void test() throws Exception {
+    testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() {
 
       @Override
-      public boolean evaluate() throws Exception {
-        Optional<ServerName> newServerName = getRSCarryingMeta();
-        return newServerName.isPresent() && !newServerName.get().equals(serverName);
+      public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+          throws Exception {
+        LOCATOR.updateCachedLocationOnError(loc, error);
       }
 
       @Override
-      public String explainFailure() throws Exception {
-        return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " +
-            serverName;
+      public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+          throws Exception {
+        return LOCATOR.getRegionLocations(replicaId, reload).get();
       }
     });
-    // The cached location will not change
-    assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
-    // should get the new location when reload = true
-    assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName());
-    // the cached location should be replaced
-    assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 38dc78d..eeaf99f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
+    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
   }
 
+  private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
+      byte[] row, RegionLocateType locateType, boolean reload) {
+    return LOCATOR
+      .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
+      .thenApply(RegionLocations::getDefaultRegionLocation);
+  }
+
   @Test
   public void testNoTable() throws InterruptedException {
     for (RegionLocateType locateType : RegionLocateType.values()) {
       try {
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
       } catch (ExecutionException e) {
         assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
       }
@@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
     for (RegionLocateType locateType : RegionLocateType.values()) {
       try {
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
       } catch (ExecutionException e) {
         assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
       }
@@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator {
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
     ThreadLocalRandom.current().nextBytes(randKey);
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get());
     }
   }
 
@@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator {
   private ServerName[] getLocations(byte[][] startKeys) {
     ServerName[] serverNames = new ServerName[startKeys.length];
     TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
-        .forEach(rs -> {
-          rs.getRegions(TABLE_NAME).forEach(r -> {
-            serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
-              Bytes::compareTo)] = rs.getServerName();
-          });
+      .forEach(rs -> {
+        rs.getRegions(TABLE_NAME).forEach(r -> {
+          serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+            Bytes::compareTo)] = rs.getServerName();
         });
+      });
     return serverNames;
   }
 
@@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator {
     IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
       try {
         assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
-          serverNames[i], LOCATOR
-              .getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get());
+          serverNames[i],
+          getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false)
+            .get());
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator {
       try {
         assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
           serverNames[i],
-          LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
+          getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator {
       n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
         try {
           assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
-            LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false)
-                .get());
+            getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get());
         } catch (InterruptedException | ExecutionException e) {
           throw new RuntimeException(e);
         }
@@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator {
   public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
     createSingleRegionTable();
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
-    HRegionLocation loc = LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
+    HRegionLocation loc =
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
     ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
+      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+      .get();
 
     TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()),
       Bytes.toBytes(newServerName.getServerName()));
     while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
-        .equals(newServerName)) {
+      .equals(newServerName)) {
       Thread.sleep(100);
     }
     // Should be same as it is in cache
-    assertSame(loc, LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
-    LOCATOR.updateCachedLocation(loc, null);
+    assertSame(loc,
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+    LOCATOR.updateCachedLocationOnError(loc, null);
     // null error will not trigger a cache cleanup
-    assertSame(loc, LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
-    LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+    assertSame(loc,
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+    LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
   }
 
   // usually locate after will return the same result, so we add a test to make it return different
@@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     HRegionLocation currentLoc =
-        LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
+      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
     ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
 
     HRegionLocation afterLoc =
-        LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
+      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
     ServerName afterServerName =
-        TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
-            .filter(rs -> rs.getRegions(TABLE_NAME).stream()
-                .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
-            .findAny().get().getServerName();
+      TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+        .filter(rs -> rs.getRegions(TABLE_NAME).stream()
+          .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
+        .findAny().get().getServerName();
     assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
 
     assertSame(afterLoc,
-      LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
+      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
   }
 
   // For HBASE-17402
@@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator {
     ServerName[] serverNames = getLocations(startKeys);
     for (int i = 0; i < 100; i++) {
       LOCATOR.clearCache(TABLE_NAME);
-      List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
-          .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
-          .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
+      List<CompletableFuture<HRegionLocation>> futures =
+        IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
+          .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
           .collect(toList());
       for (int j = 0; j < 1000; j++) {
         int index = Math.min(8, j / 111);
@@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator {
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
+      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+      .get();
     Admin admin = TEST_UTIL.getAdmin();
     RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get();
     admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName()));
@@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator {
     // The cached location will not change
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     // should get the new location when reload = true
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
     // the cached location should be replaced
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
   }
 
@@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator {
   public void testLocateBeforeLastRegion()
       throws IOException, InterruptedException, ExecutionException {
     createMultiRegionTable();
-    LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
+    getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
     HRegionLocation loc =
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
     // should locate to the last region
     assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
   }
+
+  @Test
+  public void testRegionReplicas() throws Exception {
+    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+    testLocator(TEST_UTIL, TABLE_NAME, new Locator() {
+
+      @Override
+      public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+          throws Exception {
+        LOCATOR.updateCachedLocationOnError(loc, error);
+      }
+
+      @Override
+      public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+          throws Exception {
+        return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+          RegionLocateType.CURRENT, reload).get();
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index c6624e7..8cdb4a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
-        .toArray(byte[][]::new);
+      .toArray(byte[][]::new);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
   }
@@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
+  private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
       throws InterruptedException, ExecutionException {
     assertEquals(256, futures.size());
     for (int i = 0; i < futures.size(); i++) {
-      HRegionLocation loc = futures.get(i).get();
+      HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
       if (i == 0) {
         assertTrue(isEmptyStartRow(loc.getRegion().getStartKey()));
       } else {
@@ -158,10 +159,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   @Test
   public void test() throws InterruptedException, ExecutionException {
-    List<CompletableFuture<HRegionLocation>> futures =
-        IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
-            .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
-            .collect(toList());
+    List<CompletableFuture<RegionLocations>> futures =
+      IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+        .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+          RegionLocateType.CURRENT, false))
+        .collect(toList());
     assertLocs(futures);
     assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(),
       MAX_CONCURRENCY.get() <= MAX_ALLOWED);

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index a6c2efb..7d8956b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
+    HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), User.getCurrent());
   }
 
   @AfterClass
@@ -89,8 +89,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
     TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(
       TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
-    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME)
-        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();
+    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
+      .setMaxRetries(30).build();
     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
 
     // move back
@@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   public void testMaxRetries() throws IOException, InterruptedException {
     try {
       CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
-          .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
-          .action((controller, loc, stub) -> failedFuture()).call().get();
+        .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
+        .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
@@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     long startNs = System.nanoTime();
     try {
       CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
-          .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
-          .action((controller, loc, stub) -> failedFuture()).call().get();
+        .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+        .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
       e.printStackTrace();
@@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     AtomicInteger count = new AtomicInteger(0);
     HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
     AsyncRegionLocator mockedLocator =
-        new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
-          @Override
-          CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-              RegionLocateType locateType, long timeoutNs) {
-            if (tableName.equals(TABLE_NAME)) {
-              CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-              if (count.getAndIncrement() == 0) {
-                errorTriggered.set(true);
-                future.completeExceptionally(new RuntimeException("Inject error!"));
-              } else {
-                future.complete(loc);
-              }
-              return future;
+      new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
+        @Override
+        CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+            int replicaId, RegionLocateType locateType, long timeoutNs) {
+          if (tableName.equals(TABLE_NAME)) {
+            CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+            if (count.getAndIncrement() == 0) {
+              errorTriggered.set(true);
+              future.completeExceptionally(new RuntimeException("Inject error!"));
             } else {
-              return super.getRegionLocation(tableName, row, locateType, timeoutNs);
+              future.complete(loc);
             }
+            return future;
+          } else {
+            return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
           }
+        }
 
-          @Override
-          void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-          }
-        };
+        @Override
+        void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+        }
+      };
     try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
-        CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
 
       @Override
       AsyncRegionLocator getLocator() {
@@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
       }
     }) {
       AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
-          .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
+        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
       assertTrue(errorTriggered.get());
       errorTriggered.set(false);
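
Besides the replica id plumbing in the mocked locator, the hunks above also show the retry knobs these tests rely on. A hedged sketch of the same configuration through the public AsyncConnection API, assuming a reachable cluster; the table name, row and column names are placeholders:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class RetryTunedAsyncTableSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      // Same knobs the test passes to getTableBuilder: a short pause between
      // attempts and a bounded number of retries.
      AsyncTable<?> table = conn.getTableBuilder(TableName.valueOf("async"))
        .setRetryPause(100, TimeUnit.MILLISECONDS)
        .setMaxRetries(30)
        .build();
      table.put(new Put(Bytes.toBytes("row"))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("value"))).get();
      table.get(new Get(Bytes.toBytes("row"))).get();
    }
  }
}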

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 13d8000..6c6bb98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch {
 
   @Test
   public void test() throws InterruptedException, ExecutionException {
-    assertNotNull(LOCATOR
-      .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
+    assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
+      RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
     // we finish the request before adding the remaining results to the cache, so sleep a bit here
     Thread.sleep(1000);
     // confirm that the locations of all the regions have been cached.
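
The prefetch check above now goes through the replica-aware getRegionLocations as well. The same cache-warming effect can be observed through the public AsyncTableRegionLocator; a hedged sketch, with the connection, table name and rows assumed for illustration:

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class LocatePrefetchSketch {

  // One locate call is expected to pull a batch of neighbouring region locations
  // into the client cache (what the test asserts), so a later lookup for another
  // row of the same table can be answered without going back to meta.
  static void warmAndReuse(AsyncConnection conn) throws Exception {
    AsyncTableRegionLocator locator = conn.getRegionLocator(TableName.valueOf("async"));
    HRegionLocation first = locator.getRegionLocation(Bytes.toBytes("zzz")).get();
    HRegionLocation later = locator.getRegionLocation(Bytes.toBytes("aaa")).get();
    System.out.println(first.getServerName() + " / " + later.getServerName());
  }
}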

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
new file mode 100644
index 0000000..0445a0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionReplicasGet {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Parameter
+  public Supplier<AsyncTable<?>> getTable;
+
+  private static AsyncTable<?> getRawTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME);
+  }
+
+  private static AsyncTable<?> getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
+      new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
+  }
+
+  private static volatile boolean FAIL_PRIMARY_GET = false;
+
+  private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
+
+  private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
+
+  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+        List<Cell> result) throws IOException {
+      RegionInfo region = c.getEnvironment().getRegionInfo();
+      if (!region.getTable().equals(TABLE_NAME)) {
+        return;
+      }
+      if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+        SECONDARY_GET_COUNT.incrementAndGet();
+      } else {
+        PRIMARY_GET_COUNT.incrementAndGet();
+        if (FAIL_PRIMARY_GET) {
+          throw new IOException("Inject error");
+        }
+      }
+    }
+  }
+
+  private static boolean allReplicasHaveRow() throws IOException {
+    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
+        if (region.get(new Get(ROW), false).isEmpty()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // 10 mins
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+      TimeUnit.MINUTES.toMillis(10));
+    // 1 second
+    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
+      TimeUnit.SECONDS.toMicros(1));
+    // set a small pause so we will retry very quickly
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+    // infinite retry
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.getAdmin()
+      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
+        .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+    // this is the fastest way to let all replicas have the row
+    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
+    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    IOUtils.closeQuietly(ASYNC_CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testNoReplicaRead() throws Exception {
+    FAIL_PRIMARY_GET = false;
+    SECONDARY_GET_COUNT.set(0);
+    AsyncTable<?> table = getTable.get();
+    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+    for (int i = 0; i < 1000; i++) {
+      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+    }
+    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
+    // should not send any requests to secondary replicas even if the consistency is timeline.
+    Thread.sleep(5000);
+    assertEquals(0, SECONDARY_GET_COUNT.get());
+  }
+
+  @Test
+  public void testReplicaRead() throws Exception {
+    // fail the primary get request
+    FAIL_PRIMARY_GET = true;
+    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+    // make sure that we could still get the value from secondary replicas
+    AsyncTable<?> table = getTable.get();
+    assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+    // make sure that the primary request has been canceled
+    Thread.sleep(5000);
+    int count = PRIMARY_GET_COUNT.get();
+    Thread.sleep(10000);
+    assertEquals(count, PRIMARY_GET_COUNT.get());
+  }
+}
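
The new test above is the user-facing side of HBASE-17356: a timeline-consistency Get may be served by a secondary region replica when the primary is slow or failing, and the primary call is cancelled once a secondary answers. A hedged end-to-end sketch against the public API; the cluster, table name, row and column are assumptions, and the table is expected to have been created with region replication enabled as in the test setup:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class TimelineReplicaGetSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      AsyncTable<?> table = conn.getTable(TableName.valueOf("async"));
      // TIMELINE allows falling back to secondary replicas after the primary call
      // timeout; the default STRONG consistency only ever asks the primary.
      Get get = new Get(Bytes.toBytes("row")).setConsistency(Consistency.TIMELINE);
      Result result = table.get(get).get();
      // Result.isStale() tells whether the answer came from a secondary replica.
      System.out.println("value=" + Bytes.toString(
        result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cq"))) + ", stale=" + result.isStale());
    }
  }
}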

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
index db7546f..46890d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
@@ -52,43 +50,13 @@ public class TestZKAsyncRegistry {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
+    HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static ZKAsyncRegistry REGISTRY;
 
-  // waits for all replicas to have region location
-  static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
-    TEST_UTIL.waitFor(
-      TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
-      new ExplainingPredicate<IOException>() {
-        @Override
-        public String explainFailure() throws IOException {
-          return TEST_UTIL.explainTableAvailability(tbl);
-        }
-
-        @Override
-        public boolean evaluate() throws IOException {
-          AtomicBoolean ready = new AtomicBoolean(true);
-          try {
-            RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
-            assertEquals(3, locs.getRegionLocations().length);
-            IntStream.range(0, 3).forEach(i -> {
-              HRegionLocation loc = locs.getRegionLocation(i);
-              if (loc == null) {
-                ready.set(false);
-              }
-            });
-          } catch (Exception e) {
-            ready.set(false);
-          }
-          return ready.get();
-        }
-      });
-  }
-
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@@ -107,14 +75,14 @@ public class TestZKAsyncRegistry {
     LOG.info("STARTED TEST");
     String clusterId = REGISTRY.getClusterId().get();
     String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId();
-    assertEquals("Expected " + expectedClusterId + ", found=" + clusterId,
-        expectedClusterId, clusterId);
+    assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId,
+      clusterId);
     assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(),
       REGISTRY.getCurrentNrHRS().get().intValue());
     assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
       REGISTRY.getMasterAddress().get());
     assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
-    waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
     RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
     assertEquals(3, locs.getRegionLocations().length);
     IntStream.range(0, 3).forEach(i -> {