You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/11 06:44:01 UTC

[hbase] branch branch-2 updated: HBASE-21663 Add replica scan support

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a09dffd  HBASE-21663 Add replica scan support
a09dffd is described below

commit a09dffd106a2aec3343cd80b6b8c31332df43c0e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Jan 11 10:49:33 2019 +0800

    HBASE-21663 Add replica scan support
---
 .../client/AsyncAdminRequestRetryingCaller.java    |  15 +-
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |   6 +-
 .../hadoop/hbase/client/AsyncClientScanner.java    | 103 +++++++-----
 .../hbase/client/AsyncConnectionConfiguration.java |  21 +++
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   5 +-
 .../AsyncMasterRequestRpcRetryingCaller.java       |  16 +-
 .../hbase/client/AsyncRpcRetryingCaller.java       |  15 +-
 .../client/AsyncRpcRetryingCallerFactory.java      |   6 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |   9 +-
 .../AsyncServerRequestRpcRetryingCaller.java       |  16 +-
 .../AsyncSingleRequestRpcRetryingCaller.java       |  10 +-
 .../hbase/client/ConnectionConfiguration.java      |   9 +-
 .../hadoop/hbase/client/ConnectionUtils.java       | 101 ++++++++++-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  93 ++--------
 ... AbstractTestAsyncTableRegionReplicasRead.java} | 102 +++++------
 .../client/TestAsyncTableRegionReplicasGet.java    | 187 +--------------------
 .../client/TestAsyncTableRegionReplicasScan.java   |  76 +++++++++
 17 files changed, 370 insertions(+), 420 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index 2d634b9..cf31d79 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 
 /**
@@ -40,7 +41,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
   private final Callable<T> callable;
   private ServerName serverName;
 
-  public AsyncAdminRequestRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+  public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
     super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
@@ -69,10 +70,4 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
       future.complete(result);
     });
   }
-
-  @Override
-  CompletableFuture<T> call() {
-    doCall();
-    return future;
-  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index e268b2e..55590bd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -56,7 +56,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -80,7 +80,7 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
 
-  private final HashedWheelTimer retryTimer;
+  private final Timer retryTimer;
 
   private final AsyncConnectionImpl conn;
 
@@ -130,7 +130,7 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
   }
 
-  public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
       long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index ac2d3d7..6d4aefd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,17 +19,27 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -59,6 +69,8 @@ class AsyncClientScanner {
 
   private final AsyncConnectionImpl conn;
 
+  private final Timer retryTimer;
+
   private final long pauseNs;
 
   private final int maxAttempts;
@@ -72,7 +84,7 @@ class AsyncClientScanner {
   private final ScanResultCache resultCache;
 
   public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
-      AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
+      AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
     if (scan.getStartRow() == null) {
       scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
@@ -84,6 +96,7 @@ class AsyncClientScanner {
     this.consumer = consumer;
     this.tableName = tableName;
     this.conn = conn;
+    this.retryTimer = retryTimer;
     this.pauseNs = pauseNs;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
@@ -120,20 +133,19 @@ class AsyncClientScanner {
     }
   }
 
-  private int openScannerTries;
+  private final AtomicInteger openScannerTries = new AtomicInteger();
 
   private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
       HRegionLocation loc, ClientService.Interface stub) {
     boolean isRegionServerRemote = isRemote(loc.getHostname());
     incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
-    if (openScannerTries > 1) {
+    if (openScannerTries.getAndIncrement() > 1) {
       incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
     }
-    openScannerTries++;
     CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
     try {
-      ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
-        scan, scan.getCaching(), false);
+      ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
+        scan.getCaching(), false);
       stub.scan(controller, request, resp -> {
         if (controller.failed()) {
           future.completeExceptionally(controller.getFailed());
@@ -148,40 +160,53 @@ class AsyncClientScanner {
   }
 
   private void startScan(OpenScannerResponse resp) {
-    conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
-        .remote(resp.isRegionServerRemote)
-        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
-        .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
-        .start(resp.controller, resp.resp).whenComplete((hasMore, error) -> {
-          if (error != null) {
-            consumer.onError(error);
-            return;
-          }
-          if (hasMore) {
-            openScanner();
-          } else {
-            consumer.onComplete();
-          }
-        });
+    addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
+      .location(resp.loc).remote(resp.isRegionServerRemote)
+      .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
+      .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+      .start(resp.controller, resp.resp), (hasMore, error) -> {
+        if (error != null) {
+          consumer.onError(error);
+          return;
+        }
+        if (hasMore) {
+          openScanner();
+        } else {
+          consumer.onComplete();
+        }
+      });
+  }
+
+  private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
+    return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
+      .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
+      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+      .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
+      .call();
+  }
+
+  private long getPrimaryTimeoutNs() {
+    return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs()
+      : conn.connConf.getPrimaryScanTimeoutNs();
   }
 
   private void openScanner() {
     incRegionCountMetrics(scanMetrics);
-    openScannerTries = 1;
-    conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
-        .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
-        .call().whenComplete((resp, error) -> {
-          if (error != null) {
-            consumer.onError(error);
-            return;
-          }
-          startScan(resp);
-        });
+    openScannerTries.set(1);
+    addListener(
+      timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
+        getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
+      (resp, error) -> {
+        if (error != null) {
+          consumer.onError(error);
+          return;
+        }
+        startScan(resp);
+      });
   }
 
   public void start() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index fa051a5..84a5150 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TI
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
@@ -41,6 +43,8 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRO
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -100,6 +104,10 @@ class AsyncConnectionConfiguration {
   // timeout, we will send request to secondaries.
   private final long primaryCallTimeoutNs;
 
+  private final long primaryScanTimeoutNs;
+
+  private final long primaryMetaScanTimeoutNs;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -132,6 +140,11 @@ class AsyncConnectionConfiguration {
         WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
     this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
       conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
+    this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
+      conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
+    this.primaryMetaScanTimeoutNs =
+      TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
+        HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
   }
 
   long getMetaOperationTimeoutNs() {
@@ -193,4 +206,12 @@ class AsyncConnectionConfiguration {
   long getPrimaryCallTimeoutNs() {
     return primaryCallTimeoutNs;
   }
+
+  long getPrimaryScanTimeoutNs() {
+    return primaryScanTimeoutNs;
+  }
+
+  long getPrimaryMetaScanTimeoutNs() {
+    return primaryMetaScanTimeoutNs;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 1b99f84..3cbd950 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -275,7 +275,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
       @Override
       public AsyncTable<AdvancedScanResultConsumer> build() {
-        return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
       }
     };
   }
@@ -287,7 +287,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
       @Override
       public AsyncTable<ScanResultConsumer> build() {
-        RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+        RawAsyncTableImpl rawTable =
+          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
         return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
       }
     };
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index 1c8a0e1..a52e799 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
 import java.util.concurrent.CompletableFuture;
-
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 
 /**
@@ -39,7 +39,7 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
 
   private final Callable<T> callable;
 
-  public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+  public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
     super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
@@ -66,10 +66,4 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
       });
     });
   }
-
-  @Override
-  public CompletableFuture<T> call() {
-    doCall();
-    return future;
-  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index e03049a..5383ff8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -31,20 +30,21 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
 @InterfaceAudience.Private
 public abstract class AsyncRpcRetryingCaller<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);
 
-  private final HashedWheelTimer retryTimer;
+  private final Timer retryTimer;
 
   private final long startNs;
 
@@ -68,9 +68,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   protected final HBaseRpcController controller;
 
-  public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      long pauseNs, int maxAttempts, long operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
+  public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
+      int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.pauseNs = pauseNs;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index a660e74..f019fc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@@ -45,9 +45,9 @@ class AsyncRpcRetryingCallerFactory {
 
   private final AsyncConnectionImpl conn;
 
-  private final HashedWheelTimer retryTimer;
+  private final Timer retryTimer;
 
-  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+  public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
     this.conn = conn;
     this.retryTimer = retryTimer;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 9fdb284..584bfac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -49,9 +48,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -72,7 +73,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   private static final Logger LOG =
       LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
 
-  private final HashedWheelTimer retryTimer;
+  private final Timer retryTimer;
 
   private final Scan scan;
 
@@ -297,7 +298,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
   }
 
-  public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
+  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
       AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
       ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
       HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 20b7c31..54b055a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 
 /**
@@ -42,7 +42,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
   private final Callable<T> callable;
   private ServerName serverName;
 
-  public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+  public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
     super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
@@ -71,10 +71,4 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
       future.complete(result);
     });
   }
-
-  @Override
-  CompletableFuture<T> call() {
-    doCall();
-    return future;
-  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 1a52e5c..4b60b18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 
@@ -53,7 +53,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
 
   private final Callable<T> callable;
 
-  public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+  public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
       Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
@@ -114,10 +114,4 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
         call(loc);
       });
   }
-
-  @Override
-  public CompletableFuture<T> call() {
-    doCall();
-    return future;
-  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 55c62e7..53859c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -41,6 +41,9 @@ public class ConnectionConfiguration {
   public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
     "hbase.client.primaryCallTimeout.get";
   public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
+  public static final String PRIMARY_SCAN_TIMEOUT_MICROSECOND =
+    "hbase.client.replicaCallTimeout.scan";
+  public static final int PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT = 1000000; // 1s
 
   private final long writeBufferSize;
   private final long writeBufferPeriodicFlushTimeoutMs;
@@ -92,11 +95,11 @@ public class ConnectionConfiguration {
       conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
 
     this.replicaCallTimeoutMicroSecondScan =
-        conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
+      conf.getInt(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT);
 
     this.metaReplicaCallTimeoutMicroSecondScan =
-        conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
-            HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
+      conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
+        HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
 
     this.retries = conf.getInt(
        HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 63ef865..122d754 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
@@ -31,11 +32,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -123,9 +127,8 @@ public final class ConnectionUtils {
   }
 
   /**
-   * A ClusterConnection that will short-circuit RPC making direct invocations against the
-   * localhost if the invocation target is 'this' server; save on network and protobuf
-   * invocations.
+   * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
+   * if the invocation target is 'this' server; save on network and protobuf invocations.
    */
   // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
   @VisibleForTesting // Class is visible so can assert we are short-circuiting when expected.
@@ -136,8 +139,7 @@ public final class ConnectionUtils {
 
     private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
         ServerName serverName, AdminService.BlockingInterface admin,
-        ClientService.BlockingInterface client)
-    throws IOException {
+        ClientService.BlockingInterface client) throws IOException {
       super(conf, pool, user);
       this.serverName = serverName;
       this.localHostAdmin = admin;
@@ -157,7 +159,8 @@ public final class ConnectionUtils {
     @Override
     public MasterKeepAliveConnection getMaster() throws IOException {
       if (this.localHostClient instanceof MasterService.BlockingInterface) {
-        return new ShortCircuitMasterConnection((MasterService.BlockingInterface)this.localHostClient);
+        return new ShortCircuitMasterConnection(
+          (MasterService.BlockingInterface) this.localHostClient);
       }
       return super.getMaster();
     }
@@ -335,8 +338,8 @@ public final class ConnectionUtils {
       return result;
     }
     Cell[] rawCells = result.rawCells();
-    int index =
-        Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator.getInstance()::compareWithoutRow);
+    int index = Arrays.binarySearch(rawCells, keepCellsAfter,
+      CellComparator.getInstance()::compareWithoutRow);
     if (index < 0) {
       index = -index - 1;
     } else {
@@ -406,7 +409,7 @@ public final class ConnectionUtils {
 
   static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
     return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+      .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
   }
 
   public static ScanResultCache createScanResultCache(Scan scan) {
@@ -489,4 +492,84 @@ public final class ConnectionUtils {
     }
     scanMetrics.countOfRegions.incrementAndGet();
   }
+
+  /**
+   * Connect the two futures, if the src future is done, then mark the dst future as done. And if
+   * the dst future is done, then cancel the src future. This is used for timeline consistent read.
+   */
+  private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
+    addListener(srcFuture, (r, e) -> {
+      if (e != null) {
+        dstFuture.completeExceptionally(e);
+      } else {
+        dstFuture.complete(r);
+      }
+    });
+    // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
+    // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
+    // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in
+    // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
+    // tie.
+    addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
+  }
+
+  private static <T> void sendRequestsToSecondaryReplicas(
+      Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
+      CompletableFuture<T> future) {
+    if (future.isDone()) {
+      // do not send requests to secondary replicas if the future is done, i.e, the primary request
+      // has already been finished.
+      return;
+    }
+    for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
+      CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
+      connect(secondaryFuture, future);
+    }
+  }
+
+  static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
+      TableName tableName, Query query, byte[] row, RegionLocateType locateType,
+      Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
+      long primaryCallTimeoutNs, Timer retryTimer) {
+    if (query.getConsistency() == Consistency.STRONG) {
+      return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    }
+    // user specifies a replica id explicitly, just send request to the specific replica
+    if (query.getReplicaId() >= 0) {
+      return requestReplica.apply(query.getReplicaId());
+    }
+    // Timeline consistent read, where we may send requests to other region replicas
+    CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    CompletableFuture<T> future = new CompletableFuture<>();
+    connect(primaryFuture, future);
+    long startNs = System.nanoTime();
+    // after the getRegionLocations, all the locations for the replicas of this region should have
+    // been cached, so it is not big deal to locate them again when actually sending requests to
+    // these replicas.
+    addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs),
+      (locs, error) -> {
+        if (error != null) {
+          LOG.warn(
+            "Failed to locate all the replicas for table={}, row='{}', locateType={}" +
+              " give up timeline consistent read",
+            tableName, Bytes.toStringBinary(row), locateType, error);
+          return;
+        }
+        if (locs.size() <= 1) {
+          LOG.warn(
+            "There are no secondary replicas for region {}, give up timeline consistent read",
+            locs.getDefaultRegionLocation().getRegion());
+          return;
+        }
+        long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
+        if (delayNs <= 0) {
+          sendRequestsToSecondaryReplicas(requestReplica, locs, future);
+        } else {
+          retryTimer.newTimeout(
+            timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs,
+            TimeUnit.NANOSECONDS);
+        }
+      });
+    return future;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 2ab9f6a..3a94566 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.RpcChannel;
@@ -36,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,11 +45,10 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -77,10 +76,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
-
   private final AsyncConnectionImpl conn;
 
+  private final Timer retryTimer;
+
   private final TableName tableName;
 
   private final int defaultScannerCaching;
@@ -103,8 +102,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private final int startLogErrorsCnt;
 
-  RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase<?> builder) {
+  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
     this.conn = conn;
+    this.retryTimer = retryTimer;
     this.tableName = builder.tableName;
     this.rpcTimeoutNs = builder.rpcTimeoutNs;
     this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
@@ -219,8 +219,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     return newCaller(row.getRow(), rpcTimeoutNs);
   }
 
-  private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
-    return this.<Result> newCaller(get, timeoutNs)
+  private CompletableFuture<Result> get(Get get, int replicaId) {
+    return this.<Result> newCaller(get, readRpcTimeoutNs)
       .action((controller, loc, stub) -> RawAsyncTableImpl
         .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
           RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
@@ -228,78 +228,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       .replicaId(replicaId).call();
   }
 
-  // Connect the two futures, if the src future is done, then mark the dst future as done. And if
-  // the dst future is done, then cancel the src future. This is used for timeline consistent read.
-  private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
-    addListener(srcFuture, (r, e) -> {
-      if (e != null) {
-        dstFuture.completeExceptionally(e);
-      } else {
-        dstFuture.complete(r);
-      }
-    });
-    // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
-    // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
-    // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in
-    // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
-    // tie.
-    addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
-  }
-
-  private void timelineConsistentGet(Get get, RegionLocations locs,
-      CompletableFuture<Result> future) {
-    if (future.isDone()) {
-      // do not send requests to secondary replicas if the future is done, i.e, the primary request
-      // has already been finished.
-      return;
-    }
-    for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
-      CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
-      connect(secondaryFuture, future);
-    }
-  }
-
   @Override
   public CompletableFuture<Result> get(Get get) {
-    if (get.getConsistency() == Consistency.STRONG) {
-      return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
-    }
-    // user specifies a replica id explicitly, just send request to the specific replica
-    if (get.getReplicaId() >= 0) {
-      return get(get, get.getReplicaId(), readRpcTimeoutNs);
-    }
-
-    // Timeline consistent read, where we may send requests to other region replicas
-    CompletableFuture<Result> primaryFuture =
-      get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
-    CompletableFuture<Result> future = new CompletableFuture<>();
-    connect(primaryFuture, future);
-    long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
-    long startNs = System.nanoTime();
-    addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
-      RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
-        if (error != null) {
-          LOG.warn(
-            "Failed to locate all the replicas for table={}, row='{}'," +
-              " give up timeline consistent read",
-            tableName, Bytes.toStringBinary(get.getRow()), error);
-          return;
-        }
-        if (locs.size() <= 1) {
-          LOG.warn(
-            "There are no secondary replicas for region {}," + " give up timeline consistent read",
-            locs.getDefaultRegionLocation().getRegion());
-          return;
-        }
-        long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
-        if (delayNs <= 0) {
-          timelineConsistentGet(get, locs, future);
-        } else {
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(
-            timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
-        }
-      });
-    return future;
+    return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
+      RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
+      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
   }
 
   @Override
@@ -494,8 +427,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }
 
   public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
-    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
-      maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
+      pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java
similarity index 72%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java
index 2117116..c70af51 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -41,45 +39,32 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableRegionReplicasGet {
+public abstract class AbstractTestAsyncTableRegionReplicasRead {
 
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static TableName TABLE_NAME = TableName.valueOf("async");
 
-  private static TableName TABLE_NAME = TableName.valueOf("async");
+  protected static byte[] FAMILY = Bytes.toBytes("cf");
 
-  private static byte[] FAMILY = Bytes.toBytes("cf");
+  protected static byte[] QUALIFIER = Bytes.toBytes("cq");
 
-  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+  protected static byte[] ROW = Bytes.toBytes("row");
 
-  private static byte[] ROW = Bytes.toBytes("row");
+  protected static byte[] VALUE = Bytes.toBytes("value");
 
-  private static byte[] VALUE = Bytes.toBytes("value");
+  protected static int REPLICA_COUNT = 3;
 
-  private static int REPLICA_COUNT = 3;
-
-  private static AsyncConnection ASYNC_CONN;
+  protected static AsyncConnection ASYNC_CONN;
 
   @Rule
   public TestName testName = new TestName();
@@ -97,13 +82,14 @@ public class TestAsyncTableRegionReplicasGet {
 
   @Parameters
   public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
-      new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
+    return Arrays.asList(
+      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
+      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
   }
 
-  private static volatile boolean FAIL_PRIMARY_GET = false;
+  protected static volatile boolean FAIL_PRIMARY_GET = false;
 
-  private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
+  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
     new ConcurrentHashMap<>();
 
   public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@@ -113,9 +99,8 @@ public class TestAsyncTableRegionReplicasGet {
       return Optional.of(this);
     }
 
-    @Override
-    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
-        List<Cell> result) throws IOException {
+    private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
+        throws IOException {
       RegionInfo region = c.getEnvironment().getRegionInfo();
       if (!region.getTable().equals(TABLE_NAME)) {
         return;
@@ -126,12 +111,24 @@ public class TestAsyncTableRegionReplicasGet {
         throw new IOException("Inject error");
       }
     }
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+        List<Cell> result) throws IOException {
+      recordAndTryFail(c);
+    }
+
+    @Override
+    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
+        throws IOException {
+      recordAndTryFail(c);
+    }
   }
 
-  private static boolean allReplicasHaveRow() throws IOException {
+  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
     for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
       for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
-        if (region.get(new Get(ROW), false).isEmpty()) {
+        if (region.get(new Get(row), false).isEmpty()) {
           return false;
         }
       }
@@ -139,30 +136,40 @@ public class TestAsyncTableRegionReplicasGet {
     return true;
   }
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  protected static void startClusterAndCreateTable() throws Exception {
     // 10 mins
     TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
       TimeUnit.MINUTES.toMillis(10));
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
+      TimeUnit.MINUTES.toMillis(10));
+    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      TimeUnit.MINUTES.toMillis(10));
+
     // 1 second
     TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
       TimeUnit.SECONDS.toMicros(1));
+    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND,
+      TimeUnit.SECONDS.toMicros(1));
+
     // set a small pause so we will retry very quickly
     TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+
     // infinite retry
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
+
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
       .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
     TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
-    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
-    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+  }
+
+  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
     // this is the fastest way to let all replicas have the row
     TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
     TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
-    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
   }
 
   @AfterClass
@@ -171,26 +178,26 @@ public class TestAsyncTableRegionReplicasGet {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private static int getSecondaryGetCount() {
+  protected static int getSecondaryGetCount() {
     return REPLICA_ID_TO_COUNT.entrySet().stream()
       .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
       .mapToInt(e -> e.getValue().get()).sum();
   }
 
-  private static int getPrimaryGetCount() {
+  protected static int getPrimaryGetCount() {
     AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
     return primaryGetCount != null ? primaryGetCount.get() : 0;
   }
 
+  // replicaId = -1 means do not set replica
+  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
+
   @Test
   public void testNoReplicaRead() throws Exception {
     FAIL_PRIMARY_GET = false;
     REPLICA_ID_TO_COUNT.clear();
     AsyncTable<?> table = getTable.get();
-    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
-    for (int i = 0; i < 1000; i++) {
-      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
-    }
+    readAndCheck(table, -1);
     // the primary region is fine and the primary timeout is 1 second which is long enough, so we
     // should not send any requests to secondary replicas even if the consistency is timeline.
     Thread.sleep(5000);
@@ -202,10 +209,9 @@ public class TestAsyncTableRegionReplicasGet {
     // fail the primary get request
     FAIL_PRIMARY_GET = true;
     REPLICA_ID_TO_COUNT.clear();
-    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
     // make sure that we could still get the value from secondary replicas
     AsyncTable<?> table = getTable.get();
-    assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+    readAndCheck(table, -1);
     // make sure that the primary request has been canceled
     Thread.sleep(5000);
     int count = getPrimaryGetCount();
@@ -217,11 +223,9 @@ public class TestAsyncTableRegionReplicasGet {
   public void testReadSpecificReplica() throws Exception {
     FAIL_PRIMARY_GET = false;
     REPLICA_ID_TO_COUNT.clear();
-    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
     AsyncTable<?> table = getTable.get();
     for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
-      get.setReplicaId(replicaId);
-      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+      readAndCheck(table, replicaId);
       assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
index 2117116..3e1d994 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -18,211 +18,38 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 @Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableRegionReplicasGet {
+public class TestAsyncTableRegionReplicasGet extends AbstractTestAsyncTableRegionReplicasRead {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
 
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("async");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
-  private static byte[] ROW = Bytes.toBytes("row");
-
-  private static byte[] VALUE = Bytes.toBytes("value");
-
-  private static int REPLICA_COUNT = 3;
-
-  private static AsyncConnection ASYNC_CONN;
-
-  @Rule
-  public TestName testName = new TestName();
-
-  @Parameter
-  public Supplier<AsyncTable<?>> getTable;
-
-  private static AsyncTable<?> getRawTable() {
-    return ASYNC_CONN.getTable(TABLE_NAME);
-  }
-
-  private static AsyncTable<?> getTable() {
-    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
-  }
-
-  @Parameters
-  public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
-      new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
-  }
-
-  private static volatile boolean FAIL_PRIMARY_GET = false;
-
-  private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
-    new ConcurrentHashMap<>();
-
-  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
-
-    @Override
-    public Optional<RegionObserver> getRegionObserver() {
-      return Optional.of(this);
-    }
-
-    @Override
-    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
-        List<Cell> result) throws IOException {
-      RegionInfo region = c.getEnvironment().getRegionInfo();
-      if (!region.getTable().equals(TABLE_NAME)) {
-        return;
-      }
-      REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
-        .incrementAndGet();
-      if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
-        throw new IOException("Inject error");
-      }
-    }
-  }
-
-  private static boolean allReplicasHaveRow() throws IOException {
-    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
-        if (region.get(new Get(ROW), false).isEmpty()) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    // 10 mins
-    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
-      TimeUnit.MINUTES.toMillis(10));
-    // 1 second
-    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
-      TimeUnit.SECONDS.toMicros(1));
-    // set a small pause so we will retry very quickly
-    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
-    // infinite retry
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
-    TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
-      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
-    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    startClusterAndCreateTable();
     AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
-    // this is the fastest way to let all replicas have the row
-    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
-    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
-    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+    waitUntilAllReplicasHaveRow(ROW);
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    IOUtils.closeQuietly(ASYNC_CONN);
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  private static int getSecondaryGetCount() {
-    return REPLICA_ID_TO_COUNT.entrySet().stream()
-      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
-      .mapToInt(e -> e.getValue().get()).sum();
-  }
-
-  private static int getPrimaryGetCount() {
-    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
-    return primaryGetCount != null ? primaryGetCount.get() : 0;
-  }
-
-  @Test
-  public void testNoReplicaRead() throws Exception {
-    FAIL_PRIMARY_GET = false;
-    REPLICA_ID_TO_COUNT.clear();
-    AsyncTable<?> table = getTable.get();
-    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
-    for (int i = 0; i < 1000; i++) {
-      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
-    }
-    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
-    // should not send any requests to secondary replicas even if the consistency is timeline.
-    Thread.sleep(5000);
-    assertEquals(0, getSecondaryGetCount());
-  }
-
-  @Test
-  public void testReplicaRead() throws Exception {
-    // fail the primary get request
-    FAIL_PRIMARY_GET = true;
-    REPLICA_ID_TO_COUNT.clear();
+  @Override
+  protected void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception {
     Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
-    // make sure that we could still get the value from secondary replicas
-    AsyncTable<?> table = getTable.get();
-    assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
-    // make sure that the primary request has been canceled
-    Thread.sleep(5000);
-    int count = getPrimaryGetCount();
-    Thread.sleep(10000);
-    assertEquals(count, getPrimaryGetCount());
-  }
-
-  @Test
-  public void testReadSpecificReplica() throws Exception {
-    FAIL_PRIMARY_GET = false;
-    REPLICA_ID_TO_COUNT.clear();
-    Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
-    AsyncTable<?> table = getTable.get();
-    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
+    if (replicaId >= 0) {
       get.setReplicaId(replicaId);
-      assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
-      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
     }
+    assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java
new file mode 100644
index 0000000..dd5c8e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionReplicasScan extends AbstractTestAsyncTableRegionReplicasRead {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasScan.class);
+
+  private static int ROW_COUNT = 1000;
+
+  private static byte[] getRow(int i) {
+    return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(ROW), i));
+  }
+
+  private static byte[] getValue(int i) {
+    return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(VALUE), i));
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startClusterAndCreateTable();
+    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+    for (int i = 0; i < ROW_COUNT; i++) {
+      table.put(new Put(getRow(i)).addColumn(FAMILY, QUALIFIER, getValue(i))).get();
+    }
+    waitUntilAllReplicasHaveRow(getRow(ROW_COUNT - 1));
+  }
+
+  @Override
+  protected void readAndCheck(AsyncTable<?> table, int replicaId) throws IOException {
+    Scan scan = new Scan().setConsistency(Consistency.TIMELINE).setCaching(1);
+    if (replicaId >= 0) {
+      scan.setReplicaId(replicaId);
+    }
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      for (int i = 0; i < 1000; i++) {
+        Result result = scanner.next();
+        assertNotNull(result);
+        assertArrayEquals(getValue(i), result.getValue(FAMILY, QUALIFIER));
+      }
+    }
+  }
+}