You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/03 02:55:27 UTC

[1/4] hbase git commit: HBASE-17356 Add replica get support

Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 40d2787e2 -> ff23c0221


http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d705d7c..28db7e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.RpcChannel;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 
 /**
  * The implementation of RawAsyncTable.
- * <p>
+ * <p/>
  * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
  * be finished inside the rpc framework thread, which means that the callbacks registered to the
  * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
@@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
   private final AsyncConnectionImpl conn;
 
   private final TableName tableName;
@@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
     return conn.callerFactory.<T> single().table(tableName).row(row)
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+      .startLogErrorsCnt(startLogErrorsCnt);
   }
 
   private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
     return newCaller(row.getRow(), rpcTimeoutNs);
   }
 
+  private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
+    return this.<Result> newCaller(get, timeoutNs)
+      .action((controller, loc, stub) -> RawAsyncTableImpl
+        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
+          RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
+          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
+      .replicaId(replicaId).call();
+  }
+
+  // Connect the two futures, if the src future is done, then mark the dst future as done. And if
+  // the dst future is done, then cancel the src future. This is used for timeline consistent read.
+  private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
+    addListener(srcFuture, (r, e) -> {
+      if (e != null) {
+        dstFuture.completeExceptionally(e);
+      } else {
+        dstFuture.complete(r);
+      }
+    });
+    // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
+    // Notice that this is a bit tricky, as the execution chain may be 'complete src -> complete dst
+    // -> cancel src', for now it seems to be fine, as it will use CAS to set the result first in
+    // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
+    // tie.
+    addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
+  }
+
+  private void timelineConsistentGet(Get get, RegionLocations locs,
+      CompletableFuture<Result> future) {
+    if (future.isDone()) {
+      // do not send requests to secondary replicas if the future is done, i.e., the primary request
+      // has already been finished.
+      return;
+    }
+    for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
+      CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
+      connect(secondaryFuture, future);
+    }
+  }
+
   @Override
   public CompletableFuture<Result> get(Get get) {
-    return this.<Result> newCaller(get, readRpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl
-            .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
-              RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
-              (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
-        .call();
+    CompletableFuture<Result> primaryFuture =
+      get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
+    if (get.getConsistency() == Consistency.STRONG) {
+      return primaryFuture;
+    }
+    // Timeline consistent read, where we will send requests to other region replicas
+    CompletableFuture<Result> future = new CompletableFuture<>();
+    connect(primaryFuture, future);
+    long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
+    long startNs = System.nanoTime();
+    addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
+      RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
+        if (error != null) {
+          LOG.warn(
+            "Failed to locate all the replicas for table={}, row='{}'," +
+              " give up timeline consistent read",
+            tableName, Bytes.toStringBinary(get.getRow()), error);
+          return;
+        }
+        if (locs.size() <= 1) {
+          LOG.warn(
+            "There are no secondary replicas for region {}," + " give up timeline consistent read",
+            locs.getDefaultRegionLocation().getRegion());
+          return;
+        }
+        long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
+        if (delayNs <= 0) {
+          timelineConsistentGet(get, locs, future);
+        } else {
+          AsyncConnectionImpl.RETRY_TIMER.newTimeout(
+            timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
+        }
+      });
+    return future;
   }
 
   @Override
   public CompletableFuture<Void> put(Put put) {
     return this.<Void> newCaller(put, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
-          put, RequestConverter::buildMutateRequest))
-        .call();
+      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
+        put, RequestConverter::buildMutateRequest))
+      .call();
   }
 
   @Override
   public CompletableFuture<Void> delete(Delete delete) {
     return this.<Void> newCaller(delete, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
-          stub, delete, RequestConverter::buildMutateRequest))
-        .call();
+      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+        stub, delete, RequestConverter::buildMutateRequest))
+      .call();
   }
 
   @Override
   public CompletableFuture<Result> append(Append append) {
     checkHasFamilies(append);
     return this.<Result> newCaller(append, rpcTimeoutNs)
-        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
-          append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
-        .call();
+      .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
+        append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+      .call();
   }
 
   @Override
   public CompletableFuture<Result> increment(Increment increment) {
     checkHasFamilies(increment);
     return this.<Result> newCaller(increment, rpcTimeoutNs)
-        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
-          stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
-        .call();
+      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
+        stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+      .call();
   }
 
   private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -313,36 +386,36 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenPut(Put put) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
-          .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
-            loc, stub, put,
-            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-              new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
-            (c, r) -> r.getProcessed()))
-          .call();
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+          stub, put,
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
+          (c, r) -> r.getProcessed()))
+        .call();
     }
 
     @Override
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
-          .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
-            loc, stub, delete,
-            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-              new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
-            (c, r) -> r.getProcessed()))
-          .call();
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+          loc, stub, delete,
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
+          (c, r) -> r.getProcessed()))
+        .call();
     }
 
     @Override
     public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
-          .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
-            stub, mutation,
-            (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-              new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
-            resp -> resp.getExists()))
-          .call();
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
+          stub, mutation,
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
+          resp -> resp.getExists()))
+        .call();
     }
   }
 
@@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
               if (ex != null) {
                 future.completeExceptionally(ex instanceof IOException ? ex
                   : new IOException(
-                      "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
+                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
               } else {
                 future.complete(respConverter
-                    .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
+                  .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
               }
             } catch (IOException e) {
               future.completeExceptionally(e);
@@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
           regionMutationBuilder.setAtomic(true);
           return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-        }, resp -> null)).call();
+        }, resp -> null))
+      .call();
   }
 
   private Scan setDefaultScanConfig(Scan scan) {
@@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
     new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
-        maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+      maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public ResultScanner getScanner(Scan scan) {
     return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
-        resultSize2CacheSize(
-          scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+      resultSize2CacheSize(
+        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
   }
 
   @Override
@@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
     return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
-        .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
   }
 
   private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
     return conn.callerFactory.batch().table(tableName).actions(actions)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
   }
 
   @Override
@@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
       ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
     RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
-        region, row, rpcTimeoutNs, operationTimeoutNs);
+      region, row, rpcTimeoutNs, operationTimeoutNs);
     S stub = stubMaker.apply(channel);
     CompletableFuture<R> future = new CompletableFuture<>();
     ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
@@ -553,10 +627,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }
 
   private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
-      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
-      List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
-      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
-      Throwable error) {
+      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
+      byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
+      AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
     if (error != null) {
       callback.onError(error);
       return;
@@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     if (locateFinished(region, endKey, endKeyInclusive)) {
       locateFinished.set(true);
     } else {
-      conn.getLocator()
-          .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
-            operationTimeoutNs)
-          .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
-            endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+      addListener(
+        conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+          operationTimeoutNs),
+        (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
+          locateFinished, unfinishedRequest, l, e));
     }
     coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
       if (e != null) {
@@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
     @Override
     public void execute() {
-      conn.getLocator().getRegionLocation(tableName, startKey,
-        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
-          .whenComplete(
-            (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
-              endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+      addListener(conn.getLocator().getRegionLocation(tableName, startKey,
+        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
+        (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
+          endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
new file mode 100644
index 0000000..067e66b
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for processing futures.
+ */
+@InterfaceAudience.Private
+public final class FutureUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
+
+  private FutureUtils() {
+  }
+
+  /**
+   * This method is used when you just want to add a listener to the given future. We will call
+   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
+   * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
+   * suppress exceptions thrown from the code that completes the future, and this method will catch
+   * all the exception thrown from the {@code action} to catch possible code bugs.
+   * <p/>
+   * And the error prone check will always report FutureReturnValueIgnored because every method in
+   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
+   * have one future that has not been checked. So we introduce this method and add a suppress
+   * warnings annotation here.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public static <T> void addListener(CompletableFuture<T> future,
+      BiConsumer<? super T, ? super Throwable> action) {
+    future.whenComplete((resp, error) -> {
+      try {
+        action.accept(resp, error);
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing CompletableFuture", t);
+      }
+    });
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
new file mode 100644
index 0000000..c14f69f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+final class RegionReplicaTestHelper {
+
+  private RegionReplicaTestHelper() {
+  }
+
+  // waits for all replicas to have region location
+  static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
+      int regionReplication) throws IOException {
+    TestZKAsyncRegistry.TEST_UTIL.waitFor(
+      TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
+        .getLong("hbase.client.sync.wait.timeout.msec", 60000),
+      200, true, new ExplainingPredicate<IOException>() {
+        @Override
+        public String explainFailure() throws IOException {
+          return "Not all meta replicas get assigned";
+        }
+
+        @Override
+        public boolean evaluate() throws IOException {
+          try {
+            RegionLocations locs = registry.getMetaRegionLocation().get();
+            if (locs.size() < regionReplication) {
+              return false;
+            }
+            for (int i = 0; i < regionReplication; i++) {
+              if (locs.getRegionLocation(i) == null) {
+                return false;
+              }
+            }
+            return true;
+          } catch (Exception e) {
+            TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e);
+            return false;
+          }
+        }
+      });
+  }
+
+  static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
+      int replicaId) {
+    return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+      .filter(rs -> rs.getRegions(tableName).stream()
+        .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
+      .findAny().map(rs -> rs.getServerName());
+  }
+
+  /**
+   * Return the new location.
+   */
+  static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
+      throws Exception {
+    ServerName serverName = currentLoc.getServerName();
+    RegionInfo regionInfo = currentLoc.getRegion();
+    TableName tableName = regionInfo.getTable();
+    int replicaId = regionInfo.getReplicaId();
+    ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream()
+      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+      .get();
+    util.getAdmin().move(regionInfo.getEncodedNameAsBytes(),
+      Bytes.toBytes(newServerName.getServerName()));
+    util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        Optional<ServerName> newServerName = getRSCarryingReplica(util, tableName, replicaId);
+        return newServerName.isPresent() && !newServerName.get().equals(serverName);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return regionInfo.getRegionNameAsString() + " is still on " + serverName;
+      }
+    });
+    return newServerName;
+  }
+
+  interface Locator {
+    RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+        throws Exception;
+
+    void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
+  }
+
+  static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
+      throws Exception {
+    RegionLocations locs =
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
+    assertEquals(3, locs.size());
+    for (int i = 0; i < 3; i++) {
+      HRegionLocation loc = locs.getRegionLocation(i);
+      assertNotNull(loc);
+      ServerName serverName = getRSCarryingReplica(util, tableName, i).get();
+      assertEquals(serverName, loc.getServerName());
+    }
+    ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation());
+    // The cached location should not be changed
+    assertEquals(locs.getDefaultRegionLocation().getServerName(),
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+        .getDefaultRegionLocation().getServerName());
+    // should get the new location when reload = true
+    assertEquals(newServerName,
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
+        .getDefaultRegionLocation().getServerName());
+    // the cached location should be replaced
+    assertEquals(newServerName,
+      locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
+        .getDefaultRegionLocation().getServerName());
+
+    ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1));
+    ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2));
+
+    // The cached location should not be changed
+    assertEquals(locs.getRegionLocation(1).getServerName(),
+      locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+    // clear the cached location for replica 1
+    locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException());
+    // the cached location for replica 2 should not be changed
+    assertEquals(locs.getRegionLocation(2).getServerName(),
+      locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+    // should get the new location as we have cleared the old location
+    assertEquals(newServerName1,
+      locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
+    // as we will get the new location for replica 2 at once, we should also get the new location
+    // for replica 2
+    assertEquals(newServerName2,
+      locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 7c08d6d..df1fe08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -17,20 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 
-import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
+    HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator {
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
     TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.waitUntilAllSystemRegionsAssigned();
-    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
     REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
+    TEST_UTIL.getAdmin().balancerSwitch(false, true);
     LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
   }
 
@@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private Optional<ServerName> getRSCarryingMeta() {
-    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer())
-        .filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny()
-        .map(rs -> rs.getServerName());
-  }
-
   @Test
-  public void testReload() throws Exception {
-    ServerName serverName = getRSCarryingMeta().get();
-    assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
-
-    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
-    TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
-      Bytes.toBytes(newServerName.getServerName()));
-    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+  public void test() throws Exception {
+    testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() {
 
       @Override
-      public boolean evaluate() throws Exception {
-        Optional<ServerName> newServerName = getRSCarryingMeta();
-        return newServerName.isPresent() && !newServerName.get().equals(serverName);
+      public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+          throws Exception {
+        LOCATOR.updateCachedLocationOnError(loc, error);
       }
 
       @Override
-      public String explainFailure() throws Exception {
-        return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " +
-            serverName;
+      public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+          throws Exception {
+        return LOCATOR.getRegionLocations(replicaId, reload).get();
       }
     });
-    // The cached location will not change
-    assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
-    // should get the new location when reload = true
-    assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName());
-    // the cached location should be replaced
-    assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 38dc78d..eeaf99f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
+    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
   }
 
+  private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
+      byte[] row, RegionLocateType locateType, boolean reload) {
+    return LOCATOR
+      .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
+      .thenApply(RegionLocations::getDefaultRegionLocation);
+  }
+
   @Test
   public void testNoTable() throws InterruptedException {
     for (RegionLocateType locateType : RegionLocateType.values()) {
       try {
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
       } catch (ExecutionException e) {
         assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
       }
@@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
     for (RegionLocateType locateType : RegionLocateType.values()) {
       try {
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
       } catch (ExecutionException e) {
         assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
       }
@@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator {
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
     ThreadLocalRandom.current().nextBytes(randKey);
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get());
     }
   }
 
@@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator {
   private ServerName[] getLocations(byte[][] startKeys) {
     ServerName[] serverNames = new ServerName[startKeys.length];
     TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
-        .forEach(rs -> {
-          rs.getRegions(TABLE_NAME).forEach(r -> {
-            serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
-              Bytes::compareTo)] = rs.getServerName();
-          });
+      .forEach(rs -> {
+        rs.getRegions(TABLE_NAME).forEach(r -> {
+          serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+            Bytes::compareTo)] = rs.getServerName();
         });
+      });
     return serverNames;
   }
 
@@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator {
     IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
       try {
         assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
-          serverNames[i], LOCATOR
-              .getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get());
+          serverNames[i],
+          getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false)
+            .get());
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator {
       try {
         assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
           serverNames[i],
-          LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
+          getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator {
       n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
         try {
           assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
-            LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false)
-                .get());
+            getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get());
         } catch (InterruptedException | ExecutionException e) {
           throw new RuntimeException(e);
         }
@@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator {
   public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
     createSingleRegionTable();
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
-    HRegionLocation loc = LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
+    HRegionLocation loc =
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
     ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
+      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+      .get();
 
     TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()),
       Bytes.toBytes(newServerName.getServerName()));
     while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
-        .equals(newServerName)) {
+      .equals(newServerName)) {
       Thread.sleep(100);
     }
     // Should be same as it is in cache
-    assertSame(loc, LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
-    LOCATOR.updateCachedLocation(loc, null);
+    assertSame(loc,
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+    LOCATOR.updateCachedLocationOnError(loc, null);
     // null error will not trigger a cache cleanup
-    assertSame(loc, LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
-    LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR
-        .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+    assertSame(loc,
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
+    LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
   }
 
   // usually locate after will return the same result, so we add a test to make it return different
@@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     HRegionLocation currentLoc =
-        LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
+      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
     ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
 
     HRegionLocation afterLoc =
-        LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
+      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
     ServerName afterServerName =
-        TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
-            .filter(rs -> rs.getRegions(TABLE_NAME).stream()
-                .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
-            .findAny().get().getServerName();
+      TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+        .filter(rs -> rs.getRegions(TABLE_NAME).stream()
+          .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
+        .findAny().get().getServerName();
     assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
 
     assertSame(afterLoc,
-      LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
+      getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
   }
 
   // For HBASE-17402
@@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator {
     ServerName[] serverNames = getLocations(startKeys);
     for (int i = 0; i < 100; i++) {
       LOCATOR.clearCache(TABLE_NAME);
-      List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
-          .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
-          .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
+      List<CompletableFuture<HRegionLocation>> futures =
+        IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
+          .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
           .collect(toList());
       for (int j = 0; j < 1000; j++) {
         int index = Math.min(8, j / 111);
@@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator {
     ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
-        .findAny().get();
+      .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
+      .get();
     Admin admin = TEST_UTIL.getAdmin();
     RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get();
     admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName()));
@@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator {
     // The cached location will not change
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     // should get the new location when reload = true
     assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
     // the cached location should be replaced
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-        LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
+        getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
   }
 
@@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator {
   public void testLocateBeforeLastRegion()
       throws IOException, InterruptedException, ExecutionException {
     createMultiRegionTable();
-    LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
+    getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
     HRegionLocation loc =
-      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
+      getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
     // should locate to the last region
     assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
   }
+
+  @Test
+  public void testRegionReplicas() throws Exception {
+    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+    testLocator(TEST_UTIL, TABLE_NAME, new Locator() {
+
+      @Override
+      public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
+          throws Exception {
+        LOCATOR.updateCachedLocationOnError(loc, error);
+      }
+
+      @Override
+      public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
+          throws Exception {
+        return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
+          RegionLocateType.CURRENT, reload).get();
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index c6624e7..8cdb4a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
+    HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
     SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
-        .toArray(byte[][]::new);
+      .toArray(byte[][]::new);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
   }
@@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
+  private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
       throws InterruptedException, ExecutionException {
     assertEquals(256, futures.size());
     for (int i = 0; i < futures.size(); i++) {
-      HRegionLocation loc = futures.get(i).get();
+      HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
       if (i == 0) {
         assertTrue(isEmptyStartRow(loc.getRegion().getStartKey()));
       } else {
@@ -158,10 +159,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   @Test
   public void test() throws InterruptedException, ExecutionException {
-    List<CompletableFuture<HRegionLocation>> futures =
-        IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
-            .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
-            .collect(toList());
+    List<CompletableFuture<RegionLocations>> futures =
+      IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
+        .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+          RegionLocateType.CURRENT, false))
+        .collect(toList());
     assertLocs(futures);
     assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(),
       MAX_CONCURRENCY.get() <= MAX_ALLOWED);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index a6c2efb..7d8956b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
+    HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-        registry.getClusterId().get(), User.getCurrent());
+      registry.getClusterId().get(), User.getCurrent());
   }
 
   @AfterClass
@@ -89,8 +89,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
     TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(
       TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
-    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME)
-        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();
+    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
+      .setMaxRetries(30).build();
     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
 
     // move back
@@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   public void testMaxRetries() throws IOException, InterruptedException {
     try {
       CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
-          .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
-          .action((controller, loc, stub) -> failedFuture()).call().get();
+        .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
+        .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
       assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
@@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     long startNs = System.nanoTime();
     try {
       CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
-          .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
-          .action((controller, loc, stub) -> failedFuture()).call().get();
+        .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+        .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
       e.printStackTrace();
@@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     AtomicInteger count = new AtomicInteger(0);
     HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
     AsyncRegionLocator mockedLocator =
-        new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
-          @Override
-          CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-              RegionLocateType locateType, long timeoutNs) {
-            if (tableName.equals(TABLE_NAME)) {
-              CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-              if (count.getAndIncrement() == 0) {
-                errorTriggered.set(true);
-                future.completeExceptionally(new RuntimeException("Inject error!"));
-              } else {
-                future.complete(loc);
-              }
-              return future;
+      new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
+        @Override
+        CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+            int replicaId, RegionLocateType locateType, long timeoutNs) {
+          if (tableName.equals(TABLE_NAME)) {
+            CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+            if (count.getAndIncrement() == 0) {
+              errorTriggered.set(true);
+              future.completeExceptionally(new RuntimeException("Inject error!"));
             } else {
-              return super.getRegionLocation(tableName, row, locateType, timeoutNs);
+              future.complete(loc);
             }
+            return future;
+          } else {
+            return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
           }
+        }
 
-          @Override
-          void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-          }
-        };
+        @Override
+        void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+        }
+      };
     try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
-        CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
+      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
 
       @Override
       AsyncRegionLocator getLocator() {
@@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
       }
     }) {
       AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
-          .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
+        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
       assertTrue(errorTriggered.get());
       errorTriggered.set(false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 13d8000..6c6bb98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch {
 
   @Test
   public void test() throws InterruptedException, ExecutionException {
-    assertNotNull(LOCATOR
-      .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
+    assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
+      RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
     // we finish the request before adding the remaining results to the cache, so sleep a bit here
     Thread.sleep(1000);
     // confirm that the locations of all the regions have been cached.

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
new file mode 100644
index 0000000..0445a0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+/** Tests TIMELINE consistency gets through AsyncTable on a table with 3 region replicas. */
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionReplicasGet {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+  /** Shared connection, created in setUpBeforeClass and closed in tearDownAfterClass. */
+  private static AsyncConnection ASYNC_CONN;
+  /** JUnit rule exposing the current test name; not referenced elsewhere in this class. */
+  @Rule
+  public TestName testName = new TestName();
+  /** Injected by the Parameterized runner from params(); supplies the table under test. */
+  @Parameter
+  public Supplier<AsyncTable<?>> getTable;
+  /** Returns the table obtained from the shared connection with only the table name. */
+  private static AsyncTable<?> getRawTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME);
+  }
+  /** Returns the table obtained by also passing ForkJoinPool.commonPool() to getTable. */
+  private static AsyncTable<?> getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+  /** One parameter set per supplier above, so each test runs against both tables. */
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
+      new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
+  }
+  /** When true, FailPrimaryGetCP throws from preGetOp for gets on the primary replica. */
+  private static volatile boolean FAIL_PRIMARY_GET = false;
+  /** Number of preGetOp calls on the primary replica of TABLE_NAME, failed ones included. */
+  private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
+  /** Number of preGetOp calls on non-primary replicas of TABLE_NAME. */
+  private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
+  /** Coprocessor that counts gets per replica kind and can fail the ones on the primary. */
+  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+    /** Skips other tables, bumps the matching counter, then fails primary gets on demand. */
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+        List<Cell> result) throws IOException {
+      RegionInfo region = c.getEnvironment().getRegionInfo();
+      if (!region.getTable().equals(TABLE_NAME)) {
+        return;
+      }
+      if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+        SECONDARY_GET_COUNT.incrementAndGet();
+      } else {
+        PRIMARY_GET_COUNT.incrementAndGet();
+        if (FAIL_PRIMARY_GET) {
+          throw new IOException("Inject error");
+        }
+      }
+    }
+  }
+  /** Returns true once every region of TABLE_NAME on every region server returns ROW. */
+  private static boolean allReplicasHaveRow() throws IOException {
+    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
+        if (region.get(new Get(ROW), false).isEmpty()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+  /** Starts the mini cluster, creates the 3-replica table and writes the single test row. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // read rpc timeout: 10 mins
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+      TimeUnit.MINUTES.toMillis(10));
+    // primary call timeout: 1 second, the configuration value is in microseconds
+    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
+      TimeUnit.SECONDS.toMicros(1));
+    // set a small pause so we will retry very quickly
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+    // infinite retry
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.getAdmin()
+      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
+        .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+    // disable and re-enable the table, this is the fastest way to let all replicas have the row
+    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
+    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+  }
+  /** Closes the shared connection and shuts down the mini cluster. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    IOUtils.closeQuietly(ASYNC_CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+  /** With a healthy primary, 1000 TIMELINE gets must not trigger any get on a secondary. */
+  @Test
+  public void testNoReplicaRead() throws Exception {
+    FAIL_PRIMARY_GET = false;
+    SECONDARY_GET_COUNT.set(0);
+    AsyncTable<?> table = getTable.get();
+    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+    for (int i = 0; i < 1000; i++) {
+      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+    }
+    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
+    // should not send any requests to secondary replicas even if the consistency is timeline.
+    Thread.sleep(5000);
+    assertEquals(0, SECONDARY_GET_COUNT.get());
+  }
+  /** With the primary failing, a TIMELINE get must still return VALUE from another replica. */
+  @Test
+  public void testReplicaRead() throws Exception {
+    // fail the primary get request
+    FAIL_PRIMARY_GET = true;
+    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
+    // make sure that we could still get the value from secondary replicas
+    AsyncTable<?> table = getTable.get();
+    assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+    // make sure that the primary request has been canceled, i.e. its get count stops growing
+    Thread.sleep(5000);
+    int count = PRIMARY_GET_COUNT.get();
+    Thread.sleep(10000);
+    assertEquals(count, PRIMARY_GET_COUNT.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
index db7546f..46890d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
@@ -52,43 +50,13 @@ public class TestZKAsyncRegistry {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
+    HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static ZKAsyncRegistry REGISTRY;
 
-  // waits for all replicas to have region location
-  static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
-    TEST_UTIL.waitFor(
-      TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
-      new ExplainingPredicate<IOException>() {
-        @Override
-        public String explainFailure() throws IOException {
-          return TEST_UTIL.explainTableAvailability(tbl);
-        }
-
-        @Override
-        public boolean evaluate() throws IOException {
-          AtomicBoolean ready = new AtomicBoolean(true);
-          try {
-            RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
-            assertEquals(3, locs.getRegionLocations().length);
-            IntStream.range(0, 3).forEach(i -> {
-              HRegionLocation loc = locs.getRegionLocation(i);
-              if (loc == null) {
-                ready.set(false);
-              }
-            });
-          } catch (Exception e) {
-            ready.set(false);
-          }
-          return ready.get();
-        }
-      });
-  }
-
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@@ -107,14 +75,14 @@ public class TestZKAsyncRegistry {
     LOG.info("STARTED TEST");
     String clusterId = REGISTRY.getClusterId().get();
     String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId();
-    assertEquals("Expected " + expectedClusterId + ", found=" + clusterId,
-        expectedClusterId, clusterId);
+    assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId,
+      clusterId);
     assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(),
       REGISTRY.getCurrentNrHRS().get().intValue());
     assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
       REGISTRY.getMasterAddress().get());
     assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
-    waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME);
+    RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
     RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
     assertEquals(3, locs.getRegionLocations().length);
     IntStream.range(0, 3).forEach(i -> {


[2/4] hbase git commit: HBASE-17356 Add replica get support

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index a826f8c..579d547 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
@@ -484,23 +485,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
     CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
-    this.<List<TableSchema>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
-                controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
-                    c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
-                    .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
-          if (error != null) {
-            future.completeExceptionally(error);
-            return;
-          }
-          if (!tableSchemas.isEmpty()) {
-            future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
-          } else {
-            future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
-          }
-        });
+    addListener(this.<List<TableSchema>> newMasterCaller()
+      .action((controller, stub) -> this
+        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
+          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+          (resp) -> resp.getTableSchemaList()))
+      .call(), (tableSchemas, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+          return;
+        }
+        if (!tableSchemas.isEmpty()) {
+          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
+        } else {
+          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+        }
+      });
     return future;
   }
 
@@ -583,7 +584,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -600,7 +601,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -629,40 +630,37 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
       Optional<byte[][]> splitKeys) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
-    isTableEnabled(tableName).whenComplete(
-      (enabled, error) -> {
-        if (error != null) {
-          future.completeExceptionally(error);
-          return;
-        }
-        if (!enabled) {
-          future.complete(false);
-        } else {
-          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
-              .whenComplete(
-                (locations, error1) -> {
-                  if (error1 != null) {
-                    future.completeExceptionally(error1);
-                    return;
-                  }
-                  List<HRegionLocation> notDeployedRegions =
-                      locations.stream().filter(loc -> loc.getServerName() == null)
-                          .collect(Collectors.toList());
-                  if (notDeployedRegions.size() > 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " has " + notDeployedRegions.size()
-                          + " regions");
-                    }
-                    future.complete(false);
-                    return;
-                  }
+    addListener(isTableEnabled(tableName), (enabled, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (!enabled) {
+        future.complete(false);
+      } else {
+        addListener(
+          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
+          (locations, error1) -> {
+            if (error1 != null) {
+              future.completeExceptionally(error1);
+              return;
+            }
+            List<HRegionLocation> notDeployedRegions = locations.stream()
+              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
+            if (notDeployedRegions.size() > 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
+              }
+              future.complete(false);
+              return;
+            }
 
-                  Optional<Boolean> available =
-                      splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
-                  future.complete(available.orElse(true));
-                });
-        }
-      });
+            Optional<Boolean> available =
+              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
+            future.complete(available.orElse(true));
+          });
+      }
+    });
     return future;
   }
 
@@ -784,20 +782,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
+    addListener(tableExists(tableName), (exists, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
       } else if (!exists) {
         future.completeExceptionally(new TableNotFoundException(tableName));
       } else {
-        isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
           if (err2 != null) {
             future.completeExceptionally(err2);
           } else if (!tableEnabled) {
             future.completeExceptionally(new TableNotEnabledException(tableName));
           } else {
-            execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
-              new HashMap<>()).whenComplete((ret, err3) -> {
+            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+              new HashMap<>()), (ret, err3) -> {
                 if (err3 != null) {
                   future.completeExceptionally(err3);
                 } else {
@@ -814,27 +812,25 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
         }
-        flush(serverName, location.getRegion())
-          .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-          });
       });
+    });
     return future;
   }
 
@@ -852,7 +848,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flushRegionServer(ServerName sn) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegions(sn).whenComplete((hRegionInfos, err) -> {
+    addListener(getRegions(sn), (hRegionInfos, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
@@ -861,9 +857,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       if (hRegionInfos != null) {
         hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
       }
-      CompletableFuture
-        .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-        .whenComplete((ret, err2) -> {
+      addListener(CompletableFuture.allOf(
+        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
           if (err2 != null) {
             future.completeExceptionally(err2);
           } else {
@@ -936,7 +931,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegions(sn).whenComplete((hRegionInfos, err) -> {
+    addListener(getRegions(sn), (hRegionInfos, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
@@ -945,15 +940,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       if (hRegionInfos != null) {
         hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
       }
-      CompletableFuture
-          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
+      addListener(CompletableFuture.allOf(
+        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
     });
     return future;
   }
@@ -961,28 +955,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
       boolean major) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        compact(location.getServerName(), location.getRegion(), major, columnFamily)
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
@@ -994,19 +986,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
       // For meta table, we use zk to fetch all locations.
       AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
-      registry.getMetaRegionLocation().whenComplete(
-        (metaRegions, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-          } else if (metaRegions == null || metaRegions.isEmpty()
-              || metaRegions.getDefaultRegionLocation() == null) {
-            future.completeExceptionally(new IOException("meta region does not found"));
-          } else {
-            future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-          }
-          // close the registry.
-          IOUtils.closeQuietly(registry);
-        });
+      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else if (metaRegions == null || metaRegions.isEmpty() ||
+          metaRegions.getDefaultRegionLocation() == null) {
+          future.completeExceptionally(new IOException("meta region does not found"));
+        } else {
+          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+        }
+        // close the registry.
+        IOUtils.closeQuietly(registry);
+      });
       return future;
     } else {
       // For non-meta table, we fetch all locations by scanning hbase:meta table
@@ -1017,40 +1008,40 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   /**
    * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
    */
-  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
-      boolean major, CompactType compactType) {
+  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
+      CompactType compactType) {
     CompletableFuture<Void> future = new CompletableFuture<>();
 
     switch (compactType) {
       case MOB:
-        connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
           RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
-          compact(serverName, regionInfo, major, columnFamily)
-              .whenComplete((ret, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else {
-                  future.complete(ret);
-                }
-              });
+          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
         });
         break;
       case NORMAL:
-        getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
-          CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
+          CompletableFuture<?>[] compactFutures =
+            locations.stream().filter(l -> l.getRegion() != null)
               .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
               .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
               .toArray(CompletableFuture<?>[]::new);
           // future complete unless all of the compact futures are completed.
-          CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
+          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
             if (err2 != null) {
               future.completeExceptionally(err2);
             } else {
@@ -1091,29 +1082,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
       CompletableFuture<TableName> result) {
-    getRegionLocation(encodeRegionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          result.completeExceptionally(err);
-          return;
-        }
-        RegionInfo regionInfo = location.getRegion();
-        if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-          result.completeExceptionally(new IllegalArgumentException(
-              "Can't invoke merge on non-default regions directly"));
-          return;
-        }
-        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
-          if (!tableName.get().equals(regionInfo.getTable())) {
-            // tables of this two region should be same.
-            result.completeExceptionally(new IllegalArgumentException(
-                "Cannot merge regions from two different tables " + tableName.get() + " and "
-                    + regionInfo.getTable()));
-          } else {
-            result.complete(tableName.get());
-          }
+    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
+      if (err != null) {
+        result.completeExceptionally(err);
+        return;
+      }
+      RegionInfo regionInfo = location.getRegion();
+      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+        result.completeExceptionally(
+          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
+        return;
+      }
+      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+        if (!tableName.get().equals(regionInfo.getTable())) {
+          // tables of this two region should be same.
+          result.completeExceptionally(
+            new IllegalArgumentException("Cannot merge regions from two different tables " +
+              tableName.get() + " and " + regionInfo.getTable()));
+        } else {
+          result.complete(tableName.get());
         }
-      });
+      }
+    });
   }
 
   private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
@@ -1178,41 +1168,42 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
     final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
 
-    checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
-        .whenComplete((tableName, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
+    addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB),
+      (tableName, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
 
-          MergeTableRegionsRequest request = null;
-          try {
-            request = RequestConverter.buildMergeTableRegionsRequest(
-              new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
-              ng.newNonce());
-          } catch (DeserializationException e) {
-            future.completeExceptionally(e);
-            return;
-          }
+        MergeTableRegionsRequest request = null;
+        try {
+          request = RequestConverter.buildMergeTableRegionsRequest(
+            new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+            ng.newNonce());
+        } catch (DeserializationException e) {
+          future.completeExceptionally(e);
+          return;
+        }
 
+        addListener(
           this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
             (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
-            new MergeTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-
-        });
+            new MergeTableRegionProcedureBiConsumer(tableName)),
+          (ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+      });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> split(TableName tableName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exist, error) -> {
+    addListener(tableExists(tableName), (exist, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -1221,45 +1212,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.completeExceptionally(new TableNotFoundException(tableName));
         return;
       }
-      metaTable
+      addListener(
+        metaTable
           .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
-              .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
-              .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
-          .whenComplete((results, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (results != null && !results.isEmpty()) {
-              List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
-              for (Result r : results) {
-                if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) continue;
-                RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
-                if (rl != null) {
-                  for (HRegionLocation h : rl.getRegionLocations()) {
-                    if (h != null && h.getServerName() != null) {
-                      RegionInfo hri = h.getRegion();
-                      if (hri == null || hri.isSplitParent()
-                          || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID)
-                        continue;
-                      splitFutures.add(split(hri, null));
+            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
+        (results, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+            return;
+          }
+          if (results != null && !results.isEmpty()) {
+            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+            for (Result r : results) {
+              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
+                continue;
+              }
+              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+              if (rl != null) {
+                for (HRegionLocation h : rl.getRegionLocations()) {
+                  if (h != null && h.getServerName() != null) {
+                    RegionInfo hri = h.getRegion();
+                    if (hri == null || hri.isSplitParent() ||
+                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+                      continue;
                     }
+                    splitFutures.add(split(hri, null));
                   }
                 }
               }
-              CompletableFuture
-                  .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
-                  .whenComplete((ret, exception) -> {
-                    if (exception != null) {
-                      future.completeExceptionally(exception);
-                      return;
-                    }
-                    future.complete(ret);
-                  });
-            } else {
-              future.complete(null);
             }
-          });
+            addListener(
+              CompletableFuture
+                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
+              (ret, exception) -> {
+                if (exception != null) {
+                  future.completeExceptionally(exception);
+                  return;
+                }
+                future.complete(ret);
+              });
+          } else {
+            future.complete(null);
+          }
+        });
     });
     return future;
   }
@@ -1270,54 +1266,52 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     if (splitPoint == null) {
       return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
     }
-    connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
-        .whenComplete((loc, err) -> {
-          if (err != null) {
-            result.completeExceptionally(err);
-          } else if (loc == null || loc.getRegion() == null) {
-            result.completeExceptionally(new IllegalArgumentException(
-                "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
-          } else {
-            splitRegion(loc.getRegion().getRegionName(), splitPoint)
-                .whenComplete((ret, err2) -> {
-                  if (err2 != null) {
-                    result.completeExceptionally(err2);
-                  } else {
-                    result.complete(ret);
-                  }
+    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint),
+      (loc, err) -> {
+        if (err != null) {
+          result.completeExceptionally(err);
+        } else if (loc == null || loc.getRegion() == null) {
+          result.completeExceptionally(new IllegalArgumentException(
+            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+        } else {
+          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
+            if (err2 != null) {
+              result.completeExceptionally(err2);
+            } else {
+              result.complete(ret);
+            }
 
-                });
-          }
-        });
+          });
+        }
+      });
     return result;
   }
 
   @Override
   public CompletableFuture<Void> splitRegion(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        RegionInfo regionInfo = location.getRegion();
-        if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "Can't split replicas directly. "
-                  + "Replicas are auto-split when their primary is split."));
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (completeExceptionally(future, err)) {
+        return; // lookup failed: location is null here and must not be dereferenced
+      }
+      RegionInfo regionInfo = location.getRegion();
+      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+        future
+          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+            "Replicas are auto-split when their primary is split."));
+        return;
+      }
+      if (location.getServerName() == null) { // region is known but not deployed anywhere
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(split(regionInfo, null), (ret, err2) -> { // relay the split outcome to future
+        if (!completeExceptionally(future, err2)) {
+          future.complete(ret);
         }
-        split(regionInfo, null).whenComplete((ret, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(ret);
-          }
-        });
       });
+    });
     return future;
   }
 
@@ -1326,35 +1320,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     Preconditions.checkNotNull(splitPoint,
       "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        RegionInfo regionInfo = location.getRegion();
-        if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "Can't split replicas directly. "
-                  + "Replicas are auto-split when their primary is split."));
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        if (regionInfo.getStartKey() != null
-            && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "should not give a splitkey which equals to startkey!"));
-          return;
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (completeExceptionally(future, err)) {
+        return; // lookup failed: location is null here and must not be dereferenced
+      }
+      RegionInfo regionInfo = location.getRegion();
+      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+        future
+          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+            "Replicas are auto-split when their primary is split."));
+        return;
+      }
+      if (location.getServerName() == null) { // region is known but not deployed anywhere
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      if (regionInfo.getStartKey() != null &&
+        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
+        future.completeExceptionally(
+          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
+        return;
+      }
+      addListener(split(regionInfo, splitPoint), (ret, err2) -> { // relay the split outcome
+        if (!completeExceptionally(future, err2)) {
+          future.complete(ret);
         }
-        split(regionInfo, splitPoint).whenComplete((ret, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(ret);
-          }
-        });
       });
+    });
     return future;
   }
 
@@ -1363,121 +1356,119 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     TableName tableName = hri.getTable();
     SplitTableRegionRequest request = null;
     try {
-      request = RequestConverter
-          .buildSplitTableRegionRequest(hri, splitPoint,
-              ng.getNonceGroup(), ng.newNonce());
+      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
+        ng.newNonce());
     } catch (DeserializationException e) {
       future.completeExceptionally(e);
       return future;
     }
 
-    this.<SplitTableRegionRequest, SplitTableRegionResponse>procedureCall(request,
-        (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
-        new SplitTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
-      if (err2 != null) {
-        future.completeExceptionally(err2);
-      } else {
-        future.complete(ret);
-      }
-    });
+    addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
+      (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
+      new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
+        }
+      });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> assign(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(this.<Void> newMasterCaller()
+        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
+          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
+        .call(), (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(
         this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this
-                  .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
-                    RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
-                    (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+          .action(((controller, stub) -> this
+            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
+          .call(),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> offline(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(
         this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+          .action(((controller, stub) -> this
+            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
+              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
+              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
+          .call(),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> move(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(
         moveRegion(
-          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null))
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
@@ -1486,20 +1477,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     Preconditions.checkNotNull(destServerName,
       "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete((regionInfo, err) -> {
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
       }
-      moveRegion(
-        RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName))
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
+      addListener(moveRegion(RequestConverter
+        .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
     });
     return future;
   }
@@ -1634,11 +1625,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
       if (!completeExceptionally(future, error)) {
         ReplicationPeerConfig newPeerConfig =
-            ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
-        updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
+          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
           if (!completeExceptionally(future, error)) {
             future.complete(result);
           }
@@ -1656,24 +1647,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete(
-      (peerConfig, error) -> {
-        if (!completeExceptionally(future, error)) {
-          ReplicationPeerConfig newPeerConfig = null;
-          try {
-            newPeerConfig = ReplicationPeerConfigUtil
-                .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
-          } catch (ReplicationException e) {
-            future.completeExceptionally(e);
-            return;
-          }
-          updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
-            if (!completeExceptionally(future, error)) {
-              future.complete(result);
-            }
-          });
+    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
+      if (!completeExceptionally(future, error)) {
+        ReplicationPeerConfig newPeerConfig = null;
+        try {
+          newPeerConfig = ReplicationPeerConfigUtil
+            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+        } catch (ReplicationException e) {
+          future.completeExceptionally(e);
+          return;
         }
-      });
+        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
+          if (!completeExceptionally(future, err)) { // err is the failure of the update itself
+            future.complete(result);
+          }
+        });
+      }
+    });
     return future;
   }
 
@@ -1708,31 +1698,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
     CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
-    listTableDescriptors().whenComplete(
-      (tables, error) -> {
-        if (!completeExceptionally(future, error)) {
-          List<TableCFs> replicatedTableCFs = new ArrayList<>();
-          tables.forEach(table -> {
-            Map<String, Integer> cfs = new HashMap<>();
-            Stream.of(table.getColumnFamilies())
-                .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
-                .forEach(column -> {
-                  cfs.put(column.getNameAsString(), column.getScope());
-                });
-            if (!cfs.isEmpty()) {
-              replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
-            }
-          });
-          future.complete(replicatedTableCFs);
-        }
-      });
+    addListener(listTableDescriptors(), (tables, error) -> {
+      if (!completeExceptionally(future, error)) {
+        List<TableCFs> replicatedTableCFs = new ArrayList<>();
+        tables.forEach(table -> {
+          Map<String, Integer> cfs = new HashMap<>();
+          Stream.of(table.getColumnFamilies())
+            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+            .forEach(column -> {
+              cfs.put(column.getNameAsString(), column.getScope());
+            });
+          if (!cfs.isEmpty()) {
+            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+          }
+        });
+        future.complete(replicatedTableCFs);
+      }
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
-    SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
-        .createHBaseProtosSnapshotDesc(snapshotDesc);
+    SnapshotProtos.SnapshotDescription snapshot =
+      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
     try {
       ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
     } catch (IllegalArgumentException e) {
@@ -1740,47 +1729,47 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
     CompletableFuture<Void> future = new CompletableFuture<>();
     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
-    this.<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
-            stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
-            resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            long startTime = EnvironmentEdgeManager.currentTime();
-            long endTime = startTime + expectedTimeout;
-            long maxPauseTime = expectedTimeout / maxAttempts;
-
-            @Override
-            public void run(Timeout timeout) throws Exception {
-              if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
-                  if (err2 != null) {
-                    future.completeExceptionally(err2);
-                  } else if (done) {
-                    future.complete(null);
-                  } else {
-                    // retry again after pauseTime.
-                  long pauseTime = ConnectionUtils.getPauseTime(
-                    TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+    addListener(this.<Long> newMasterCaller()
+      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+        resp -> resp.getExpectedTimeout()))
+      .call(), (expectedTimeout, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        TimerTask pollingTask = new TimerTask() {
+          int tries = 0;
+          long startTime = EnvironmentEdgeManager.currentTime();
+          long endTime = startTime + expectedTimeout;
+          long maxPauseTime = expectedTimeout / maxAttempts;
+
+          @Override
+          public void run(Timeout timeout) throws Exception {
+            if (EnvironmentEdgeManager.currentTime() < endTime) {
+              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                } else if (done) {
+                  future.complete(null);
+                } else {
+                  // retry again after pauseTime.
+                  long pauseTime =
+                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                   pauseTime = Math.min(pauseTime, maxPauseTime);
-                  AsyncConnectionImpl.RETRY_TIMER
-                      .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+                    TimeUnit.MILLISECONDS);
                 }
-              } );
-              } else {
-                future.completeExceptionally(new SnapshotCreationException("Snapshot '"
-                    + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
-                    + " ms", snapshotDesc));
-              }
+              });
+            } else {
+              future.completeExceptionally(
+                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
+                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
             }
-          };
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
-        });
+          }
+        };
+        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+      });
     return future;
   }
 
@@ -1806,52 +1795,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+  public CompletableFuture<Void> restoreSnapshot(String snapshotName,
+      boolean takeFailSafeSnapshot) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshots(Pattern.compile(snapshotName)).whenComplete(
-      (snapshotDescriptions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        TableName tableName = null;
-        if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
-          for (SnapshotDescription snap : snapshotDescriptions) {
-            if (snap.getName().equals(snapshotName)) {
-              tableName = snap.getTableName();
-              break;
-            }
+    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      TableName tableName = null;
+      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+        for (SnapshotDescription snap : snapshotDescriptions) {
+          if (snap.getName().equals(snapshotName)) {
+            tableName = snap.getTableName();
+            break;
           }
         }
-        if (tableName == null) {
-          future.completeExceptionally(new RestoreSnapshotException(
-              "Unable to find the table name for snapshot=" + snapshotName));
-          return;
-        }
-        final TableName finalTableName = tableName;
-        tableExists(finalTableName)
-            .whenComplete((exists, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else if (!exists) {
-                // if table does not exist, then just clone snapshot into new table.
-              completeConditionalOnFuture(future,
-                internalRestoreSnapshot(snapshotName, finalTableName));
+      }
+      if (tableName == null) {
+        future.completeExceptionally(new RestoreSnapshotException(
+          "Unable to find the table name for snapshot=" + snapshotName));
+        return;
+      }
+      final TableName finalTableName = tableName;
+      addListener(tableExists(finalTableName), (exists, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else if (!exists) {
+          // if table does not exist, then just clone snapshot into new table.
+          completeConditionalOnFuture(future,
+            internalRestoreSnapshot(snapshotName, finalTableName));
+        } else {
+          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
+            if (err4 != null) {
+              future.completeExceptionally(err4);
+            } else if (!disabled) {
+              future.completeExceptionally(new TableNotDisabledException(finalTableName));
             } else {
-              isTableDisabled(finalTableName).whenComplete(
-                (disabled, err4) -> {
-                  if (err4 != null) {
-                    future.completeExceptionally(err4);
-                  } else if (!disabled) {
-                    future.completeExceptionally(new TableNotDisabledException(finalTableName));
-                  } else {
-                    completeConditionalOnFuture(future,
-                      restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
-                  }
-                });
+              completeConditionalOnFuture(future,
+                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
             }
-          } );
+          });
+        }
       });
+    });
     return future;
   }
 
@@ -1860,49 +1847,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     if (takeFailSafeSnapshot) {
       CompletableFuture<Void> future = new CompletableFuture<>();
       // Step.1 Take a snapshot of the current state
-      String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
-        HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
-        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
-      final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
-          .replace("{snapshot.name}", snapshotName)
+      String failSafeSnapshotSnapshotNameFormat =
+        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+      final String failSafeSnapshotSnapshotName =
+        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
           .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
           .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
       LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-      snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
         if (err != null) {
           future.completeExceptionally(err);
         } else {
           // Step.2 Restore snapshot
-        internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
-          if (err2 != null) {
-            // Step.3.a Something went wrong during the restore and try to rollback.
-          internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
-            (void3, err3) -> {
-              if (err3 != null) {
-                future.completeExceptionally(err3);
-              } else {
-                String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
-                    + failSafeSnapshotSnapshotName + " succeeded.";
-                future.completeExceptionally(new RestoreSnapshotException(msg));
-              }
-            });
-        } else {
-          // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
-          LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-          deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
-            (ret3, err3) -> {
-              if (err3 != null) {
-                LOG.error(
-                  "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
-                future.completeExceptionally(err3);
-              } else {
-                future.complete(ret3);
-              }
-            });
+          addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> {
+            if (err2 != null) {
+              // Step.3.a Something went wrong during the restore and try to rollback.
+              addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName),
+                (void3, err3) -> {
+                  if (err3 != null) {
+                    future.completeExceptionally(err3);
+                  } else {
+                    String msg =
+                      "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
+                        failSafeSnapshotSnapshotName + " succeeded.";
+                    future.completeExceptionally(new RestoreSnapshotException(msg));
+                  }
+                });
+            } else {
+              // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+              LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+              addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
+                if (err3 != null) {
+                  LOG.error(
+                    "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
+                    err3);
+                  future.completeExceptionally(err3);
+                } else {
+                  future.complete(ret3);
+                }
+              });
+            }
+          });
         }
-      } );
-      }
-    } );
+      });
       return future;
     } else {
       return internalRestoreSnapshot(snapshotName, tableName);
@@ -1911,7 +1899,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
       CompletableFuture<T> parentFuture) {
-    parentFuture.whenComplete((res, err) -> {
+    addListener(parentFuture, (res, err) -> {
       if (err != null) {
         dependentFuture.completeExceptionally(err);
       } else {
@@ -1923,7 +1911,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
+    addListener(tableExists(tableName), (exists, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
       } else if (exists) {
@@ -1993,31 +1981,29 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
       Pattern tableNamePattern, Pattern snapshotNamePattern) {
     CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
-    listTableNames(tableNamePattern, false).whenComplete(
-      (tableNames, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
+    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      if (tableNames == null || tableNames.size() <= 0) {
+        future.complete(Collections.emptyList());
+        return;
+      }
+      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
           return;
         }
-        if (tableNames == null || tableNames.size() <= 0) {
+        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
           future.complete(Collections.emptyList());
           return;
         }
-        getCompletedSnapshots(snapshotNamePattern).whenComplete(
-          (snapshotDescList, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-              future.complete(Collections.emptyList());
-              return;
-            }
-            future.complete(snapshotDescList.stream()
-                .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
-                .collect(Collectors.toList()));
-          });
+        future.complete(snapshotDescList.stream()
+          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+          .collect(Collectors.toList()));
       });
+    });
     return future;
   }
 
@@ -2064,7 +2050,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
     }
     CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshotsFuture.whenComplete(((snapshotDescriptions, err) -> {
+    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
@@ -2073,12 +2059,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.complete(null);
         return;
       }
-      List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
-      snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
-          .add(internalDeleteSnapshot(snapDesc)));
-      CompletableFuture.allOf(
-        deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
-          .thenAccept(v -> future.complete(v));
+      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
+        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
+          if (e != null) {
+            future.completeExceptionally(e);
+          } else {
+            future.complete(v);
+          }
+        });
     }));
     return future;
   }
@@ -2100,50 +2088,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       Map<String, String> props) {
     CompletableFuture<Void> future = new CompletableFuture<>();
     ProcedureDescription procDesc =
-        ProtobufUtil.buildProcedureDescription(signature, instance, props);
-    this.<Long> newMasterCaller()
-        .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
-          controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
-          (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
-        .call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            long startTime = EnvironmentEdgeManager.currentTime();
-            long endTime = startTime + expectedTimeout;
-            long maxPauseTime = expectedTimeout / maxAttempts;
-
-            @Override
-            public void run(Timeout timeout) throws Exception {
-              if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
-                  if (err2 != null) {
-                    future.completeExceptionally(err2);
-                    return;
-                  }
-                  if (done) {
-                    future.complete(null);
-                  } else {
-                    // retry again after pauseTime.
-                    long pauseTime = ConnectionUtils
-                        .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
-                    pauseTime = Math.min(pauseTime, maxPauseTime);
-                    AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
-                      TimeUnit.MICROSECONDS);
-                  }
-                });
-              } else {
-                future.completeExceptionally(new IOException("Procedure '" + signature + " : "
-                    + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
-              }
+      ProtobufUtil.buildProcedureDescription(signature, instance, props);
+    addListener(this.<Long> newMasterCaller()
+      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+      .call(), (expectedTimeout, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        TimerTask pollingTask = new TimerTask() {
+          int tries = 0;
+          long startTime = EnvironmentEdgeManager.currentTime();
+          long endTime = startTime + expectedTimeout;
+          long maxPauseTime = expectedTimeout / maxAttempts;
+
+          @Override
+          public void run(Timeout timeout) throws Exception {
+            if (EnvironmentEdgeManager.currentTime() < endTime) {
+              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                  return;
+                }
+                if (done) {
+                  future.complete(null);
+                } else {
+                  // retry again after pauseTime (milliseconds, so schedule in that unit).
+                  long pauseTime =
+                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                  pauseTime = Math.min(pauseTime, maxPauseTime);
+                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+                    TimeUnit.MILLISECONDS);
+                }
+              });
+            } else {
+              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
+                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
             }
-          };
-          // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
-        });
+          }
+        };
+        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+      });
     return future;
   }
 
@@ -2262,15 +2250,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       }
 
       CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
-      future.whenComplete((location, err) -> {
+      addListener(future, (location, err) -> {
         if (err != null) {
           returnedFuture.completeExceptionally(err);
           return;
         }
         if (!location.isPresent() || location.get().getRegion() == null) {
-          returnedFuture.completeExceptionally(new UnknownRegionException(
-              "Invalid region name or encoded region name: "
-                  + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+          returnedFuture.completeExceptionally(
+            new UnknownRegionException("Invalid region name or encoded region name: " +
+              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
         } else {
           returnedFuture.complete(location.get());
         }
@@ -2294,14 +2282,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     if (Bytes.equals(regionNameOrEncodedRegionName,
-      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionNameOrEncodedRegionName,
+      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
+      Bytes.equals(regionNameOrEncodedRegionName,
         RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
       return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
     }
 
     CompletableFuture<RegionInfo> future = new CompletableFuture<>();
-    getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, err) -> {
+    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
       } else {
@@ -2343,7 +2331,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
+  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
 
     abstract void onFinished();
 
@@ -2359,7 +2347,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
+  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
     protected final TableName tableName;
 
     TableProcedureBiConsumer(TableName tableName) {
@@ -2384,7 +2372,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
+  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
     protected final String namespaceName;
 
     NamespaceProcedureBiConsumer(String namespaceName) {
@@ -2408,7 +2396,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     CreateTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2420,7 +2408,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
       super(tableName);
@@ -2450,7 +2438,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     TruncateTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2462,7 +2450,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     EnableTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2474,7 +2462,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     DisableTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2486,7 +2474,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
 
     AddColumnFamilyProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2498,7 +2486,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
 
     DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2510,7 +2498,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
 
     ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2522,7 +2510,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     CreateNamespaceProcedureBiConsumer(String namespaceName) {
       super(namespaceName);
@@ -2534,7 +2522,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     DeleteNamespaceProcedureBiConsumer(String namespaceName) {
       super(namespaceName);
@@ -2546,7 +2534,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     ModifyNamespaceProcedureBiConsumer(String namespaceName) {
       super(namespaceName);
@@ -2558,7 +2546,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
 
     MergeTableRegionProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2570,7 +2558,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class SplitTableRegionProcedureBiConsumer extends  TableProcedureBiConsumer {
+  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
 
     SplitTableRegionProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2584,7 +2572,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    procFuture.whenComplete((procId, error) -> {
+    addListener(procFuture, (procId, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -2595,30 +2583,33 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
-    this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this
-        .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
-          controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
-          (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
-        .call().whenComplete((response, error) -> {
-          if (error != null) {
-            LOG.warn("failed to get the procedure result procId={}", procId,
-              ConnectionUtils.translateException(error));
-            retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
-              ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
-            return;
-          }
-          if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
-            retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
-              ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
-            return;
-          }
-          if (response.hasException()) {
-            IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
-            future.completeExceptionally(ioe);
-          } else {
-            future.complete(null);
-          }
-        });
+    addListener(
+      this.<GetProcedureResultResponse> newMasterCaller()
+        .action((controller, stub) -> this
+          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
+            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
+            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
+        .call(),
+      (response, error) -> {
+        if (error != null) {
+          LOG.warn("failed to get the procedure result procId={}", procId,
+            ConnectionUtils.translateException(error));
+          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+          return;
+        }
+        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
+          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+          return;
+        }
+        if (response.hasException()) {
+          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
+          future.completeExceptionally(ioe);
+        } else {
+          future.complete(null);
+        }
+      });
   }
 
   private <T> CompletableFuture<T> failedFuture(Throwable error) {
@@ -2700,24 +2691,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> updateConfiguration() {
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS))
-      .whenComplete((status, err) -> {
+    addListener(
+      getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
+      (status, err) -> {
         if (err != null) {
           future.completeExceptionally(err);
         } else {
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           status.getLiveServerMetrics().keySet()
-              .forEach(server -> futures.add(updateConfiguration(server)));
+            .forEach(server -> futures.add(updateConfiguration(server)));
           futures.add(updateConfiguration(status.getMasterName()));
           status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
-          CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
-              .whenComplete((result, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else {
-                  future.complete(result);
-                }
-              });
+          addListener(
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+            (result, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(result);
+              }
+            });
         }
       });
     return future;
@@ -2800,88 +2793,87 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
     switch (compactType) {
       case MOB:
-        connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
           RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
 
-          this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action(
-            (controller, stub) -> this
-            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
+          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
+            .action((controller, stub) -> this
+              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
                 controller, stub,
                 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
-                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)
-          ).call().whenComplete((resp2, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              if (resp2.hasCompactionState()) {
-                future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
+            .call(), (resp2, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
               } else {
-                future.complete(CompactionState.NONE);
+                if (resp2.hasCompactionState()) {
+                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+                } else {
+                  future.complete(CompactionState.NONE);
+                }
               }
-            }
-          });
+            });
         });
         break;
       case NORMAL:
-        getTableHRegionLocations(tableName).whenComplete(
-          (locations, err) -> {
-            if (err != null) {
-              future.completeExceptionally(err);
-              return;
-            }
-            List<CompactionState> regionStates = new ArrayList<>();
-            List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
-            locations.stream().filter(loc -> loc.getServerName() != null)
-                .filter(loc -> loc.getRegion() != null)
-                .filter(loc -> !loc.getRegion().isOffline())
-                .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
-                  futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
-                    // If any region compaction state is MAJOR_AND_MINOR
-                    // the table compaction state is MAJOR_AND_MINOR, too.
-                    if (err2 != null) {
-                      future.completeExceptionally(err2);
-                    } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
-                      future.complete(regionState);
-                    } else {
-                      regionStates.add(regionState);
-                    }
-                  }));
-                });
-            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
-                .whenComplete((ret, err3) -> {
-                  // If future not completed, check all regions's compaction state
-                  if (!future.isCompletedExceptionally() && !future.isDone()) {
-                    CompactionState state = CompactionState.NONE;
-                    for (CompactionState regionState : regionStates) {
-                      switch (regionState) {
-                        case MAJOR:
-                          if (state == CompactionState.MINOR) {
-                            future.complete(CompactionState.MAJOR_AND_MINOR);
-                          } else {
-                            state = CompactionState.MAJOR;
-                          }
-                          break;
-                        case MINOR:
-                          if (state == CompactionState.MAJOR) {
-                            future.complete(CompactionState.MAJOR_AND_MINOR);
-                          } else {
-                            state = CompactionState.MINOR;
-                          }
-                          break;
-                        case NONE:
-                        default:
+        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          List<CompactionState> regionStates = new ArrayList<>();
+          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
+          locations.stream().filter(loc -> loc.getServerName() != null)
+            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
+            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
+              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
+                // If any region compaction state is MAJOR_AND_MINOR
+                // the table compaction state is MAJOR_AND_MINOR, too.
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
+                  future.complete(regionState);
+                } else {
+                  regionStates.add(regionState);
+                }
+              }));
+            });
+          addListener(
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+            (ret, err3) -> {
+              // If future not completed, check every region's compaction state
+              if (!future.isCompletedExceptionally() && !future.isDone()) {
+                CompactionState state = CompactionState.NONE;
+                for (CompactionState regionState : regionStates) {
+                  switch (regionState) {
+                    case MAJOR:
+                      if (state == CompactionState.MINOR) {
+                        future.complete(CompactionState.MAJOR_AND_MINOR);
+                      } else {
+                        state = CompactionState.MAJOR;
                       }
-                      if (!future.isDone()) {
-                        future.complete(state);
+                      break;
+                    case MINOR:
+                      if (state == CompactionState.MAJOR) {
+                        future.complete(CompactionState.MAJOR_AND_MINOR);
+                      } else {
+                        state = CompactionState.MINOR;
                       }
-                    }
+                      break;
+                    case NONE:
+                    default:
                   }
-                });
-          });
+                  if (!future.isDone()) {
+                    future.complete(state);
+                  }
+                }
+              }
+            });
+        });
         break;
       default:
         throw new IllegalArgumentException("Unknown compactType: " + compactType);
@@ -2893,37 +2885,38 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
     CompletableFuture<CompactionState> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(
         this.<GetRegionInfoResponse> newAdminCaller()
-            .action(
-              (controller, stub) -> this
-                  .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
-                    controller, stub, RequestConverter.buildGetRegionInfoRequest(location
-                        .getRegion().getRegionName(), true), (s, c, req, done) -> s
-                        .getRegionInfo(controller, req, done), resp -> resp))
-            .serverName(serverName).call().whenComplete((resp2, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                if (resp2.hasCompactionState()) {
-                  future.complete(

<TRUNCATED>

[3/4] hbase git commit: HBASE-17356 Add replica get support

Posted by zh...@apache.org.
HBASE-17356 Add replica get support


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ff23c022
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ff23c022
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ff23c022

Branch: refs/heads/branch-2.0
Commit: ff23c022126ab0cb557bc55cd0b38043b1ed385d
Parents: 4caf2fb
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 1 21:59:37 2019 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Thu Jan 3 10:55:06 2019 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/RegionLocations.java    |   30 +-
 .../client/AsyncBatchRpcRetryingCaller.java     |  114 +-
 .../client/AsyncConnectionConfiguration.java    |   12 +
 .../hbase/client/AsyncConnectionImpl.java       |    1 -
 .../hbase/client/AsyncMetaRegionLocator.java    |  125 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java |  291 +--
 .../hadoop/hbase/client/AsyncRegionLocator.java |  129 +-
 .../hbase/client/AsyncRegionLocatorHelper.java  |  147 ++
 .../hbase/client/AsyncRpcRetryingCaller.java    |   15 +-
 .../client/AsyncRpcRetryingCallerFactory.java   |   55 +-
 .../AsyncSingleRequestRpcRetryingCaller.java    |   71 +-
 .../hbase/client/AsyncTableRegionLocator.java   |   28 +-
 .../client/AsyncTableRegionLocatorImpl.java     |    6 +-
 .../hbase/client/ConnectionConfiguration.java   |    5 +-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 1897 +++++++++---------
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  208 +-
 .../apache/hadoop/hbase/util/FutureUtils.java   |   60 +
 .../hbase/client/RegionReplicaTestHelper.java   |  161 ++
 .../client/TestAsyncMetaRegionLocator.java      |   55 +-
 .../client/TestAsyncNonMetaRegionLocator.java   |  126 +-
 ...syncNonMetaRegionLocatorConcurrenyLimit.java |   20 +-
 ...TestAsyncSingleRequestRpcRetryingCaller.java |   56 +-
 .../client/TestAsyncTableLocatePrefetch.java    |    4 +-
 .../client/TestAsyncTableRegionReplicasGet.java |  204 ++
 .../hbase/client/TestZKAsyncRegistry.java       |   44 +-
 25 files changed, 2301 insertions(+), 1563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index fd6f3c7..f98bf03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -56,8 +56,8 @@ public class RegionLocations {
     int index = 0;
     for (HRegionLocation loc : locations) {
       if (loc != null) {
-        if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
-          maxReplicaId = loc.getRegionInfo().getReplicaId();
+        if (loc.getRegion().getReplicaId() >= maxReplicaId) {
+          maxReplicaId = loc.getRegion().getReplicaId();
           maxReplicaIdIndex = index;
         }
       }
@@ -72,7 +72,7 @@ public class RegionLocations {
       this.locations = new HRegionLocation[maxReplicaId + 1];
       for (HRegionLocation loc : locations) {
         if (loc != null) {
-          this.locations[loc.getRegionInfo().getReplicaId()] = loc;
+          this.locations[loc.getRegion().getReplicaId()] = loc;
         }
       }
     }
@@ -146,7 +146,7 @@ public class RegionLocations {
   public RegionLocations remove(HRegionLocation location) {
     if (location == null) return this;
     if (location.getRegion() == null) return this;
-    int replicaId = location.getRegionInfo().getReplicaId();
+    int replicaId = location.getRegion().getReplicaId();
     if (replicaId >= locations.length) return this;
 
     // check whether something to remove. HRL.compareTo() compares ONLY the
@@ -203,14 +203,14 @@ public class RegionLocations {
     // in case of region replication going down, we might have a leak here.
     int max = other.locations.length;
 
-    HRegionInfo regionInfo = null;
+    RegionInfo regionInfo = null;
     for (int i = 0; i < max; i++) {
       HRegionLocation thisLoc = this.getRegionLocation(i);
       HRegionLocation otherLoc = other.getRegionLocation(i);
-      if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) {
+      if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) {
         // regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that
         // all replica region infos belong to the same region with same region id.
-        regionInfo = otherLoc.getRegionInfo();
+        regionInfo = otherLoc.getRegion();
       }
 
       HRegionLocation selectedLoc = selectRegionLocation(thisLoc,
@@ -232,7 +232,7 @@ public class RegionLocations {
       for (int i=0; i < newLocations.length; i++) {
         if (newLocations[i] != null) {
           if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo,
-            newLocations[i].getRegionInfo())) {
+            newLocations[i].getRegion())) {
             newLocations[i] = null;
           }
         }
@@ -273,9 +273,9 @@ public class RegionLocations {
       boolean checkForEquals, boolean force) {
     assert location != null;
 
-    int replicaId = location.getRegionInfo().getReplicaId();
+    int replicaId = location.getRegion().getReplicaId();
 
-    HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId());
+    HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId());
     HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location,
       checkForEquals, force);
 
@@ -288,8 +288,8 @@ public class RegionLocations {
     // ensure that all replicas share the same start code. Otherwise delete them
     for (int i=0; i < newLocations.length; i++) {
       if (newLocations[i] != null) {
-        if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(),
-          newLocations[i].getRegionInfo())) {
+        if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(),
+          newLocations[i].getRegion())) {
           newLocations[i] = null;
         }
       }
@@ -317,8 +317,8 @@ public class RegionLocations {
   public HRegionLocation getRegionLocationByRegionName(byte[] regionName) {
     for (HRegionLocation loc : locations) {
       if (loc != null) {
-        if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName)
-            || Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) {
+        if (Bytes.equals(loc.getRegion().getRegionName(), regionName)
+            || Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) {
           return loc;
         }
       }
@@ -331,7 +331,7 @@ public class RegionLocations {
   }
 
   public HRegionLocation getDefaultRegionLocation() {
-    return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+    return locations[RegionInfo.DEFAULT_REPLICA_ID];
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 51b89a9..e268b2e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -43,24 +42,26 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Retry caller for batch.
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller<T> {
   private static final class ServerRequest {
 
     public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
-        new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
 
     public void addAction(HRegionLocation loc, Action action) {
-      computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
         () -> new RegionRequest(loc)).actions.add(action);
     }
   }
@@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller<T> {
       Throwable error, ServerName serverName) {
     if (tries > startLogErrorsCnt) {
       String regions =
-          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
-              .collect(Collectors.joining(",", "[", "]"));
-      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
-          + " failed, tries=" + tries,
-        error);
+        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
+          .collect(Collectors.joining(",", "[", "]"));
+      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
+        " failed, tries=" + tries, error);
     }
   }
 
@@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
     }
     errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
-        getExtraContextForError(serverName)));
+      getExtraContextForError(serverName)));
   }
 
   private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
@@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       return;
     }
     ThrowableWithExtraContext errorWithCtx =
-        new ThrowableWithExtraContext(error, currentTime, extras);
+      new ThrowableWithExtraContext(error, currentTime, extras);
     List<ThrowableWithExtraContext> errors = removeErrors(action);
     if (errors == null) {
       errors = Collections.singletonList(errorWithCtx);
@@ -227,7 +227,7 @@ class AsyncBatchRpcRetryingCaller<T> {
         return;
       }
       future.completeExceptionally(new RetriesExhaustedException(tries,
-          Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
+        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
     });
   }
 
@@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller<T> {
       // multiRequestBuilder will be populated with region actions.
       // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
       // action list.
-      RequestConverter.buildNoDataRegionActions(entry.getKey(),
-        entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
-        mutationBuilder, nonceGroup, rowMutationsIndexMap);
+      RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
+        multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
+        rowMutationsIndexMap);
     }
     return multiRequestBuilder.build();
   }
@@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller<T> {
       RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
     Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
     if (result == null) {
-      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
-          + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
-          + regionReq.loc.getRegionInfo().getRegionNameAsString());
+      LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
+        Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
+        regionReq.loc.getRegion().getRegionNameAsString());
       addError(action, new RuntimeException("Invalid response"), serverName);
       failedActions.add(action);
     } else if (result instanceof Throwable) {
       Throwable error = translateException((Throwable) result);
       logException(tries, () -> Stream.of(regionReq), error, serverName);
-      conn.getLocator().updateCachedLocation(regionReq.loc, error);
+      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
       if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
         failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
           getExtraContextForError(serverName));
@@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller<T> {
       RegionResult regionResult = resp.getResults().get(rn);
       Throwable regionException = resp.getException(rn);
       if (regionResult != null) {
-        regionReq.actions.forEach(
-          action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
-            regionException));
+        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
+          regionResult, failedActions, regionException));
       } else {
         Throwable error;
         if (regionException == null) {
-          LOG.error(
-            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          LOG
+            .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
           error = new RuntimeException("Invalid response");
         } else {
           error = translateException(regionException);
         }
         logException(tries, () -> Stream.of(regionReq), error, serverName);
-        conn.getLocator().updateCachedLocation(regionReq.loc, error);
+        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
         if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
           failAll(regionReq.actions.stream(), tries, error, serverName);
           return;
@@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       remainingNs = remainingTimeNs();
       if (remainingNs <= 0) {
         failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
-            .flatMap(r -> r.actions.stream()),
-          tries);
+          .flatMap(r -> r.actions.stream()), tries);
         return;
       }
     } else {
@@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller<T> {
       ServerName serverName) {
     Throwable error = translateException(t);
     logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
-    actionsByRegion
-        .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
+    actionsByRegion.forEach(
+      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
     if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
       failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
         serverName);
       return;
     }
     List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
-        .collect(Collectors.toList());
+      .collect(Collectors.toList());
     addError(copiedActions, error, serverName);
     tryResubmit(copiedActions.stream(), tries);
   }
@@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
     ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
     ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
-    CompletableFuture.allOf(actions
-        .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
-          RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
-            if (error != null) {
-              error = translateException(error);
-              if (error instanceof DoNotRetryIOException) {
-                failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
-                return;
-              }
-              addError(action, error, null);
-              locateFailed.add(action);
-            } else {
-              computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
-                  .addAction(loc, action);
+    addListener(CompletableFuture.allOf(actions
+      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
+          if (error != null) {
+            error = translateException(error);
+            if (error instanceof DoNotRetryIOException) {
+              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+              return;
             }
-          }))
-        .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
-          if (!actionsByServer.isEmpty()) {
-            send(actionsByServer, tries);
-          }
-          if (!locateFailed.isEmpty()) {
-            tryResubmit(locateFailed.stream(), tries);
+            addError(action, error, null);
+            locateFailed.add(action);
+          } else {
+            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
+              action);
           }
-        });
+        }))
+      .toArray(CompletableFuture[]::new)), (v, r) -> {
+        if (!actionsByServer.isEmpty()) {
+          send(actionsByServer, tries);
+        }
+        if (!locateFailed.isEmpty()) {
+          tryResubmit(locateFailed.stream(), tries);
+        }
+      });
   }
 
   public List<CompletableFuture<T>> call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 915e9dd..fa051a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -94,6 +96,10 @@ class AsyncConnectionConfiguration {
 
   private final long writeBufferPeriodicFlushTimeoutNs;
 
+  // this is for supporting region replica get, if the primary does not finish within this
+  // timeout, we will send request to secondaries.
+  private final long primaryCallTimeoutNs;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -124,6 +130,8 @@ class AsyncConnectionConfiguration {
     this.writeBufferPeriodicFlushTimeoutNs =
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
         WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
+    this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
+      conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
   }
 
   long getMetaOperationTimeoutNs() {
@@ -181,4 +189,8 @@ class AsyncConnectionConfiguration {
   long getWriteBufferPeriodicFlushTimeoutNs() {
     return writeBufferPeriodicFlushTimeoutNs;
   }
+
+  long getPrimaryCallTimeoutNs() {
+    return primaryCallTimeoutNs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index b26f6b2..507bfc3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -136,7 +136,6 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   // we will override this method for testing retry caller, so do not remove this method.
-  @VisibleForTesting
   AsyncRegionLocator getLocator() {
     return locator;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 06b5b57..9fef15d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,43 +41,43 @@ class AsyncMetaRegionLocator {
 
   private final AsyncRegistry registry;
 
-  private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+  private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
 
-  private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
-      new AtomicReference<>();
+  private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
+    new AtomicReference<>();
 
   AsyncMetaRegionLocator(AsyncRegistry registry) {
     this.registry = registry;
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) {
+  /**
+   * Get the region locations for meta region. If the location for the given replica is not
+   * available in the cached locations, then fetch from the HBase cluster.
+   * <p/>
+   * The <code>replicaId</code> parameter is important. If the region replication config for meta
+   * region is changed, then the cached region locations may not have the locations for new
+   * replicas. If we do not check the location for the given replica, we will always return the
+   * cached region locations and cause an infinite loop.
+   */
+  CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
     for (;;) {
       if (!reload) {
-        HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
-        if (metaRegionLocation != null) {
-          return CompletableFuture.completedFuture(metaRegionLocation);
+        RegionLocations locs = this.metaRegionLocations.get();
+        if (isGood(locs, replicaId)) {
+          return CompletableFuture.completedFuture(locs);
         }
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Meta region location cache is null, try fetching from registry.");
-      }
+      LOG.trace("Meta region location cache is null, try fetching from registry.");
       if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Start fetching meta region location from registry.");
-        }
-        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        LOG.debug("Start fetching meta region location from registry.");
+        CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
         registry.getMetaRegionLocation().whenComplete((locs, error) -> {
           if (error != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Failed to fetch meta region location from registry", error);
-            }
+            LOG.debug("Failed to fetch meta region location from registry", error);
             metaRelocateFuture.getAndSet(null).completeExceptionally(error);
             return;
           }
-          HRegionLocation loc = locs.getDefaultRegionLocation();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The fetched meta region location is " + loc);
-          }
+          LOG.debug("The fetched meta region location is {}", locs);
           // Here we update cache before reset future, so it is possible that someone can get a
           // stale value. Consider this:
           // 1. update cache
@@ -82,12 +87,12 @@ class AsyncMetaRegionLocator {
           // cleared in step 2.
           // But we do not think it is a big deal as it rarely happens, and even if it happens, the
           // caller will retry again later, no correctness problems.
-          this.metaRegionLocation.set(loc);
+          this.metaRegionLocations.set(locs);
           metaRelocateFuture.set(null);
-          future.complete(loc);
+          future.complete(locs);
         });
       } else {
-        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
         if (future != null) {
           return future;
         }
@@ -95,30 +100,56 @@ class AsyncMetaRegionLocator {
     }
   }
 
-  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-    AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(),
-      newLoc -> {
-        for (;;) {
-          HRegionLocation oldLoc = metaRegionLocation.get();
-          if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() ||
-              oldLoc.getServerName().equals(newLoc.getServerName()))) {
-            return;
-          }
-          if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
-            return;
-          }
-        }
-      }, l -> {
-        for (;;) {
-          HRegionLocation oldLoc = metaRegionLocation.get();
-          if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
-            return;
-          }
+  private HRegionLocation getCacheLocation(HRegionLocation loc) {
+    RegionLocations locs = metaRegionLocations.get();
+    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+  }
+
+  private void addLocationToCache(HRegionLocation loc) {
+    for (;;) {
+      int replicaId = loc.getRegion().getReplicaId();
+      RegionLocations oldLocs = metaRegionLocations.get();
+      if (oldLocs == null) {
+        if (metaRegionLocations.compareAndSet(null, createRegionLocations(loc))) {
+          return;
         }
-      });
+        continue; // lost the CAS race: re-read the cache, oldLocs is null and unusable below
+      }
+      HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
+      if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
+        oldLoc.getServerName().equals(loc.getServerName()))) {
+        return;
+      }
+      RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
+      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+        return;
+      }
+    }
+  }
+
+  private void removeLocationFromCache(HRegionLocation loc) {
+    for (;;) {
+      RegionLocations oldLocs = metaRegionLocations.get();
+      if (oldLocs == null) {
+        return;
+      }
+      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+      if (!canUpdateOnError(loc, oldLoc)) {
+        return;
+      }
+      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
+        return;
+      }
+    }
+  }
+
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
+      this::addLocationToCache, this::removeLocationFromCache);
   }
 
   void clearCache() {
-    metaRegionLocation.set(null);
+    metaRegionLocations.set(null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 7e3d56c..1fcfbb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.HConstants.NINES;
 import static org.apache.hadoop.hbase.HConstants.ZEROES;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
@@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -53,6 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
 
 /**
  * The asynchronous locator for regions other than meta.
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator {
 
   private static final class LocateRequest {
 
-    public final byte[] row;
+    private final byte[] row;
 
-    public final RegionLocateType locateType;
+    private final RegionLocateType locateType;
 
     public LocateRequest(byte[] row, RegionLocateType locateType) {
       this.row = row;
@@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator {
 
   private static final class TableCache {
 
-    public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
+    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
       new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
 
-    public final Set<LocateRequest> pendingRequests = new HashSet<>();
+    private final Set<LocateRequest> pendingRequests = new HashSet<>();
 
-    public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
+    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
       new LinkedHashMap<>();
 
     public boolean hasQuota(int max) {
@@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator {
       return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
     }
 
-    public void clearCompletedRequests(Optional<HRegionLocation> location) {
-      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+    public void clearCompletedRequests(Optional<RegionLocations> locations) {
+      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
         allRequests.entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
-        if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
+        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
           iter.remove();
         }
       }
     }
 
-    private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
-        Optional<HRegionLocation> location) {
+    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
+        Optional<RegionLocations> locations) {
       if (future.isDone()) {
         return true;
       }
-      if (!location.isPresent()) {
+      if (!locations.isPresent()) {
         return false;
       }
-      HRegionLocation loc = location.get();
+      RegionLocations locs = locations.get();
+      HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
+      // we should at least have one location available, otherwise the request should fail and
+      // should not arrive here
+      assert loc != null;
       boolean completed;
       if (req.locateType.equals(RegionLocateType.BEFORE)) {
         // for locating the row before current row, the common case is to find the previous region
@@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator {
         completed = loc.getRegion().containsRow(req.row);
       }
       if (completed) {
-        future.complete(loc);
+        future.complete(locs);
         return true;
       } else {
         return false;
@@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator {
     return computeIfAbsent(cache, tableName, TableCache::new);
   }
 
-  private void removeFromCache(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return;
+  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+    HRegionLocation[] locArr1 = locs1.getRegionLocations();
+    HRegionLocation[] locArr2 = locs2.getRegionLocations();
+    if (locArr1.length != locArr2.length) {
+      return false;
     }
-    tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> {
-      if (oldLoc.getSeqNum() > loc.getSeqNum() ||
-        !oldLoc.getServerName().equals(loc.getServerName())) {
-        return oldLoc;
+    for (int i = 0; i < locArr1.length; i++) {
+      // do not need to compare region info
+      HRegionLocation loc1 = locArr1[i];
+      HRegionLocation loc2 = locArr2[i];
+      if (loc1 == null) {
+        if (loc2 != null) {
+          return false;
+        }
+      } else {
+        if (loc2 == null) {
+          return false;
+        }
+        // both the seq num and the server name must match for the replica to be unchanged
+        boolean sameSeqNum = loc1.getSeqNum() == loc2.getSeqNum();
+        boolean sameServer = Objects.equal(loc1.getServerName(), loc2.getServerName());
+        if (!sameSeqNum || !sameServer) {
+          return false;
+        }
       }
-      return null;
-    });
+    }
+    return true;
   }
 
   // return whether we add this loc to cache
-  private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try adding " + loc + " to cache");
-    }
-    byte[] startKey = loc.getRegion().getStartKey();
-    HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
-    if (oldLoc == null) {
-      return true;
-    }
-    if (oldLoc.getSeqNum() > loc.getSeqNum() ||
-      oldLoc.getServerName().equals(loc.getServerName())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
-          " is newer than us or has the same server name");
-      }
-      return false;
-    }
-    return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
-      if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
-        return loc;
+  private boolean addToCache(TableCache tableCache, RegionLocations locs) {
+    LOG.trace("Try adding {} to cache", locs);
+    byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
+      if (oldLocs == null) {
+        return true;
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
+      RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
+      if (isEqual(mergedLocs, oldLocs)) {
+        // the merged one is the same with the old one, give up
+        LOG.trace("Will not add {} to cache because the old value {} " +
           " is newer than us or has the same server name." +
-          " Maybe it is updated before we replace it");
+          " Maybe it is updated before we replace it", locs, oldLocs);
+        return false;
       }
-      return oldValue;
-    });
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-      justification = "Called by lambda expression")
-  private void addToCache(HRegionLocation loc) {
-    addToCache(getTableCache(loc.getRegion().getTable()), loc);
-    LOG.trace("Try adding {} to cache", loc);
+      if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
+        return true;
+      }
+    }
   }
 
-  private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
+  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
       Throwable error) {
     if (error != null) {
       LOG.warn("Failed to locate region in '" + tableName + "', row='" +
@@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator {
     }
     Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
-    if (loc != null) {
-      if (!addToCache(tableCache, loc)) {
+    if (locs != null) {
+      if (!addToCache(tableCache, locs)) {
         // someone is ahead of us.
         synchronized (tableCache) {
           tableCache.pendingRequests.remove(req);
@@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator {
           future.completeExceptionally(error);
         }
       }
-      tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+      tableCache.clearCompletedRequests(Optional.ofNullable(locs));
       // Remove a complete locate request in a synchronized block, so the table cache must have
       // quota to send a candidate request.
       toSend = tableCache.getCandidate();
@@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator {
         Bytes.toStringBinary(req.row), req.locateType, locs);
     }
 
+    // the default region location should always be present when fetching from meta, otherwise
+    // let's fail the request.
     if (locs == null || locs.getDefaultRegionLocation() == null) {
       complete(tableName, req, null,
-        new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
+        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
           tableName, Bytes.toStringBinary(req.row), req.locateType)));
       return true;
     }
@@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator {
     RegionInfo info = loc.getRegion();
     if (info == null) {
       complete(tableName, req, null,
-        new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
+        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
           tableName, Bytes.toStringBinary(req.row), req.locateType)));
       return true;
     }
     if (info.isSplitParent()) {
       return false;
     }
-    if (loc.getServerName() == null) {
-      complete(tableName, req, null,
-        new IOException(
-          String.format("No server address listed for region '%s', row='%s', locateType=%s",
-            info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
-      return true;
-    }
-    complete(tableName, req, loc, null);
+    complete(tableName, req, locs, null);
     return true;
   }
 
-  private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
-    Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
+  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
+      int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
     if (entry == null) {
       return null;
     }
-    HRegionLocation loc = entry.getValue();
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
     byte[] endKey = loc.getRegion().getEndKey();
     if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
-          Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
       }
-      return loc;
+      return locs;
     } else {
       return null;
     }
   }
 
-  private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
-      byte[] row) {
+  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
+      byte[] row, int replicaId) {
     boolean isEmptyStopRow = isEmptyStopRow(row);
-    Map.Entry<byte[], HRegionLocation> entry =
-        isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
     if (entry == null) {
       return null;
     }
-    HRegionLocation loc = entry.getValue();
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
     if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
       (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
-          Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
       }
-      return loc;
+      return locs;
     } else {
       return null;
     }
@@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator {
             if (tableNotFound) {
               complete(tableName, req, null, new TableNotFoundException(tableName));
             } else if (!completeNormally) {
-              complete(tableName, req, null, new IOException(
-                "Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName));
+              complete(tableName, req, null, new IOException("Unable to find region for '" +
+                Bytes.toStringBinary(req.row) + "' in " + tableName));
             }
           }
 
@@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator {
                   continue;
                 }
                 RegionInfo info = loc.getRegion();
-                if (info == null || info.isOffline() || info.isSplitParent() ||
-                  loc.getServerName() == null) {
+                if (info == null || info.isOffline() || info.isSplitParent()) {
                   continue;
                 }
-                if (addToCache(tableCache, loc)) {
+                if (addToCache(tableCache, locs)) {
                   synchronized (tableCache) {
-                    tableCache.clearCompletedRequests(Optional.of(loc));
+                    tableCache.clearCompletedRequests(Optional.of(locs));
                   }
                 }
               }
@@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator {
         });
   }
 
-  private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
-      RegionLocateType locateType) {
+  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
+      int replicaId, RegionLocateType locateType) {
     return locateType.equals(RegionLocateType.BEFORE)
-      ? locateRowBeforeInCache(tableCache, tableName, row)
-      : locateRowInCache(tableCache, tableName, row);
+      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
+      : locateRowInCache(tableCache, tableName, row, replicaId);
   }
 
   // locateToPrevious is true means we will use the start key of a region to locate the region
   // placed before it. Used for reverse scan. See the comment of
   // AsyncRegionLocator.getPreviousRegionLocation.
-  private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
-      byte[] row, RegionLocateType locateType, boolean reload) {
+  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
+      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
     // AFTER should be convert to CURRENT before calling this method
     assert !locateType.equals(RegionLocateType.AFTER);
     TableCache tableCache = getTableCache(tableName);
     if (!reload) {
-      HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
-      if (loc != null) {
-        return CompletableFuture.completedFuture(loc);
+      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+      if (isGood(locs, replicaId)) {
+        return CompletableFuture.completedFuture(locs);
       }
     }
-    CompletableFuture<HRegionLocation> future;
+    CompletableFuture<RegionLocations> future;
     LocateRequest req;
     boolean sendRequest = false;
     synchronized (tableCache) {
       // check again
       if (!reload) {
-        HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
-        if (loc != null) {
-          return CompletableFuture.completedFuture(loc);
+        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+        if (isGood(locs, replicaId)) {
+          return CompletableFuture.completedFuture(locs);
         }
       }
       req = new LocateRequest(row, locateType);
@@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator {
     return future;
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      RegionLocateType locateType, boolean reload) {
-    if (locateType.equals(RegionLocateType.BEFORE)) {
-      return getRegionLocationInternal(tableName, row, locateType, reload);
-    } else {
-      // as we know the exact row after us, so we can just create the new row, and use the same
-      // algorithm to locate it.
-      if (locateType.equals(RegionLocateType.AFTER)) {
-        row = createClosestRowAfter(row);
-      }
-      return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload);
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+      int replicaId, RegionLocateType locateType, boolean reload) {
+    // as we know the exact row after us, so we can just create the new row, and use the same
+    // algorithm to locate it.
+    if (locateType.equals(RegionLocateType.AFTER)) {
+      row = createClosestRowAfter(row);
+      locateType = RegionLocateType.CURRENT;
     }
+    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
   }
 
-  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
-    AsyncRegionLocator.updateCachedLocation(loc, exception, l -> {
-      TableCache tableCache = cache.get(l.getRegion().getTable());
-      if (tableCache == null) {
-        return null;
+  private void removeLocationFromCache(HRegionLocation loc) {
+    TableCache tableCache = cache.get(loc.getRegion().getTable());
+    if (tableCache == null) {
+      return;
+    }
+    byte[] startKey = loc.getRegion().getStartKey();
+    for (;;) {
+      RegionLocations oldLocs = tableCache.cache.get(startKey); // null if no entry is cached
+      if (oldLocs == null || !canUpdateOnError(loc,
+        oldLocs.getRegionLocation(loc.getRegion().getReplicaId()))) {
+        return;
       }
-      return tableCache.cache.get(l.getRegion().getStartKey());
-    }, this::addToCache, this::removeFromCache);
+      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+      if (newLocs == null) {
+        if (tableCache.cache.remove(startKey, oldLocs)) {
+          return;
+        }
+      } else {
+        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
+          return;
+        }
+      }
+    }
+  }
+
+  private void addLocationToCache(HRegionLocation loc) {
+    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
+  }
+
+  private HRegionLocation getCachedLocation(HRegionLocation loc) {
+    TableCache tableCache = cache.get(loc.getRegion().getTable());
+    if (tableCache == null) {
+      return null;
+    }
+    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
+    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
+  }
+
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
+    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
+      this::addLocationToCache, this::removeLocationFromCache);
   }
 
   void clearCache(TableName tableName) {
@@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator {
 
   // only used for testing whether we have cached the location for a region.
   @VisibleForTesting
-  HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
+  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
     TableCache tableCache = cache.get(tableName);
     if (tableCache == null) {
       return null;
     }
-    return locateRowInCache(tableCache, tableName, row);
+    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 56228ab..d624974 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -18,26 +18,24 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
-import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Supplier;
-
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 
 /**
  * The asynchronous region locator.
@@ -59,8 +57,8 @@ class AsyncRegionLocator {
     this.retryTimer = retryTimer;
   }
 
-  private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
-      long timeoutNs, Supplier<String> timeoutMsg) {
+  private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
+      Supplier<String> timeoutMsg) {
     if (future.isDone() || timeoutNs <= 0) {
       return future;
     }
@@ -78,74 +76,75 @@ class AsyncRegionLocator {
     });
   }
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+  private boolean isMeta(TableName tableName) {
+    return TableName.isMetaTableName(tableName);
+  }
+
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       RegionLocateType type, boolean reload, long timeoutNs) {
+    CompletableFuture<RegionLocations> future = isMeta(tableName)
+      ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
+      : nonMetaRegionLocator.getRegionLocations(tableName, row,
+        RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
+    return withTimeout(future, timeoutNs,
+      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
+        "ms) waiting for region locations for " + tableName + ", row='" +
+        Bytes.toStringBinary(row) + "'");
+  }
+
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
     // meta region can not be split right now so we always call the same method.
     // Change it later if the meta table can have more than one regions.
-    CompletableFuture<HRegionLocation> future =
-        tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload)
-            : nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload);
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    CompletableFuture<RegionLocations> locsFuture =
+      isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
+        : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
+    addListener(locsFuture, (locs, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      HRegionLocation loc = locs.getRegionLocation(replicaId);
+      if (loc == null) {
+        future
+          .completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
+            Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
+      } else if (loc.getServerName() == null) {
+        future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
+          loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) +
+          "', locateType=" + type + ", replicaId=" + replicaId));
+      } else {
+        future.complete(loc);
+      }
+    });
     return withTimeout(future, timeoutNs,
       () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
-          "ms) waiting for region location for " + tableName + ", row='" +
-          Bytes.toStringBinary(row) + "'");
+        "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
+        "', replicaId=" + replicaId);
   }
 
   CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      RegionLocateType type, long timeoutNs) {
-    return getRegionLocation(tableName, row, type, false, timeoutNs);
+      int replicaId, RegionLocateType type, long timeoutNs) {
+    return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
   }
 
-  static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
-    // Do not need to update if no such location, or the location is newer, or the location is not
-    // same with us
-    return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
-        oldLoc.getServerName().equals(loc.getServerName());
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      RegionLocateType type, boolean reload, long timeoutNs) {
+    return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
+      timeoutNs);
   }
 
-  static void updateCachedLocation(HRegionLocation loc, Throwable exception,
-      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
-      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
-    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
-    }
-    if (!canUpdate(loc, oldLoc)) {
-      return;
-    }
-    Throwable cause = findException(exception);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("The actual exception when updating " + loc, cause);
-    }
-    if (cause == null || !isMetaClearingException(cause)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "Will not update " + loc + " because the exception is null or not the one we care about");
-      }
-      return;
-    }
-    if (cause instanceof RegionMovedException) {
-      RegionMovedException rme = (RegionMovedException) cause;
-      HRegionLocation newLoc =
-          new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
-      }
-      addToCache.accept(newLoc);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Try removing " + loc + " from cache");
-      }
-      removeFromCache.accept(loc);
-    }
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
+      RegionLocateType type, long timeoutNs) {
+    return getRegionLocation(tableName, row, type, false, timeoutNs);
   }
 
-  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
     if (loc.getRegion().isMetaRegion()) {
-      metaRegionLocator.updateCachedLocation(loc, exception);
+      metaRegionLocator.updateCachedLocationOnError(loc, exception);
     } else {
-      nonMetaRegionLocator.updateCachedLocation(loc, exception);
+      nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
new file mode 100644
index 0000000..5c9c091
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for asynchronous region locator.
+ */
+@InterfaceAudience.Private
+final class AsyncRegionLocatorHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class);
+
+  private AsyncRegionLocatorHelper() {
+  }
+
+  static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
+    // No update is needed if nothing is cached, if the cached location is newer (larger seqNum),
+    // or if the cached location points to a different server than loc does.
+    return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
+      oldLoc.getServerName().equals(loc.getServerName());
+  }
+
+  static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
+      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+    LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception);
+    if (!canUpdateOnError(loc, oldLoc)) {
+      return;
+    }
+    Throwable cause = findException(exception);
+    LOG.debug("The actual exception when updating {}", loc, cause);
+    if (cause == null || !isMetaClearingException(cause)) {
+      LOG.debug("Will not update {} because the exception is null or not the one we care about",
+        loc);
+      return;
+    }
+    if (cause instanceof RegionMovedException) {
+      RegionMovedException rme = (RegionMovedException) cause;
+      HRegionLocation newLoc =
+        new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum());
+      LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme);
+      addToCache.accept(newLoc);
+    } else {
+      LOG.debug("Try removing {} from cache", loc);
+      removeFromCache.accept(loc);
+    }
+  }
+
+  /** Wraps {@code loc} in a {@link RegionLocations}, stored at the index of its replica id. */
+  static RegionLocations createRegionLocations(HRegionLocation loc) {
+    HRegionLocation[] slots = new HRegionLocation[loc.getRegion().getReplicaId() + 1];
+    slots[slots.length - 1] = loc;
+    return new RegionLocations(slots);
+  }
+
+  /**
+   * Create a new {@link RegionLocations} based on the given {@code oldLocs}, with the entry at the
+   * replica id of {@code loc} replaced by {@code loc}; the array is grown if needed.
+   * <p>
+   * All the {@link RegionLocations} in async locator related classes are immutable because we want
+   * to access them concurrently, so here we need to create a new one, instead of calling
+   * {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}.
+   */
+  static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) {
+    int replicaId = loc.getRegion().getReplicaId();
+    HRegionLocation[] locs = oldLocs.getRegionLocations();
+    locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length));
+    locs[replicaId] = loc;
+    return new RegionLocations(locs);
+  }
+
+  /**
+   * Create a new {@link RegionLocations} from {@code oldLocs} without the location for the given
+   * {@code replicaId}. Returns {@code oldLocs} if its location array is too short to hold that id,
+   * and {@code null} if no location is left after the removal.
+   * <p>
+   * The {@link RegionLocations} in the async locator classes are immutable as they are accessed
+   * concurrently, so a new one is created instead of calling {@link RegionLocations#remove(int)}.
+   */
+  static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) {
+    HRegionLocation[] locs = oldLocs.getRegionLocations();
+    if (locs.length < replicaId + 1) {
+      // Here we do not modify the oldLocs so it is safe to return it.
+      return oldLocs;
+    }
+    locs = Arrays.copyOf(locs, locs.length);
+    locs[replicaId] = null;
+    if (ObjectUtils.firstNonNull(locs) != null) {
+      return new RegionLocations(locs);
+    } else {
+      // if all the locations are null, just return null
+      return null;
+    }
+  }
+
+  /**
+   * Create a new {@link RegionLocations} from {@code newLocs} and merge {@code oldLocs} into it.
+   * <p>
+   * The {@link RegionLocations} held by the async locator classes are treated as immutable (they
+   * are read concurrently), so the merge is applied to a fresh instance, not the cached one.
+   * NOTE(review): the return value of {@link RegionLocations#mergeLocations(RegionLocations)} is
+   * ignored here; confirm that it merges in place, otherwise {@code oldLocs} is silently dropped.
+   */
+  static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) {
+    RegionLocations locs = new RegionLocations(newLocs.getRegionLocations());
+    locs.mergeLocations(oldLocs);
+    return locs;
+  }
+
+  /**
+   * Tells whether {@code locs} holds a location for {@code replicaId} that has a server name.
+   */
+  static boolean isGood(RegionLocations locs, int replicaId) {
+    HRegionLocation candidate = locs == null ? null : locs.getRegionLocation(replicaId);
+    return candidate != null && candidate.getServerName() != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index d30012f..e03049a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -88,15 +88,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
   }
 
-  protected long remainingTimeNs() {
+  protected final long remainingTimeNs() {
     return operationTimeoutNs - (System.nanoTime() - startNs);
   }
 
-  protected void completeExceptionally() {
+  protected final void completeExceptionally() {
     future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
   }
 
-  protected void resetCallTimeout() {
+  protected final void resetCallTimeout() {
     long callTimeoutNs;
     if (operationTimeoutNs > 0) {
       callTimeoutNs = remainingTimeNs();
@@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
     resetController(controller, callTimeoutNs);
   }
 
-  protected void onError(Throwable error, Supplier<String> errMsg,
+  protected final void onError(Throwable error, Supplier<String> errMsg,
       Consumer<Throwable> updateCachedLocation) {
+    if (future.isDone()) {
+      // Give up if the future is already done, this is possible if user has already canceled the
+      // future. And for timeline consistent read, we will also cancel some requests if we have
+      // already get one of the responses.
+      LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
+      return;
+    }
     error = translateException(error);
     if (error instanceof DoNotRetryIOException) {
       future.completeExceptionally(error);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index f80b4e5..a660e74 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -17,22 +17,22 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 
@@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private RegionLocateType locateType = RegionLocateType.CURRENT;
 
+    private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+
     public SingleRequestCallerBuilder<T> table(TableName tableName) {
       this.tableName = tableName;
       return this;
@@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
+      this.replicaId = replicaId;
+      return this;
+    }
+
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
+      checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
-          checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
-          checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
-          pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
+        checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
+        pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncScanSingleRegionRpcRetryingCaller build() {
       checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
-          checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
-          checkNotNull(resultCache, "resultCache is null"),
-          checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-          checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
-          pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
+        checkNotNull(resultCache, "resultCache is null"),
+        checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
+        checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
+        pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory {
 
     public <T> AsyncBatchRpcRetryingCaller<T> build() {
       return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
-          maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     public <T> List<CompletableFuture<T>> call() {
@@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory {
 
     public AsyncMasterRequestRpcRetryingCaller<T> build() {
       return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
-          checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
-          rpcTimeoutNs, startLogErrorsCnt);
+        checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
+        rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private ServerName serverName;
 
-    public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
+    public AdminRequestCallerBuilder<T> action(
+        AsyncAdminRequestRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
@@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public AdminRequestCallerBuilder<T> serverName(ServerName serverName){
+    public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
       this.serverName = serverName;
       return this;
     }
 
     public AsyncAdminRequestRetryingCaller<T> build() {
       return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
-          operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
-            "serverName is null"), checkNotNull(callable, "action is null"));
+        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
     }
 
     public CompletableFuture<T> call() {
@@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory {
     }
   }
 
-  public <T> AdminRequestCallerBuilder<T> adminRequest(){
+  public <T> AdminRequestCallerBuilder<T> adminRequest() {
     return new AdminRequestCallerBuilder<>();
   }
 
@@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory {
 
     public AsyncServerRequestRpcRetryingCaller<T> build() {
       return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
-          operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
-            "serverName is null"), checkNotNull(callable, "action is null"));
+        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+        checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
     }
 
     public CompletableFuture<T> call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 56c82fb..1a52e5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,17 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 
 /**
  * Retry caller for a single request, such as get, put, delete, etc.
@@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
 
   private final byte[] row;
 
+  private final int replicaId;
+
   private final RegionLocateType locateType;
 
   private final Callable<T> callable;
 
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
-      int startLogErrorsCnt) {
+      TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
+      Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-        startLogErrorsCnt);
+      startLogErrorsCnt);
     this.tableName = tableName;
     this.row = row;
+    this.replicaId = replicaId;
     this.locateType = locateType;
     this.callable = callable;
   }
@@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
       stub = conn.getRegionServerStub(loc.getServerName());
     } catch (IOException e) {
       onError(e,
-        () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
-            + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-        err -> conn.getLocator().updateCachedLocation(loc, err));
+        () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) +
+          "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
       return;
     }
     resetCallTimeout();
-    callable.call(controller, loc, stub).whenComplete(
-      (result, error) -> {
-        if (error != null) {
-          onError(error,
-            () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
-                + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-            err -> conn.getLocator().updateCachedLocation(loc, err));
-          return;
-        }
-        future.complete(result);
-      });
+    callable.call(controller, loc, stub).whenComplete((result, error) -> {
+      if (error != null) {
+        onError(error,
+          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
+            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+        return;
+      }
+      future.complete(result);
+    });
   }
 
   @Override
@@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
     } else {
       locateTimeoutNs = -1L;
     }
-    conn.getLocator()
-        .getRegionLocation(tableName, row, locateType, locateTimeoutNs)
-        .whenComplete(
-          (loc, error) -> {
-            if (error != null) {
-              onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
-                  + " failed", err -> {
-              });
-              return;
-            }
-            call(loc);
-          });
+    addListener(
+      conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs),
+      (loc, error) -> {
+        if (error != null) {
+          onError(error,
+            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
+            });
+          return;
+        }
+        call(loc);
+      });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
index dbfcef5..3bda38e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator {
    * @param row Row to find.
    * @param reload true to reload information or false to use cached information
    */
-  CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
+  default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
+    return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload);
+  }
+
+  /**
+   * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+   * <p>
+   * Delegates to {@link #getRegionLocation(byte[], int, boolean)} with <code>reload</code> false.
+   * @param row Row to find.
+   * @param replicaId the replica id of the region
+   * @return a future for the location of the region replica to which the row belongs
+   */
+  default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId) {
+    return getRegionLocation(row, replicaId, false);
+  }
+
+  /**
+   * Finds the region with the given <code>replicaId</code> on which the given row is being served.
+   * <p>
+   * The location is reported through the returned {@link CompletableFuture}.
+   * @param row Row to find.
+   * @param replicaId the replica id of the region
+   * @param reload true to reload information or false to use cached information
+   * @return a future for the location of the region replica to which the row belongs
+   */
+  CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
index 7d199df..465a411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java
@@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
   }
 
   @Override
-  public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
-    return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L);
+  public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
+      boolean reload) {
+    return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
+      -1L);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index d996004..55c62e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -38,6 +38,9 @@ public class ConnectionConfiguration {
   public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
   public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
   public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+  public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
+    "hbase.client.primaryCallTimeout.get";
+  public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
 
   private final long writeBufferSize;
   private final long writeBufferPeriodicFlushTimeoutMs;
@@ -86,7 +89,7 @@ public class ConnectionConfiguration {
             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
 
     this.primaryCallTimeoutMicroSecond =
-        conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
+      conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
 
     this.replicaCallTimeoutMicroSecondScan =
         conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms


[4/4] hbase git commit: HBASE-18569 Add prefetch support for async region locator

Posted by zh...@apache.org.
HBASE-18569 Add prefetch support for async region locator


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4caf2fb0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4caf2fb0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4caf2fb0

Branch: refs/heads/branch-2.0
Commit: 4caf2fb0d848821d96b98dc449589eae0124b2b7
Parents: 40d2787
Author: zhangduo <zh...@apache.org>
Authored: Fri Jun 22 08:48:33 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Thu Jan 3 10:55:06 2019 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncNonMetaRegionLocator.java | 75 +++++++++++++++---
 .../client/TestAsyncTableLocatePrefetch.java    | 82 ++++++++++++++++++++
 2 files changed, 145 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4caf2fb0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index f6d74a5..7e3d56c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -52,6 +52,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 /**
  * The asynchronous locator for regions other than meta.
  */
@@ -60,15 +62,23 @@ class AsyncNonMetaRegionLocator {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
 
+  @VisibleForTesting
   static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
     "hbase.client.meta.max.concurrent.locate.per.table";
 
   private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
 
+  @VisibleForTesting // config key for the meta scan caching size used when locating regions
+  static final String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
+
+  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
+
   private final AsyncConnectionImpl conn;
 
   private final int maxConcurrentLocateRequestPerTable;
 
+  private final int locatePrefetchLimit;
+
   private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
 
   private static final class LocateRequest {
@@ -168,6 +178,8 @@ class AsyncNonMetaRegionLocator {
     this.conn = conn;
     this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
       MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
+    this.locatePrefetchLimit =
+      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
   }
 
   private TableCache getTableCache(TableName tableName) {
@@ -223,9 +235,7 @@ class AsyncNonMetaRegionLocator {
       justification = "Called by lambda expression")
   private void addToCache(HRegionLocation loc) {
     addToCache(getTableCache(loc.getRegion().getTable()), loc);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Try adding " + loc + " to cache");
-    }
+    LOG.trace("Try adding {} to cache", loc);
   }
 
   private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
@@ -271,8 +281,10 @@ class AsyncNonMetaRegionLocator {
   // return whether we should stop the scan
   private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
     RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
-    LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
-      Bytes.toStringBinary(req.row), req.locateType, locs);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
+        Bytes.toStringBinary(req.row), req.locateType, locs);
+    }
 
     if (locs == null || locs.getDefaultRegionLocation() == null) {
       complete(tableName, req, null,
@@ -294,8 +306,8 @@ class AsyncNonMetaRegionLocator {
     if (loc.getServerName() == null) {
       complete(tableName, req, null,
         new IOException(
-            String.format("No server address listed for region '%s', row='%s', locateType=%s",
-              info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
+          String.format("No server address listed for region '%s', row='%s', locateType=%s",
+            info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
       return true;
     }
     complete(tableName, req, loc, null);
@@ -361,7 +373,7 @@ class AsyncNonMetaRegionLocator {
       RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
     conn.getTable(META_TABLE_NAME)
       .scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
-        .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
+        .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
         .setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
 
           private boolean completeNormally = false;
@@ -385,12 +397,41 @@ class AsyncNonMetaRegionLocator {
 
           @Override
           public void onNext(Result[] results, ScanController controller) {
-            for (Result result : results) {
-              tableNotFound = false;
-              if (onScanNext(tableName, req, result)) {
+            if (results.length == 0) {
+              return;
+            }
+            tableNotFound = false;
+            int i = 0;
+            for (; i < results.length; i++) {
+              if (onScanNext(tableName, req, results[i])) {
                 completeNormally = true;
                 controller.terminate();
-                return;
+                i++;
+                break;
+              }
+            }
+            // Add the remaining results into cache
+            if (i < results.length) {
+              TableCache tableCache = getTableCache(tableName);
+              for (; i < results.length; i++) {
+                RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
+                if (locs == null) {
+                  continue;
+                }
+                HRegionLocation loc = locs.getDefaultRegionLocation();
+                if (loc == null) {
+                  continue;
+                }
+                RegionInfo info = loc.getRegion();
+                if (info == null || info.isOffline() || info.isSplitParent() ||
+                  loc.getServerName() == null) {
+                  continue;
+                }
+                if (addToCache(tableCache, loc)) {
+                  synchronized (tableCache) {
+                    tableCache.clearCompletedRequests(Optional.of(loc));
+                  }
+                }
               }
             }
           }
@@ -482,4 +523,14 @@ class AsyncNonMetaRegionLocator {
       }
     }
   }
+
+  // only used for testing whether we have cached the location for a region.
+  @VisibleForTesting
+  HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
+    TableCache tableCache = cache.get(tableName);
+    if (tableCache == null) {
+      return null;
+    }
+    return locateRowInCache(tableCache, tableName, row);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4caf2fb0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
new file mode 100644
index 0000000..13d8000
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableLocatePrefetch {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableLocatePrefetch.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static AsyncConnection CONN;
+
+  private static AsyncNonMetaRegionLocator LOCATOR;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(AsyncNonMetaRegionLocator.LOCATE_PREFETCH_LIMIT, 100);
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException {
+    assertNotNull(LOCATOR
+      .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
+    // the request completes before the remaining results are added to the cache, so sleep a bit
+    Thread.sleep(1000);
+    // confirm that the locations of all the regions have been cached.
+    assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, Bytes.toBytes("aaa")));
+    for (byte[] row : HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE) {
+      assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, row));
+    }
+  }
+}